123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package wrapper
- import (
- "crypto/tls"
- "errors"
- "fmt"
- "github.com/eclipse/paho.mqtt.golang"
- "go.uber.org/zap"
- "time"
- )
- const (
- MqttClientDisconnected = "Disconnected"
- MqttClientConnected = "Connected"
- RetryCount = 5
- CloudAccessSleep = 5
- )
- type Client struct {
- // scheme://host:port
- // Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname)
- // and "port" is the port on which the broker is accepting connections.
- Host string
- User, Passwd string
- // the client id to be used by this client when
- // connecting to the MQTT broker.
- ClientID string
- // the amount of time (in seconds) that the client
- // should wait before sending a PING request to the broker.
- // default as 120s.
- keepAliveInterval time.Duration
- // the amount of time (in seconds) that the client
- // will wait after sending a PING request to the broker.
- // default as 120s
- PingTimeout time.Duration
- //the size of the internal queue that holds messages while the
- // client is temporairily offline.
- MessageCacheDepth uint
- //0: QOSAtMostOnce, 1: QOSAtLeastOnce, 2: QOSExactlyOnce.
- QOS byte
- //if the flag set true, server will store the message and
- //can be delivered to future subscribers.
- Retain bool
- //the state of client.
- State string
- // tls config
- tlsConfig *tls.Config
- client *MQTTClient
- }
- func NewClient(host, user, passwd, clientID string) *Client {
- if host == "" || clientID == "" {
- return nil
- }
- client := &Client{
- Host: host,
- User: user,
- Passwd: passwd,
- ClientID: clientID,
- keepAliveInterval: 120 * time.Second,
- PingTimeout: 120 * time.Second,
- State: MqttClientDisconnected,
- MessageCacheDepth: 100,
- QOS: byte(2),
- Retain: false,
- }
- return client
- }
- func (c *Client) SetkeepAliveInterval(k time.Duration) {
- c.keepAliveInterval = k
- }
- func (c *Client) SetPingTimeout(k time.Duration) {
- c.PingTimeout = k
- }
- func (c *Client) SetTlsConfig(config *tls.Config) {
- c.tlsConfig = config
- }
- func (c *Client) SetQOS(qos byte) {
- c.QOS = qos
- }
- func (c *Client) SetMessageCacheDepth(depth uint) {
- c.MessageCacheDepth = depth
- }
- func (c *Client) SetRetain(ret bool) {
- c.Retain = ret
- }
- func (c *Client) Start() error {
- // Create mqtt client.
- client := &MQTTClient{
- Host: c.Host,
- User: c.User,
- Passwd: c.Passwd,
- ClientID: c.ClientID,
- Order: false,
- KeepAliveInterval: c.keepAliveInterval,
- PingTimeout: c.PingTimeout,
- MessageChannelDepth: c.MessageCacheDepth,
- CleanSession: true,
- FileStorePath: "memory",
- OnConnect: c.ClientOnConnect,
- OnLost: c.ClientOnLost,
- WillTopic: "", //no will topic.
- TLSConfig: c.tlsConfig,
- }
- c.client = client
- //Start the mqtt client
- c.client.Start()
- var (
- i int
- err error
- )
- for i = 0; i < RetryCount; i++ {
- //Logger.Info(fmt.Sprintf("正在尝试第 %d 次连接MQTT", i + 1))
- err = c.client.Connect()
- if err != nil {
- Logger.Info(fmt.Sprintf("MQTT连接失败[%s], 5s后重试...", err.Error()))
- } else {
- //Logger.Info("MQTT连接成功!!!")
- break
- }
- time.Sleep(CloudAccessSleep * time.Second)
- }
- if i >= RetryCount-1 && err != nil {
- return errors.New("超过重试次数")
- } else {
- return nil
- }
- }
- func (c *Client) Publish(topic string, msg *Message) error {
- return c.client.Publish(topic, c.QOS, c.Retain, msg)
- }
- func (c *Client) Subscribe(topic string, fn func(topic string, msg *Message)) error {
- return c.client.Subscribe(topic, c.QOS, fn)
- }
- func (c *Client) Unsubscribe(topics string) error {
- return c.client.Unsubscribe(topics)
- }
- func (c *Client) Close() {
- c.client.Close()
- }
- func (c *Client) ClientOnConnect(client mqtt.Client) {
- Logger.Info("[MQTT]连接成功", zap.String("client_id", c.ClientID))
- c.State = MqttClientConnected
- }
- func (c *Client) ClientOnLost(client mqtt.Client, err error) {
- Logger.Info("[MQTT]失去连接", zap.String("client_id", c.ClientID), zap.String("error", err.Error()))
- c.State = MqttClientDisconnected
- time.Sleep(CloudAccessSleep * time.Second)
- go func() {
- _ = c.Start()
- }()
- }
|