mqtt.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package wrapper
  2. import (
  3. "crypto/tls"
  4. "encoding/json"
  5. "errors"
  6. "github.com/eclipse/paho.mqtt.golang"
  7. "strings"
  8. "time"
  9. )
  10. type MQTTClient struct {
  11. Host string
  12. User, Passwd string
  13. ClientID string
  14. CleanSession bool
  15. Order bool
  16. KeepAliveInterval time.Duration
  17. PingTimeout time.Duration
  18. MessageChannelDepth uint
  19. OnConnect mqtt.OnConnectHandler
  20. OnLost mqtt.ConnectionLostHandler
  21. FileStorePath string
  22. WillTopic string
  23. WillMessage string
  24. WillQOS byte
  25. WillRetained bool
  26. TLSConfig *tls.Config
  27. client mqtt.Client
  28. }
  29. func NewMQTTClient() *MQTTClient {
  30. return &MQTTClient{}
  31. }
  32. func (mc *MQTTClient) Start() {
  33. opts := mqtt.NewClientOptions()
  34. opts.AddBroker(mc.Host)
  35. opts.SetClientID(mc.ClientID)
  36. opts.SetUsername(mc.User)
  37. opts.SetPassword(mc.Passwd)
  38. opts.SetAutoReconnect(true)
  39. opts.SetCleanSession(mc.CleanSession)
  40. if mc.TLSConfig != nil {
  41. //common.LogInfo("SSL/TLS is enabled!")
  42. opts.SetTLSConfig(mc.TLSConfig)
  43. }
  44. if strings.Compare(mc.FileStorePath, "memory") != 0 {
  45. //common.LogInfo("Use File Store!")
  46. opts.SetStore(mqtt.NewFileStore(mc.FileStorePath))
  47. }
  48. if mc.OnConnect != nil {
  49. opts.SetOnConnectHandler(mc.OnConnect)
  50. }
  51. if mc.OnLost != nil {
  52. opts.SetConnectionLostHandler(mc.OnLost)
  53. }
  54. if strings.Compare(mc.WillTopic, "") != 0 {
  55. opts.SetWill(mc.WillTopic, mc.WillMessage, mc.WillQOS, mc.WillRetained)
  56. }
  57. if mc.Order != false {
  58. opts.SetOrderMatters(true)
  59. }
  60. opts.SetKeepAlive(mc.KeepAliveInterval)
  61. opts.SetPingTimeout(mc.PingTimeout)
  62. opts.SetMessageChannelDepth(mc.MessageChannelDepth)
  63. // Create mqtt client.
  64. client := mqtt.NewClient(opts)
  65. mc.client = client
  66. }
  67. func (mc *MQTTClient) Connect() error {
  68. if mc.client == nil {
  69. return errors.New("未创建MQTT客户端")
  70. }
  71. if token := mc.client.Connect(); token.Wait() && token.Error() != nil {
  72. return token.Error()
  73. }
  74. return nil
  75. }
  76. //Publish
  77. // We Encode this model message and publish it.
  78. func (mc *MQTTClient) Publish(topic string, qos byte, retained bool, msg *Message) error {
  79. if msg == nil {
  80. return errors.New("消息不可为空")
  81. }
  82. if mc.client == nil {
  83. return errors.New("未创建MQTT客户端")
  84. }
  85. if !mc.client.IsConnectionOpen() {
  86. return errors.New("未连接到 Mqtt Broker")
  87. }
  88. //rawData, err := translator.NewTransCoding().Encode(msg)
  89. rawData, err := json.Marshal(msg)
  90. if err != nil {
  91. return err
  92. }
  93. if token := mc.client.Publish(topic, qos, retained, rawData); token.Wait() && token.Error() != nil {
  94. return token.Error()
  95. }
  96. return nil
  97. }
  98. //Subscribe
  99. // We Override the function for the fn can process the model message.
  100. func (mc *MQTTClient) Subscribe(topic string, qos byte, fn func(tpc string, msg *Message)) error {
  101. if mc.client == nil {
  102. return errors.New("未创建MQTT客户端")
  103. }
  104. if !mc.client.IsConnectionOpen() {
  105. return errors.New("未连接到 Mqtt Broker")
  106. }
  107. callback := func(client mqtt.Client, message mqtt.Message) {
  108. msg := &Message{}
  109. rawData := message.Payload()
  110. //Logger.Info("Raw Data: ", zap.String("msg", string(rawData)))
  111. //err := translator.NewTransCoding().Decode(rawData, msg)
  112. err := json.Unmarshal(rawData, msg)
  113. if err != nil {
  114. Logger.Info("Error message format, Ignored!")
  115. return
  116. }
  117. fn(message.Topic(), msg)
  118. }
  119. if token := mc.client.Subscribe(topic, qos, callback); token.Wait() && token.Error() != nil {
  120. return token.Error()
  121. }
  122. return nil
  123. }
  124. func (mc *MQTTClient) Unsubscribe(topics string) error {
  125. if mc.client == nil {
  126. return errors.New("未创建MQTT客户端")
  127. }
  128. if !mc.client.IsConnectionOpen() {
  129. return errors.New("未连接")
  130. }
  131. if token := mc.client.Unsubscribe(topics); token.Wait() && token.Error() != nil {
  132. return token.Error()
  133. }
  134. return nil
  135. }
  136. func (mc *MQTTClient) Close() {
  137. mc.client.Disconnect(250)
  138. }