// Package wrapper wraps the Eclipse Paho MQTT client behind the MQTTClient
// type, which carries the connection settings and exposes JSON-based
// publish/subscribe helpers.
package wrapper

import (
	"crypto/tls"
	"encoding/json"
	"errors"
	"strings"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// MQTTClient holds the settings used to build a paho MQTT client and, once
// Start has run, the client itself. Populate the exported fields, call Start,
// then Connect.
type MQTTClient struct {
	// Host is the broker address handed to ClientOptions.AddBroker.
	Host string
	// User and Passwd are the credentials sent to the broker.
	User, Passwd string
	// ClientID is the MQTT client identifier.
	ClientID string
	// CleanSession is forwarded to ClientOptions.SetCleanSession.
	CleanSession bool
	// Order is forwarded to ClientOptions.SetOrderMatters.
	Order bool
	// KeepAliveInterval is forwarded unconditionally to SetKeepAlive, so the
	// zero value is passed on as-is.
	KeepAliveInterval time.Duration
	// PingTimeout is forwarded unconditionally to SetPingTimeout.
	PingTimeout time.Duration
	// MessageChannelDepth is forwarded to SetMessageChannelDepth.
	MessageChannelDepth uint
	// OnConnect, when non-nil, is installed as the on-connect handler.
	OnConnect mqtt.OnConnectHandler
	// OnLost, when non-nil, is installed as the connection-lost handler.
	OnLost mqtt.ConnectionLostHandler
	// FileStorePath selects the message store. The literal "memory" leaves the
	// paho default store in place; any other value, including the empty
	// string, is passed to mqtt.NewFileStore.
	FileStorePath string
	// WillTopic enables the last-will message when non-empty; WillMessage,
	// WillQOS and WillRetained are only used in that case.
	WillTopic    string
	WillMessage  string
	WillQOS      byte
	WillRetained bool
	// TLSConfig, when non-nil, is applied with SetTLSConfig.
	TLSConfig *tls.Config

	// client is created by Start and is nil until then.
	client mqtt.Client
}

// NewMQTTClient returns an MQTTClient with every field at its zero value.
func NewMQTTClient() *MQTTClient {
	return &MQTTClient{}
}

// Start translates the exported fields into paho client options and creates
// the underlying client. It does not connect; call Connect afterwards.
// Auto-reconnect is always enabled. Calling Start again replaces the
// previously created client.
func (mc *MQTTClient) Start() {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(mc.Host)
	opts.SetClientID(mc.ClientID)
	opts.SetUsername(mc.User)
	opts.SetPassword(mc.Passwd)
	opts.SetAutoReconnect(true)
	opts.SetCleanSession(mc.CleanSession)

	if mc.TLSConfig != nil {
		opts.SetTLSConfig(mc.TLSConfig)
	}
	// Anything other than the literal "memory" selects a file-backed store.
	if strings.Compare(mc.FileStorePath, "memory") != 0 {
		opts.SetStore(mqtt.NewFileStore(mc.FileStorePath))
	}
	if mc.OnConnect != nil {
		opts.SetOnConnectHandler(mc.OnConnect)
	}
	if mc.OnLost != nil {
		opts.SetConnectionLostHandler(mc.OnLost)
	}
	if strings.Compare(mc.WillTopic, "") != 0 {
		opts.SetWill(mc.WillTopic, mc.WillMessage, mc.WillQOS, mc.WillRetained)
	}

	// Pass the flag straight through. Only ever calling SetOrderMatters(true)
	// made Order a no-op, because the paho options already default to ordered
	// delivery; Order == false must be able to turn it off.
	opts.SetOrderMatters(mc.Order)

	opts.SetKeepAlive(mc.KeepAliveInterval)
	opts.SetPingTimeout(mc.PingTimeout)
	opts.SetMessageChannelDepth(mc.MessageChannelDepth)

	mc.client = mqtt.NewClient(opts)
}

// Connect opens the connection to the broker and blocks until the attempt
// completes. It returns an error when Start has not been called or when the
// connection attempt fails.
func (mc *MQTTClient) Connect() error {
	if mc.client == nil {
		return errors.New("未创建MQTT客户端")
	}
	if token := mc.client.Connect(); token.Wait() && token.Error() != nil {
		return token.Error()
	}
	return nil
}

// Publish JSON-encodes msg and publishes it to topic with the given QoS and
// retained flag, blocking until the publish completes. It returns an error
// when msg is nil, when Start has not been called, when the connection is not
// open, or when encoding or publishing fails.
func (mc *MQTTClient) Publish(topic string, qos byte, retained bool, msg *Message) error { if msg == nil { return errors.New("消息不可为空") } if mc.client == nil { return errors.New("未创建MQTT客户端") } if !mc.client.IsConnectionOpen() { return errors.New("未连接到 Mqtt Broker") } //rawData, err := translator.NewTransCoding().Encode(msg) rawData, err := json.Marshal(msg) if err != nil { return err } if token := mc.client.Publish(topic, qos, retained, rawData); token.Wait() && token.Error() != nil { return token.Error() } return nil } //Subscribe // We Override the function for the fn can process the model message. func (mc *MQTTClient) Subscribe(topic string, qos byte, fn func(tpc string, msg *Message)) error { if mc.client == nil { return errors.New("未创建MQTT客户端") } if !mc.client.IsConnectionOpen() { return errors.New("未连接到 Mqtt Broker") } callback := func(client mqtt.Client, message mqtt.Message) { msg := &Message{} rawData := message.Payload() //Logger.Info("Raw Data: ", zap.String("msg", string(rawData))) //err := translator.NewTransCoding().Decode(rawData, msg) err := json.Unmarshal(rawData, msg) if err != nil { Logger.Info("Error message format, Ignored!") return } fn(message.Topic(), msg) } if token := mc.client.Subscribe(topic, qos, callback); token.Wait() && token.Error() != nil { return token.Error() } return nil } func (mc *MQTTClient) Unsubscribe(topics string) error { if mc.client == nil { return errors.New("未创建MQTT客户端") } if !mc.client.IsConnectionOpen() { return errors.New("未连接") } if token := mc.client.Unsubscribe(topics); token.Wait() && token.Error() != nil { return token.Error() } return nil } func (mc *MQTTClient) Close() { mc.client.Disconnect(250) }