'use strict' /** * Module dependencies */ var events = require('events') var Store = require('./store') var mqttPacket = require('mqtt-packet') var Writable = require('readable-stream').Writable var inherits = require('inherits') var reInterval = require('reinterval') var validations = require('./validations') var xtend = require('xtend') var setImmediate = global.setImmediate || function (callback) { // works in node v0.8 process.nextTick(callback) } var defaultConnectOptions = { keepalive: 60, reschedulePings: true, protocolId: 'MQTT', protocolVersion: 4, reconnectPeriod: 1000, connectTimeout: 30 * 1000, clean: true, resubscribe: true } var errors = { 0: '', 1: 'Unacceptable protocol version', 2: 'Identifier rejected', 3: 'Server unavailable', 4: 'Bad username or password', 5: 'Not authorized', 16: 'No matching subscribers', 17: 'No subscription existed', 128: 'Unspecified error', 129: 'Malformed Packet', 130: 'Protocol Error', 131: 'Implementation specific error', 132: 'Unsupported Protocol Version', 133: 'Client Identifier not valid', 134: 'Bad User Name or Password', 135: 'Not authorized', 136: 'Server unavailable', 137: 'Server busy', 138: 'Banned', 139: 'Server shutting down', 140: 'Bad authentication method', 141: 'Keep Alive timeout', 142: 'Session taken over', 143: 'Topic Filter invalid', 144: 'Topic Name invalid', 145: 'Packet identifier in use', 146: 'Packet Identifier not found', 147: 'Receive Maximum exceeded', 148: 'Topic Alias invalid', 149: 'Packet too large', 150: 'Message rate too high', 151: 'Quota exceeded', 152: 'Administrative action', 153: 'Payload format invalid', 154: 'Retain not supported', 155: 'QoS not supported', 156: 'Use another server', 157: 'Server moved', 158: 'Shared Subscriptions not supported', 159: 'Connection rate exceeded', 160: 'Maximum connect time', 161: 'Subscription Identifiers not supported', 162: 'Wildcard Subscriptions not supported' } function defaultId () { return 'mqttjs_' + Math.random().toString(16).substr(2, 8) } function sendPacket (client, packet, cb) { client.emit('packetsend', packet) var result = mqttPacket.writeToStream(packet, client.stream, client.options) if (!result && cb) { client.stream.once('drain', cb) } else if (cb) { cb() } } function flush (queue) { if (queue) { Object.keys(queue).forEach(function (messageId) { if (typeof queue[messageId].cb === 'function') { queue[messageId].cb(new Error('Connection closed')) delete queue[messageId] } }) } } function flushVolatile (queue) { if (queue) { Object.keys(queue).forEach(function (messageId) { if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') { queue[messageId].cb(new Error('Connection closed')) delete queue[messageId] } }) } } function storeAndSend (client, packet, cb, cbStorePut) { client.outgoingStore.put(packet, function storedPacket (err) { if (err) { return cb && cb(err) } cbStorePut() sendPacket(client, packet, cb) }) } function nop () {} /** * MqttClient constructor * * @param {Stream} stream - stream * @param {Object} [options] - connection options * (see Connection#connect) */ function MqttClient (streamBuilder, options) { var k var that = this if (!(this instanceof MqttClient)) { return new MqttClient(streamBuilder, options) } this.options = options || {} // Defaults for (k in defaultConnectOptions) { if (typeof this.options[k] === 'undefined') { this.options[k] = defaultConnectOptions[k] } else { this.options[k] = options[k] } } this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId() this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) } this.streamBuilder = streamBuilder // Inflight message storages this.outgoingStore = options.outgoingStore || new Store() this.incomingStore = options.incomingStore || new Store() // Should QoS zero messages be queued when the connection is broken? this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero // map of subscribed topics to support reconnection this._resubscribeTopics = {} // map of a subscribe messageId and a topic this.messageIdToTopic = {} // Ping timer, setup in _setupPingTimer this.pingTimer = null // Is the client connected? this.connected = false // Are we disconnecting? this.disconnecting = false // Packet queue this.queue = [] // connack timer this.connackTimer = null // Reconnect timer this.reconnectTimer = null // Is processing store? this._storeProcessing = false // Packet Ids are put into the store during store processing this._packetIdsDuringStoreProcessing = {} /** * MessageIDs starting with 1 * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 */ this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) // Inflight callbacks this.outgoing = {} // True if connection is first time. this._firstConnection = true // Mark disconnected on stream close this.on('close', function () { this.connected = false clearTimeout(this.connackTimer) }) // Send queued packets this.on('connect', function () { var queue = this.queue function deliver () { var entry = queue.shift() var packet = null if (!entry) { return } packet = entry.packet that._sendPacket( packet, function (err) { if (entry.cb) { entry.cb(err) } deliver() } ) } deliver() }) // Clear ping timer this.on('close', function () { if (that.pingTimer !== null) { that.pingTimer.clear() that.pingTimer = null } }) // Setup reconnect timer on disconnect this.on('close', this._setupReconnect) events.EventEmitter.call(this) this._setupStream() } inherits(MqttClient, events.EventEmitter) /** * setup the event handlers in the inner stream. * * @api private */ MqttClient.prototype._setupStream = function () { var connectPacket var that = this var writable = new Writable() var parser = mqttPacket.parser(this.options) var completeParse = null var packets = [] this._clearReconnect() this.stream = this.streamBuilder(this) parser.on('packet', function (packet) { packets.push(packet) }) function nextTickWork () { if (packets.length) { process.nextTick(work) } else { var done = completeParse completeParse = null done() } } function work () { var packet = packets.shift() if (packet) { that._handlePacket(packet, nextTickWork) } else { var done = completeParse completeParse = null if (done) done() } } writable._write = function (buf, enc, done) { completeParse = done parser.parse(buf) work() } this.stream.pipe(writable) // Suppress connection errors this.stream.on('error', nop) // Echo stream close this.stream.on('close', function () { flushVolatile(that.outgoing) that.emit('close') }) // Send a connect packet connectPacket = Object.create(this.options) connectPacket.cmd = 'connect' // avoid message queue sendPacket(this, connectPacket) // Echo connection errors parser.on('error', this.emit.bind(this, 'error')) // auth if (this.options.properties) { if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) { this.emit('error', new Error('Packet has no Authentication Method')) return this } if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') { var authPacket = xtend({cmd: 'auth', reasonCode: 0}, this.options.authPacket) sendPacket(this, authPacket) } } // many drain listeners are needed for qos 1 callbacks if the connection is intermittent this.stream.setMaxListeners(1000) clearTimeout(this.connackTimer) this.connackTimer = setTimeout(function () { that._cleanUp(true) }, this.options.connectTimeout) } MqttClient.prototype._handlePacket = function (packet, done) { var options = this.options if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) { this.emit('error', new Error('exceeding packets size ' + packet.cmd)) this.end({reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' }}) return this } this.emit('packetreceive', packet) switch (packet.cmd) { case 'publish': this._handlePublish(packet, done) break case 'puback': case 'pubrec': case 'pubcomp': case 'suback': case 'unsuback': this._handleAck(packet) done() break case 'pubrel': this._handlePubrel(packet, done) break case 'connack': this._handleConnack(packet) done() break case 'pingresp': this._handlePingresp(packet) done() break case 'disconnect': this._handleDisconnect(packet) done() break default: // do nothing // maybe we should do an error handling // or just log it break } } MqttClient.prototype._checkDisconnecting = function (callback) { if (this.disconnecting) { if (callback) { callback(new Error('client disconnecting')) } else { this.emit('error', new Error('client disconnecting')) } } return this.disconnecting } /** * publish - publish to * * @param {String} topic - topic to publish to * @param {String, Buffer} message - message to publish * @param {Object} [opts] - publish options, includes: * {Number} qos - qos level to publish on * {Boolean} retain - whether or not to retain the message * {Boolean} dup - whether or not mark a message as duplicate * {Function} cbStorePut - function(){} called when message is put into `outgoingStore` * @param {Function} [callback] - function(err){} * called when publish succeeds or fails * @returns {MqttClient} this - for chaining * @api public * * @example client.publish('topic', 'message'); * @example * client.publish('topic', 'message', {qos: 1, retain: true, dup: true}); * @example client.publish('topic', 'message', console.log); */ MqttClient.prototype.publish = function (topic, message, opts, callback) { var packet var options = this.options // .publish(topic, payload, cb); if (typeof opts === 'function') { callback = opts opts = null } // default opts var defaultOpts = {qos: 0, retain: false, dup: false} opts = xtend(defaultOpts, opts) if (this._checkDisconnecting(callback)) { return this } packet = { cmd: 'publish', topic: topic, payload: message, qos: opts.qos, retain: opts.retain, messageId: this._nextId(), dup: opts.dup } if (options.protocolVersion === 5) { packet.properties = opts.properties if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) && ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) || (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) { /* if we are don`t setup topic alias or topic alias maximum less than topic alias or server don`t give topic alias maximum, we are removing topic alias from packet */ delete packet.properties.topicAlias } } switch (opts.qos) { case 1: case 2: // Add to callbacks this.outgoing[packet.messageId] = { volatile: false, cb: callback || nop } if (this._storeProcessing) { this._packetIdsDuringStoreProcessing[packet.messageId] = false this._storePacket(packet, undefined, opts.cbStorePut) } else { this._sendPacket(packet, undefined, opts.cbStorePut) } break default: if (this._storeProcessing) { this._storePacket(packet, callback, opts.cbStorePut) } else { this._sendPacket(packet, callback, opts.cbStorePut) } break } return this } /** * subscribe - subscribe to * * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} * @param {Object} [opts] - optional subscription options, includes: * {Number} qos - subscribe qos level * @param {Function} [callback] - function(err, granted){} where: * {Error} err - subscription error (none at the moment!) * {Array} granted - array of {topic: 't', qos: 0} * @returns {MqttClient} this - for chaining * @api public * @example client.subscribe('topic'); * @example client.subscribe('topic', {qos: 1}); * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); * @example client.subscribe('topic', console.log); */ MqttClient.prototype.subscribe = function () { var packet var args = new Array(arguments.length) for (var i = 0; i < arguments.length; i++) { args[i] = arguments[i] } var subs = [] var obj = args.shift() var resubscribe = obj.resubscribe var callback = args.pop() || nop var opts = args.pop() var invalidTopic var that = this var version = this.options.protocolVersion delete obj.resubscribe if (typeof obj === 'string') { obj = [obj] } if (typeof callback !== 'function') { opts = callback callback = nop } invalidTopic = validations.validateTopics(obj) if (invalidTopic !== null) { setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) return this } if (this._checkDisconnecting(callback)) { return this } var defaultOpts = { qos: 0 } if (version === 5) { defaultOpts.nl = false defaultOpts.rap = false defaultOpts.rh = 0 } opts = xtend(defaultOpts, opts) if (Array.isArray(obj)) { obj.forEach(function (topic) { if (!that._resubscribeTopics.hasOwnProperty(topic) || that._resubscribeTopics[topic].qos < opts.qos || resubscribe) { var currentOpts = { topic: topic, qos: opts.qos } if (version === 5) { currentOpts.nl = opts.nl currentOpts.rap = opts.rap currentOpts.rh = opts.rh currentOpts.properties = opts.properties } subs.push(currentOpts) } }) } else { Object .keys(obj) .forEach(function (k) { if (!that._resubscribeTopics.hasOwnProperty(k) || that._resubscribeTopics[k].qos < obj[k].qos || resubscribe) { var currentOpts = { topic: k, qos: obj[k].qos } if (version === 5) { currentOpts.nl = obj[k].nl currentOpts.rap = obj[k].rap currentOpts.rh = obj[k].rh currentOpts.properties = opts.properties } subs.push(currentOpts) } }) } packet = { cmd: 'subscribe', subscriptions: subs, qos: 1, retain: false, dup: false, messageId: this._nextId() } if (opts.properties) { packet.properties = opts.properties } if (!subs.length) { callback(null, []) return } // subscriptions to resubscribe to in case of disconnect if (this.options.resubscribe) { var topics = [] subs.forEach(function (sub) { if (that.options.reconnectPeriod > 0) { var topic = { qos: sub.qos } if (version === 5) { topic.nl = sub.nl || false topic.rap = sub.rap || false topic.rh = sub.rh || 0 topic.properties = sub.properties } that._resubscribeTopics[sub.topic] = topic topics.push(sub.topic) } }) that.messageIdToTopic[packet.messageId] = topics } this.outgoing[packet.messageId] = { volatile: true, cb: function (err, packet) { if (!err) { var granted = packet.granted for (var i = 0; i < granted.length; i += 1) { subs[i].qos = granted[i] } } callback(err, subs) } } this._sendPacket(packet) return this } /** * unsubscribe - unsubscribe from topic(s) * * @param {String, Array} topic - topics to unsubscribe from * @param {Object} [opts] - optional subscription options, includes: * {Object} properties - properties of unsubscribe packet * @param {Function} [callback] - callback fired on unsuback * @returns {MqttClient} this - for chaining * @api public * @example client.unsubscribe('topic'); * @example client.unsubscribe('topic', console.log); */ MqttClient.prototype.unsubscribe = function () { var packet = { cmd: 'unsubscribe', qos: 1, messageId: this._nextId() } var that = this var args = new Array(arguments.length) for (var i = 0; i < arguments.length; i++) { args[i] = arguments[i] } var topic = args.shift() var callback = args.pop() || nop var opts = args.pop() if (typeof topic === 'string') { topic = [topic] } if (typeof callback !== 'function') { opts = callback callback = nop } if (this._checkDisconnecting(callback)) { return this } if (typeof topic === 'string') { packet.unsubscriptions = [topic] } else if (typeof topic === 'object' && topic.length) { packet.unsubscriptions = topic } if (this.options.resubscribe) { packet.unsubscriptions.forEach(function (topic) { delete that._resubscribeTopics[topic] }) } if (typeof opts === 'object' && opts.properties) { packet.properties = opts.properties } this.outgoing[packet.messageId] = { volatile: true, cb: callback } this._sendPacket(packet) return this } /** * end - close connection * * @returns {MqttClient} this - for chaining * @param {Boolean} force - do not wait for all in-flight messages to be acked * @param {Function} cb - called when the client has been closed * * @api public */ MqttClient.prototype.end = function () { var that = this var force = arguments[0] var opts = arguments[1] var cb = arguments[2] if (force == null || typeof force !== 'boolean') { cb = opts || nop opts = force force = false if (typeof opts !== 'object') { cb = opts opts = null if (typeof cb !== 'function') { cb = nop } } } if (typeof opts !== 'object') { cb = opts opts = null } cb = cb || nop function closeStores () { that.disconnected = true that.incomingStore.close(function () { that.outgoingStore.close(function () { if (cb) { cb.apply(null, arguments) } that.emit('end') }) }) if (that._deferredReconnect) { that._deferredReconnect() } } function finish () { // defer closesStores of an I/O cycle, // just to make sure things are // ok for websockets that._cleanUp(force, setImmediate.bind(null, closeStores), opts) } if (this.disconnecting) { return this } this._clearReconnect() this.disconnecting = true if (!force && Object.keys(this.outgoing).length > 0) { // wait 10ms, just to be sure we received all of it this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) } else { finish() } return this } /** * removeOutgoingMessage - remove a message in outgoing store * the outgoing callback will be called withe Error('Message removed') if the message is removed * * @param {Number} mid - messageId to remove message * @returns {MqttClient} this - for chaining * @api public * * @example client.removeOutgoingMessage(client.getLastMessageId()); */ MqttClient.prototype.removeOutgoingMessage = function (mid) { var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null delete this.outgoing[mid] this.outgoingStore.del({messageId: mid}, function () { cb(new Error('Message removed')) }) return this } /** * reconnect - connect again using the same options as connect() * * @param {Object} [opts] - optional reconnect options, includes: * {Store} incomingStore - a store for the incoming packets * {Store} outgoingStore - a store for the outgoing packets * if opts is not given, current stores are used * @returns {MqttClient} this - for chaining * * @api public */ MqttClient.prototype.reconnect = function (opts) { var that = this var f = function () { if (opts) { that.options.incomingStore = opts.incomingStore that.options.outgoingStore = opts.outgoingStore } else { that.options.incomingStore = null that.options.outgoingStore = null } that.incomingStore = that.options.incomingStore || new Store() that.outgoingStore = that.options.outgoingStore || new Store() that.disconnecting = false that.disconnected = false that._deferredReconnect = null that._reconnect() } if (this.disconnecting && !this.disconnected) { this._deferredReconnect = f } else { f() } return this } /** * _reconnect - implement reconnection * @api privateish */ MqttClient.prototype._reconnect = function () { this.emit('reconnect') this._setupStream() } /** * _setupReconnect - setup reconnect timer */ MqttClient.prototype._setupReconnect = function () { var that = this if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) { if (!this.reconnecting) { this.emit('offline') this.reconnecting = true } that.reconnectTimer = setInterval(function () { that._reconnect() }, that.options.reconnectPeriod) } } /** * _clearReconnect - clear the reconnect timer */ MqttClient.prototype._clearReconnect = function () { if (this.reconnectTimer) { clearInterval(this.reconnectTimer) this.reconnectTimer = null } } /** * _cleanUp - clean up on connection end * @api private */ MqttClient.prototype._cleanUp = function (forced, done) { var opts = arguments[2] if (done) { this.stream.on('close', done) } if (forced) { if ((this.options.reconnectPeriod === 0) && this.options.clean) { flush(this.outgoing) } this.stream.destroy() } else { var packet = xtend({ cmd: 'disconnect' }, opts) this._sendPacket( packet, setImmediate.bind( null, this.stream.end.bind(this.stream) ) ) } if (!this.disconnecting) { this._clearReconnect() this._setupReconnect() } if (this.pingTimer !== null) { this.pingTimer.clear() this.pingTimer = null } if (done && !this.connected) { this.stream.removeListener('close', done) done() } } /** * _sendPacket - send or queue a packet * @param {String} type - packet type (see `protocol`) * @param {Object} packet - packet options * @param {Function} cb - callback when the packet is sent * @param {Function} cbStorePut - called when message is put into outgoingStore * @api private */ MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) { cbStorePut = cbStorePut || nop if (!this.connected) { this._storePacket(packet, cb, cbStorePut) return } // When sending a packet, reschedule the ping timer this._shiftPingInterval() switch (packet.cmd) { case 'publish': break case 'pubrel': storeAndSend(this, packet, cb, cbStorePut) return default: sendPacket(this, packet, cb) return } switch (packet.qos) { case 2: case 1: storeAndSend(this, packet, cb, cbStorePut) break /** * no need of case here since it will be caught by default * and jshint comply that before default it must be a break * anyway it will result in -1 evaluation */ case 0: /* falls through */ default: sendPacket(this, packet, cb) break } } /** * _storePacket - queue a packet * @param {String} type - packet type (see `protocol`) * @param {Object} packet - packet options * @param {Function} cb - callback when the packet is sent * @param {Function} cbStorePut - called when message is put into outgoingStore * @api private */ MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) { cbStorePut = cbStorePut || nop if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') { this.queue.push({ packet: packet, cb: cb }) } else if (packet.qos > 0) { cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null this.outgoingStore.put(packet, function (err) { if (err) { return cb && cb(err) } cbStorePut() }) } else if (cb) { cb(new Error('No connection to broker')) } } /** * _setupPingTimer - setup the ping timer * * @api private */ MqttClient.prototype._setupPingTimer = function () { var that = this if (!this.pingTimer && this.options.keepalive) { this.pingResp = true this.pingTimer = reInterval(function () { that._checkPing() }, this.options.keepalive * 1000) } } /** * _shiftPingInterval - reschedule the ping interval * * @api private */ MqttClient.prototype._shiftPingInterval = function () { if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) { this.pingTimer.reschedule(this.options.keepalive * 1000) } } /** * _checkPing - check if a pingresp has come back, and ping the server again * * @api private */ MqttClient.prototype._checkPing = function () { if (this.pingResp) { this.pingResp = false this._sendPacket({ cmd: 'pingreq' }) } else { // do a forced cleanup since socket will be in bad shape this._cleanUp(true) } } /** * _handlePingresp - handle a pingresp * * @api private */ MqttClient.prototype._handlePingresp = function () { this.pingResp = true } /** * _handleConnack * * @param {Object} packet * @api private */ MqttClient.prototype._handleConnack = function (packet) { var options = this.options var version = options.protocolVersion var rc = version === 5 ? packet.reasonCode : packet.returnCode clearTimeout(this.connackTimer) if (packet.properties) { if (packet.properties.topicAliasMaximum) { if (!options.properties) { options.properties = {} } options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum } if (packet.properties.serverKeepAlive && options.keepalive) { options.keepalive = packet.properties.serverKeepAlive this._shiftPingInterval() } if (packet.properties.maximumPacketSize) { if (!options.properties) { options.properties = {} } options.properties.maximumPacketSize = packet.properties.maximumPacketSize } } if (rc === 0) { this.reconnecting = false this._onConnect(packet) } else if (rc > 0) { var err = new Error('Connection refused: ' + errors[rc]) err.code = rc this.emit('error', err) } } /** * _handlePublish * * @param {Object} packet * @api private */ /* those late 2 case should be rewrite to comply with coding style: case 1: case 0: // do not wait sending a puback // no callback passed if (1 === qos) { this._sendPacket({ cmd: 'puback', messageId: mid }); } // emit the message event for both qos 1 and 0 this.emit('message', topic, message, packet); this.handleMessage(packet, done); break; default: // do nothing but every switch mus have a default // log or throw an error about unknown qos break; for now i just suppressed the warnings */ MqttClient.prototype._handlePublish = function (packet, done) { done = typeof done !== 'undefined' ? done : nop var topic = packet.topic.toString() var message = packet.payload var qos = packet.qos var mid = packet.messageId var that = this var options = this.options var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153] switch (qos) { case 2: { options.customHandleAcks(topic, message, packet, function (error, code) { if (!(error instanceof Error)) { code = error error = null } if (error) { return that.emit('error', error) } if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) } if (code) { that._sendPacket({cmd: 'pubrec', messageId: mid, reasonCode: code}, done) } else { that.incomingStore.put(packet, function () { that._sendPacket({cmd: 'pubrec', messageId: mid}, done) }) } }) break } case 1: { // emit the message event options.customHandleAcks(topic, message, packet, function (error, code) { if (!(error instanceof Error)) { code = error error = null } if (error) { return that.emit('error', error) } if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) } if (!code) { that.emit('message', topic, message, packet) } that.handleMessage(packet, function (err) { if (err) { return done && done(err) } that._sendPacket({cmd: 'puback', messageId: mid, reasonCode: code}, done) }) }) break } case 0: // emit the message event this.emit('message', topic, message, packet) this.handleMessage(packet, done) break default: // do nothing // log or throw an error about unknown qos break } } /** * Handle messages with backpressure support, one at a time. * Override at will. * * @param Packet packet the packet * @param Function callback call when finished * @api public */ MqttClient.prototype.handleMessage = function (packet, callback) { callback() } /** * _handleAck * * @param {Object} packet * @api private */ MqttClient.prototype._handleAck = function (packet) { /* eslint no-fallthrough: "off" */ var mid = packet.messageId var type = packet.cmd var response = null var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null var that = this var err if (!cb) { // Server sent an ack in error, ignore it. return } // Process switch (type) { case 'pubcomp': // same thing as puback for QoS 2 case 'puback': var pubackRC = packet.reasonCode // Callback - we're done if (pubackRC && pubackRC > 0 && pubackRC !== 16) { err = new Error('Publish error: ' + errors[pubackRC]) err.code = pubackRC cb(err, packet) } delete this.outgoing[mid] this.outgoingStore.del(packet, cb) break case 'pubrec': response = { cmd: 'pubrel', qos: 2, messageId: mid } var pubrecRC = packet.reasonCode if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { err = new Error('Publish error: ' + errors[pubrecRC]) err.code = pubrecRC cb(err, packet) } else { this._sendPacket(response) } break case 'suback': delete this.outgoing[mid] for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { if ((packet.granted[grantedI] & 0x80) !== 0) { // suback with Failure status var topics = this.messageIdToTopic[mid] if (topics) { topics.forEach(function (topic) { delete that._resubscribeTopics[topic] }) } } } cb(null, packet) break case 'unsuback': delete this.outgoing[mid] cb(null) break default: that.emit('error', new Error('unrecognized packet type')) } if (this.disconnecting && Object.keys(this.outgoing).length === 0) { this.emit('outgoingEmpty') } } /** * _handlePubrel * * @param {Object} packet * @api private */ MqttClient.prototype._handlePubrel = function (packet, callback) { callback = typeof callback !== 'undefined' ? callback : nop var mid = packet.messageId var that = this var comp = {cmd: 'pubcomp', messageId: mid} that.incomingStore.get(packet, function (err, pub) { if (!err) { that.emit('message', pub.topic, pub.payload, pub) that.handleMessage(pub, function (err) { if (err) { return callback(err) } that.incomingStore.del(pub, nop) that._sendPacket(comp, callback) }) } else { that._sendPacket(comp, callback) } }) } /** * _handleDisconnect * * @param {Object} packet * @api private */ MqttClient.prototype._handleDisconnect = function (packet) { this.emit('disconnect', packet) } /** * _nextId * @return unsigned int */ MqttClient.prototype._nextId = function () { // id becomes current state of this.nextId and increments afterwards var id = this.nextId++ // Ensure 16 bit unsigned int (max 65535, nextId got one higher) if (this.nextId === 65536) { this.nextId = 1 } return id } /** * getLastMessageId * @return unsigned int */ MqttClient.prototype.getLastMessageId = function () { return (this.nextId === 1) ? 65535 : (this.nextId - 1) } /** * _resubscribe * @api private */ MqttClient.prototype._resubscribe = function (connack) { var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) if (!this._firstConnection && (this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) && _resubscribeTopicsKeys.length > 0) { if (this.options.resubscribe) { if (this.options.protocolVersion === 5) { for (var topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) { var resubscribeTopic = {} resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]] resubscribeTopic.resubscribe = true this.subscribe(resubscribeTopic, {properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties}) } } else { this._resubscribeTopics.resubscribe = true this.subscribe(this._resubscribeTopics) } } else { this._resubscribeTopics = {} } } this._firstConnection = false } /** * _onConnect * * @api private */ MqttClient.prototype._onConnect = function (packet) { if (this.disconnected) { this.emit('connect', packet) return } var that = this this._setupPingTimer() this._resubscribe(packet) this.connected = true function startStreamProcess () { var outStore = that.outgoingStore.createStream() function clearStoreProcessing () { that._storeProcessing = false that._packetIdsDuringStoreProcessing = {} } that.once('close', remove) outStore.on('error', function (err) { clearStoreProcessing() that.removeListener('close', remove) that.emit('error', err) }) function remove () { outStore.destroy() outStore = null clearStoreProcessing() } function storeDeliver () { // edge case, we wrapped this twice if (!outStore) { return } that._storeProcessing = true var packet = outStore.read(1) var cb if (!packet) { // read when data is available in the future outStore.once('readable', storeDeliver) return } // Skip already processed store packets if (that._packetIdsDuringStoreProcessing[packet.messageId]) { storeDeliver() return } // Avoid unnecessary stream read operations when disconnected if (!that.disconnecting && !that.reconnectTimer) { cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null that.outgoing[packet.messageId] = { volatile: false, cb: function (err, status) { // Ensure that the original callback passed in to publish gets invoked if (cb) { cb(err, status) } storeDeliver() } } that._packetIdsDuringStoreProcessing[packet.messageId] = true that._sendPacket(packet) } else if (outStore.destroy) { outStore.destroy() } } outStore.on('end', function () { var allProcessed = true for (var id in that._packetIdsDuringStoreProcessing) { if (!that._packetIdsDuringStoreProcessing[id]) { allProcessed = false break } } if (allProcessed) { clearStoreProcessing() that.removeListener('close', remove) that.emit('connect', packet) } else { startStreamProcess() } }) storeDeliver() } // start flowing startStreamProcess() } module.exports = MqttClient