client.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package wrapper
  2. import (
  3. "crypto/tls"
  4. "errors"
  5. "fmt"
  6. "github.com/eclipse/paho.mqtt.golang"
  7. "go.uber.org/zap"
  8. "time"
  9. )
  10. const (
  11. MqttClientDisconnected = "Disconnected"
  12. MqttClientConnected = "Connected"
  13. RetryCount = 5
  14. CloudAccessSleep = 5
  15. )
  16. type Client struct {
  17. // scheme://host:port
  18. // Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname)
  19. // and "port" is the port on which the broker is accepting connections.
  20. Host string
  21. User, Passwd string
  22. // the client id to be used by this client when
  23. // connecting to the MQTT broker.
  24. ClientID string
  25. // the amount of time (in seconds) that the client
  26. // should wait before sending a PING request to the broker.
  27. // default as 120s.
  28. keepAliveInterval time.Duration
  29. // the amount of time (in seconds) that the client
  30. // will wait after sending a PING request to the broker.
  31. // default as 120s
  32. PingTimeout time.Duration
  33. //the size of the internal queue that holds messages while the
  34. // client is temporairily offline.
  35. MessageCacheDepth uint
  36. //0: QOSAtMostOnce, 1: QOSAtLeastOnce, 2: QOSExactlyOnce.
  37. QOS byte
  38. //if the flag set true, server will store the message and
  39. //can be delivered to future subscribers.
  40. Retain bool
  41. //the state of client.
  42. State string
  43. // tls config
  44. tlsConfig *tls.Config
  45. client *MQTTClient
  46. }
  47. func NewClient(host, user, passwd, clientID string) *Client {
  48. if host == "" || clientID == "" {
  49. return nil
  50. }
  51. client := &Client{
  52. Host: host,
  53. User: user,
  54. Passwd: passwd,
  55. ClientID: clientID,
  56. keepAliveInterval: 120 * time.Second,
  57. PingTimeout: 120 * time.Second,
  58. State: MqttClientDisconnected,
  59. MessageCacheDepth: 100,
  60. QOS: byte(2),
  61. Retain: false,
  62. }
  63. return client
  64. }
  65. func (c *Client) SetkeepAliveInterval(k time.Duration) {
  66. c.keepAliveInterval = k
  67. }
  68. func (c *Client) SetPingTimeout(k time.Duration) {
  69. c.PingTimeout = k
  70. }
  71. func (c *Client) SetTlsConfig(config *tls.Config) {
  72. c.tlsConfig = config
  73. }
  74. func (c *Client) SetQOS(qos byte) {
  75. c.QOS = qos
  76. }
  77. func (c *Client) SetMessageCacheDepth(depth uint) {
  78. c.MessageCacheDepth = depth
  79. }
  80. func (c *Client) SetRetain(ret bool) {
  81. c.Retain = ret
  82. }
  83. func (c *Client) Start() error {
  84. // Create mqtt client.
  85. client := &MQTTClient{
  86. Host: c.Host,
  87. User: c.User,
  88. Passwd: c.Passwd,
  89. ClientID: c.ClientID,
  90. Order: false,
  91. KeepAliveInterval: c.keepAliveInterval,
  92. PingTimeout: c.PingTimeout,
  93. MessageChannelDepth: c.MessageCacheDepth,
  94. CleanSession: true,
  95. FileStorePath: "memory",
  96. OnConnect: c.ClientOnConnect,
  97. OnLost: c.ClientOnLost,
  98. WillTopic: "", //no will topic.
  99. TLSConfig: c.tlsConfig,
  100. }
  101. c.client = client
  102. //Start the mqtt client
  103. c.client.Start()
  104. var (
  105. i int
  106. err error
  107. )
  108. for i = 0; i < RetryCount; i++ {
  109. //Logger.Info(fmt.Sprintf("正在尝试第 %d 次连接MQTT", i + 1))
  110. err = c.client.Connect()
  111. if err != nil {
  112. Logger.Info(fmt.Sprintf("MQTT连接失败[%s], 5s后重试...", err.Error()))
  113. } else {
  114. //Logger.Info("MQTT连接成功!!!")
  115. break
  116. }
  117. time.Sleep(CloudAccessSleep * time.Second)
  118. }
  119. if i >= RetryCount-1 && err != nil {
  120. return errors.New("超过重试次数")
  121. } else {
  122. return nil
  123. }
  124. }
  125. func (c *Client) Publish(topic string, msg *Message) error {
  126. return c.client.Publish(topic, c.QOS, c.Retain, msg)
  127. }
  128. func (c *Client) Subscribe(topic string, fn func(topic string, msg *Message)) error {
  129. return c.client.Subscribe(topic, c.QOS, fn)
  130. }
  131. func (c *Client) Unsubscribe(topics string) error {
  132. return c.client.Unsubscribe(topics)
  133. }
  134. func (c *Client) Close() {
  135. c.client.Close()
  136. }
  137. func (c *Client) ClientOnConnect(client mqtt.Client) {
  138. Logger.Info("[MQTT]连接成功", zap.String("client_id", c.ClientID))
  139. c.State = MqttClientConnected
  140. }
  141. func (c *Client) ClientOnLost(client mqtt.Client, err error) {
  142. Logger.Info("[MQTT]失去连接", zap.String("client_id", c.ClientID), zap.String("error", err.Error()))
  143. c.State = MqttClientDisconnected
  144. time.Sleep(CloudAccessSleep * time.Second)
  145. go func() {
  146. _ = c.Start()
  147. }()
  148. }