package wrapper import ( "crypto/tls" "errors" "fmt" "github.com/eclipse/paho.mqtt.golang" "go.uber.org/zap" "time" ) const ( MqttClientDisconnected = "Disconnected" MqttClientConnected = "Connected" RetryCount = 5 CloudAccessSleep = 5 ) type Client struct { // scheme://host:port // Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname) // and "port" is the port on which the broker is accepting connections. Host string User, Passwd string // the client id to be used by this client when // connecting to the MQTT broker. ClientID string // the amount of time (in seconds) that the client // should wait before sending a PING request to the broker. // default as 120s. keepAliveInterval time.Duration // the amount of time (in seconds) that the client // will wait after sending a PING request to the broker. // default as 120s PingTimeout time.Duration //the size of the internal queue that holds messages while the // client is temporairily offline. MessageCacheDepth uint //0: QOSAtMostOnce, 1: QOSAtLeastOnce, 2: QOSExactlyOnce. QOS byte //if the flag set true, server will store the message and //can be delivered to future subscribers. Retain bool //the state of client. State string // tls config tlsConfig *tls.Config client *MQTTClient } func NewClient(host, user, passwd, clientID string) *Client { if host == "" || clientID == "" { return nil } client := &Client{ Host: host, User: user, Passwd: passwd, ClientID: clientID, keepAliveInterval: 120 * time.Second, PingTimeout: 120 * time.Second, State: MqttClientDisconnected, MessageCacheDepth: 100, QOS: byte(2), Retain: false, } return client } func (c *Client) SetkeepAliveInterval(k time.Duration) { c.keepAliveInterval = k } func (c *Client) SetPingTimeout(k time.Duration) { c.PingTimeout = k } func (c *Client) SetTlsConfig(config *tls.Config) { c.tlsConfig = config } func (c *Client) SetQOS(qos byte) { c.QOS = qos } func (c *Client) SetMessageCacheDepth(depth uint) { c.MessageCacheDepth = depth } func (c *Client) SetRetain(ret bool) { c.Retain = ret } func (c *Client) Start() error { // Create mqtt client. client := &MQTTClient{ Host: c.Host, User: c.User, Passwd: c.Passwd, ClientID: c.ClientID, Order: false, KeepAliveInterval: c.keepAliveInterval, PingTimeout: c.PingTimeout, MessageChannelDepth: c.MessageCacheDepth, CleanSession: true, FileStorePath: "memory", OnConnect: c.ClientOnConnect, OnLost: c.ClientOnLost, WillTopic: "", //no will topic. TLSConfig: c.tlsConfig, } c.client = client //Start the mqtt client c.client.Start() var ( i int err error ) for i = 0; i < RetryCount; i++ { //Logger.Info(fmt.Sprintf("正在尝试第 %d 次连接MQTT", i + 1)) err = c.client.Connect() if err != nil { Logger.Info(fmt.Sprintf("MQTT连接失败[%s], 5s后重试...", err.Error())) } else { //Logger.Info("MQTT连接成功!!!") break } time.Sleep(CloudAccessSleep * time.Second) } if i >= RetryCount-1 && err != nil { return errors.New("超过重试次数") } else { return nil } } func (c *Client) Publish(topic string, msg *Message) error { return c.client.Publish(topic, c.QOS, c.Retain, msg) } func (c *Client) Subscribe(topic string, fn func(topic string, msg *Message)) error { return c.client.Subscribe(topic, c.QOS, fn) } func (c *Client) Unsubscribe(topics string) error { return c.client.Unsubscribe(topics) } func (c *Client) Close() { c.client.Close() } func (c *Client) ClientOnConnect(client mqtt.Client) { Logger.Info("[MQTT]连接成功", zap.String("client_id", c.ClientID)) c.State = MqttClientConnected } func (c *Client) ClientOnLost(client mqtt.Client, err error) { Logger.Info("[MQTT]失去连接", zap.String("client_id", c.ClientID), zap.String("error", err.Error())) c.State = MqttClientDisconnected time.Sleep(CloudAccessSleep * time.Second) go func() { _ = c.Start() }() }