123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- package wrapper
- import (
- "crypto/tls"
- "encoding/json"
- "errors"
- "github.com/eclipse/paho.mqtt.golang"
- "strings"
- "time"
- )
- type MQTTClient struct {
- Host string
- User, Passwd string
- ClientID string
- CleanSession bool
- Order bool
- KeepAliveInterval time.Duration
- PingTimeout time.Duration
- MessageChannelDepth uint
- OnConnect mqtt.OnConnectHandler
- OnLost mqtt.ConnectionLostHandler
- FileStorePath string
- WillTopic string
- WillMessage string
- WillQOS byte
- WillRetained bool
- TLSConfig *tls.Config
- client mqtt.Client
- }
- func NewMQTTClient() *MQTTClient {
- return &MQTTClient{}
- }
- func (mc *MQTTClient) Start() {
- opts := mqtt.NewClientOptions()
- opts.AddBroker(mc.Host)
- opts.SetClientID(mc.ClientID)
- opts.SetUsername(mc.User)
- opts.SetPassword(mc.Passwd)
- opts.SetAutoReconnect(true)
- opts.SetCleanSession(mc.CleanSession)
- if mc.TLSConfig != nil {
- //common.LogInfo("SSL/TLS is enabled!")
- opts.SetTLSConfig(mc.TLSConfig)
- }
- if strings.Compare(mc.FileStorePath, "memory") != 0 {
- //common.LogInfo("Use File Store!")
- opts.SetStore(mqtt.NewFileStore(mc.FileStorePath))
- }
- if mc.OnConnect != nil {
- opts.SetOnConnectHandler(mc.OnConnect)
- }
- if mc.OnLost != nil {
- opts.SetConnectionLostHandler(mc.OnLost)
- }
- if strings.Compare(mc.WillTopic, "") != 0 {
- opts.SetWill(mc.WillTopic, mc.WillMessage, mc.WillQOS, mc.WillRetained)
- }
- if mc.Order != false {
- opts.SetOrderMatters(true)
- }
- opts.SetKeepAlive(mc.KeepAliveInterval)
- opts.SetPingTimeout(mc.PingTimeout)
- opts.SetMessageChannelDepth(mc.MessageChannelDepth)
- // Create mqtt client.
- client := mqtt.NewClient(opts)
- mc.client = client
- }
- func (mc *MQTTClient) Connect() error {
- if mc.client == nil {
- return errors.New("未创建MQTT客户端")
- }
- if token := mc.client.Connect(); token.Wait() && token.Error() != nil {
- return token.Error()
- }
- return nil
- }
- //Publish
- // We Encode this model message and publish it.
- func (mc *MQTTClient) Publish(topic string, qos byte, retained bool, msg *Message) error {
- if msg == nil {
- return errors.New("消息不可为空")
- }
- if mc.client == nil {
- return errors.New("未创建MQTT客户端")
- }
- if !mc.client.IsConnectionOpen() {
- return errors.New("未连接到 Mqtt Broker")
- }
- //rawData, err := translator.NewTransCoding().Encode(msg)
- rawData, err := json.Marshal(msg)
- if err != nil {
- return err
- }
- if token := mc.client.Publish(topic, qos, retained, rawData); token.Wait() && token.Error() != nil {
- return token.Error()
- }
- return nil
- }
- //Subscribe
- // We Override the function for the fn can process the model message.
- func (mc *MQTTClient) Subscribe(topic string, qos byte, fn func(tpc string, msg *Message)) error {
- if mc.client == nil {
- return errors.New("未创建MQTT客户端")
- }
- if !mc.client.IsConnectionOpen() {
- return errors.New("未连接到 Mqtt Broker")
- }
- callback := func(client mqtt.Client, message mqtt.Message) {
- msg := &Message{}
- rawData := message.Payload()
- //Logger.Info("Raw Data: ", zap.String("msg", string(rawData)))
- //err := translator.NewTransCoding().Decode(rawData, msg)
- err := json.Unmarshal(rawData, msg)
- if err != nil {
- Logger.Info("Error message format, Ignored!")
- return
- }
- fn(message.Topic(), msg)
- }
- if token := mc.client.Subscribe(topic, qos, callback); token.Wait() && token.Error() != nil {
- return token.Error()
- }
- return nil
- }
- func (mc *MQTTClient) Unsubscribe(topics string) error {
- if mc.client == nil {
- return errors.New("未创建MQTT客户端")
- }
- if !mc.client.IsConnectionOpen() {
- return errors.New("未连接")
- }
- if token := mc.client.Unsubscribe(topics); token.Wait() && token.Error() != nil {
- return token.Error()
- }
- return nil
- }
- func (mc *MQTTClient) Close() {
- mc.client.Disconnect(250)
- }
|