12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460 |
- 'use strict'
- /**
- * Module dependencies
- */
- var events = require('events')
- var Store = require('./store')
- var mqttPacket = require('mqtt-packet')
- var Writable = require('readable-stream').Writable
- var inherits = require('inherits')
- var reInterval = require('reinterval')
- var validations = require('./validations')
- var xtend = require('xtend')
- var setImmediate = global.setImmediate || function (callback) {
- // works in node v0.8
- process.nextTick(callback)
- }
- var defaultConnectOptions = {
- keepalive: 60,
- reschedulePings: true,
- protocolId: 'MQTT',
- protocolVersion: 4,
- reconnectPeriod: 1000,
- connectTimeout: 30 * 1000,
- clean: true,
- resubscribe: true
- }
- var errors = {
- 0: '',
- 1: 'Unacceptable protocol version',
- 2: 'Identifier rejected',
- 3: 'Server unavailable',
- 4: 'Bad username or password',
- 5: 'Not authorized',
- 16: 'No matching subscribers',
- 17: 'No subscription existed',
- 128: 'Unspecified error',
- 129: 'Malformed Packet',
- 130: 'Protocol Error',
- 131: 'Implementation specific error',
- 132: 'Unsupported Protocol Version',
- 133: 'Client Identifier not valid',
- 134: 'Bad User Name or Password',
- 135: 'Not authorized',
- 136: 'Server unavailable',
- 137: 'Server busy',
- 138: 'Banned',
- 139: 'Server shutting down',
- 140: 'Bad authentication method',
- 141: 'Keep Alive timeout',
- 142: 'Session taken over',
- 143: 'Topic Filter invalid',
- 144: 'Topic Name invalid',
- 145: 'Packet identifier in use',
- 146: 'Packet Identifier not found',
- 147: 'Receive Maximum exceeded',
- 148: 'Topic Alias invalid',
- 149: 'Packet too large',
- 150: 'Message rate too high',
- 151: 'Quota exceeded',
- 152: 'Administrative action',
- 153: 'Payload format invalid',
- 154: 'Retain not supported',
- 155: 'QoS not supported',
- 156: 'Use another server',
- 157: 'Server moved',
- 158: 'Shared Subscriptions not supported',
- 159: 'Connection rate exceeded',
- 160: 'Maximum connect time',
- 161: 'Subscription Identifiers not supported',
- 162: 'Wildcard Subscriptions not supported'
- }
- function defaultId () {
- return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
- }
- function sendPacket (client, packet, cb) {
- client.emit('packetsend', packet)
- var result = mqttPacket.writeToStream(packet, client.stream, client.options)
- if (!result && cb) {
- client.stream.once('drain', cb)
- } else if (cb) {
- cb()
- }
- }
- function flush (queue) {
- if (queue) {
- Object.keys(queue).forEach(function (messageId) {
- if (typeof queue[messageId].cb === 'function') {
- queue[messageId].cb(new Error('Connection closed'))
- delete queue[messageId]
- }
- })
- }
- }
- function flushVolatile (queue) {
- if (queue) {
- Object.keys(queue).forEach(function (messageId) {
- if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') {
- queue[messageId].cb(new Error('Connection closed'))
- delete queue[messageId]
- }
- })
- }
- }
- function storeAndSend (client, packet, cb, cbStorePut) {
- client.outgoingStore.put(packet, function storedPacket (err) {
- if (err) {
- return cb && cb(err)
- }
- cbStorePut()
- sendPacket(client, packet, cb)
- })
- }
- function nop () {}
- /**
- * MqttClient constructor
- *
- * @param {Stream} stream - stream
- * @param {Object} [options] - connection options
- * (see Connection#connect)
- */
- function MqttClient (streamBuilder, options) {
- var k
- var that = this
- if (!(this instanceof MqttClient)) {
- return new MqttClient(streamBuilder, options)
- }
- this.options = options || {}
- // Defaults
- for (k in defaultConnectOptions) {
- if (typeof this.options[k] === 'undefined') {
- this.options[k] = defaultConnectOptions[k]
- } else {
- this.options[k] = options[k]
- }
- }
- this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
- this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }
- this.streamBuilder = streamBuilder
- // Inflight message storages
- this.outgoingStore = options.outgoingStore || new Store()
- this.incomingStore = options.incomingStore || new Store()
- // Should QoS zero messages be queued when the connection is broken?
- this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero
- // map of subscribed topics to support reconnection
- this._resubscribeTopics = {}
- // map of a subscribe messageId and a topic
- this.messageIdToTopic = {}
- // Ping timer, setup in _setupPingTimer
- this.pingTimer = null
- // Is the client connected?
- this.connected = false
- // Are we disconnecting?
- this.disconnecting = false
- // Packet queue
- this.queue = []
- // connack timer
- this.connackTimer = null
- // Reconnect timer
- this.reconnectTimer = null
- // Is processing store?
- this._storeProcessing = false
- // Packet Ids are put into the store during store processing
- this._packetIdsDuringStoreProcessing = {}
- /**
- * MessageIDs starting with 1
- * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
- */
- this.nextId = Math.max(1, Math.floor(Math.random() * 65535))
- // Inflight callbacks
- this.outgoing = {}
- // True if connection is first time.
- this._firstConnection = true
- // Mark disconnected on stream close
- this.on('close', function () {
- this.connected = false
- clearTimeout(this.connackTimer)
- })
- // Send queued packets
- this.on('connect', function () {
- var queue = this.queue
- function deliver () {
- var entry = queue.shift()
- var packet = null
- if (!entry) {
- return
- }
- packet = entry.packet
- that._sendPacket(
- packet,
- function (err) {
- if (entry.cb) {
- entry.cb(err)
- }
- deliver()
- }
- )
- }
- deliver()
- })
- // Clear ping timer
- this.on('close', function () {
- if (that.pingTimer !== null) {
- that.pingTimer.clear()
- that.pingTimer = null
- }
- })
- // Setup reconnect timer on disconnect
- this.on('close', this._setupReconnect)
- events.EventEmitter.call(this)
- this._setupStream()
- }
- inherits(MqttClient, events.EventEmitter)
- /**
- * setup the event handlers in the inner stream.
- *
- * @api private
- */
- MqttClient.prototype._setupStream = function () {
- var connectPacket
- var that = this
- var writable = new Writable()
- var parser = mqttPacket.parser(this.options)
- var completeParse = null
- var packets = []
- this._clearReconnect()
- this.stream = this.streamBuilder(this)
- parser.on('packet', function (packet) {
- packets.push(packet)
- })
- function nextTickWork () {
- if (packets.length) {
- process.nextTick(work)
- } else {
- var done = completeParse
- completeParse = null
- done()
- }
- }
- function work () {
- var packet = packets.shift()
- if (packet) {
- that._handlePacket(packet, nextTickWork)
- } else {
- var done = completeParse
- completeParse = null
- if (done) done()
- }
- }
- writable._write = function (buf, enc, done) {
- completeParse = done
- parser.parse(buf)
- work()
- }
- this.stream.pipe(writable)
- // Suppress connection errors
- this.stream.on('error', nop)
- // Echo stream close
- this.stream.on('close', function () {
- flushVolatile(that.outgoing)
- that.emit('close')
- })
- // Send a connect packet
- connectPacket = Object.create(this.options)
- connectPacket.cmd = 'connect'
- // avoid message queue
- sendPacket(this, connectPacket)
- // Echo connection errors
- parser.on('error', this.emit.bind(this, 'error'))
- // auth
- if (this.options.properties) {
- if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) {
- this.emit('error', new Error('Packet has no Authentication Method'))
- return this
- }
- if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') {
- var authPacket = xtend({cmd: 'auth', reasonCode: 0}, this.options.authPacket)
- sendPacket(this, authPacket)
- }
- }
- // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
- this.stream.setMaxListeners(1000)
- clearTimeout(this.connackTimer)
- this.connackTimer = setTimeout(function () {
- that._cleanUp(true)
- }, this.options.connectTimeout)
- }
- MqttClient.prototype._handlePacket = function (packet, done) {
- var options = this.options
- if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) {
- this.emit('error', new Error('exceeding packets size ' + packet.cmd))
- this.end({reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' }})
- return this
- }
- this.emit('packetreceive', packet)
- switch (packet.cmd) {
- case 'publish':
- this._handlePublish(packet, done)
- break
- case 'puback':
- case 'pubrec':
- case 'pubcomp':
- case 'suback':
- case 'unsuback':
- this._handleAck(packet)
- done()
- break
- case 'pubrel':
- this._handlePubrel(packet, done)
- break
- case 'connack':
- this._handleConnack(packet)
- done()
- break
- case 'pingresp':
- this._handlePingresp(packet)
- done()
- break
- case 'disconnect':
- this._handleDisconnect(packet)
- done()
- break
- default:
- // do nothing
- // maybe we should do an error handling
- // or just log it
- break
- }
- }
- MqttClient.prototype._checkDisconnecting = function (callback) {
- if (this.disconnecting) {
- if (callback) {
- callback(new Error('client disconnecting'))
- } else {
- this.emit('error', new Error('client disconnecting'))
- }
- }
- return this.disconnecting
- }
- /**
- * publish - publish <message> to <topic>
- *
- * @param {String} topic - topic to publish to
- * @param {String, Buffer} message - message to publish
- * @param {Object} [opts] - publish options, includes:
- * {Number} qos - qos level to publish on
- * {Boolean} retain - whether or not to retain the message
- * {Boolean} dup - whether or not mark a message as duplicate
- * {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
- * @param {Function} [callback] - function(err){}
- * called when publish succeeds or fails
- * @returns {MqttClient} this - for chaining
- * @api public
- *
- * @example client.publish('topic', 'message');
- * @example
- * client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
- * @example client.publish('topic', 'message', console.log);
- */
- MqttClient.prototype.publish = function (topic, message, opts, callback) {
- var packet
- var options = this.options
- // .publish(topic, payload, cb);
- if (typeof opts === 'function') {
- callback = opts
- opts = null
- }
- // default opts
- var defaultOpts = {qos: 0, retain: false, dup: false}
- opts = xtend(defaultOpts, opts)
- if (this._checkDisconnecting(callback)) {
- return this
- }
- packet = {
- cmd: 'publish',
- topic: topic,
- payload: message,
- qos: opts.qos,
- retain: opts.retain,
- messageId: this._nextId(),
- dup: opts.dup
- }
- if (options.protocolVersion === 5) {
- packet.properties = opts.properties
- if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) &&
- ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) ||
- (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) {
- /*
- if we are don`t setup topic alias or
- topic alias maximum less than topic alias or
- server don`t give topic alias maximum,
- we are removing topic alias from packet
- */
- delete packet.properties.topicAlias
- }
- }
- switch (opts.qos) {
- case 1:
- case 2:
- // Add to callbacks
- this.outgoing[packet.messageId] = {
- volatile: false,
- cb: callback || nop
- }
- if (this._storeProcessing) {
- this._packetIdsDuringStoreProcessing[packet.messageId] = false
- this._storePacket(packet, undefined, opts.cbStorePut)
- } else {
- this._sendPacket(packet, undefined, opts.cbStorePut)
- }
- break
- default:
- if (this._storeProcessing) {
- this._storePacket(packet, callback, opts.cbStorePut)
- } else {
- this._sendPacket(packet, callback, opts.cbStorePut)
- }
- break
- }
- return this
- }
- /**
- * subscribe - subscribe to <topic>
- *
- * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
- * @param {Object} [opts] - optional subscription options, includes:
- * {Number} qos - subscribe qos level
- * @param {Function} [callback] - function(err, granted){} where:
- * {Error} err - subscription error (none at the moment!)
- * {Array} granted - array of {topic: 't', qos: 0}
- * @returns {MqttClient} this - for chaining
- * @api public
- * @example client.subscribe('topic');
- * @example client.subscribe('topic', {qos: 1});
- * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
- * @example client.subscribe('topic', console.log);
- */
- MqttClient.prototype.subscribe = function () {
- var packet
- var args = new Array(arguments.length)
- for (var i = 0; i < arguments.length; i++) {
- args[i] = arguments[i]
- }
- var subs = []
- var obj = args.shift()
- var resubscribe = obj.resubscribe
- var callback = args.pop() || nop
- var opts = args.pop()
- var invalidTopic
- var that = this
- var version = this.options.protocolVersion
- delete obj.resubscribe
- if (typeof obj === 'string') {
- obj = [obj]
- }
- if (typeof callback !== 'function') {
- opts = callback
- callback = nop
- }
- invalidTopic = validations.validateTopics(obj)
- if (invalidTopic !== null) {
- setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
- return this
- }
- if (this._checkDisconnecting(callback)) {
- return this
- }
- var defaultOpts = {
- qos: 0
- }
- if (version === 5) {
- defaultOpts.nl = false
- defaultOpts.rap = false
- defaultOpts.rh = 0
- }
- opts = xtend(defaultOpts, opts)
- if (Array.isArray(obj)) {
- obj.forEach(function (topic) {
- if (!that._resubscribeTopics.hasOwnProperty(topic) ||
- that._resubscribeTopics[topic].qos < opts.qos ||
- resubscribe) {
- var currentOpts = {
- topic: topic,
- qos: opts.qos
- }
- if (version === 5) {
- currentOpts.nl = opts.nl
- currentOpts.rap = opts.rap
- currentOpts.rh = opts.rh
- currentOpts.properties = opts.properties
- }
- subs.push(currentOpts)
- }
- })
- } else {
- Object
- .keys(obj)
- .forEach(function (k) {
- if (!that._resubscribeTopics.hasOwnProperty(k) ||
- that._resubscribeTopics[k].qos < obj[k].qos ||
- resubscribe) {
- var currentOpts = {
- topic: k,
- qos: obj[k].qos
- }
- if (version === 5) {
- currentOpts.nl = obj[k].nl
- currentOpts.rap = obj[k].rap
- currentOpts.rh = obj[k].rh
- currentOpts.properties = opts.properties
- }
- subs.push(currentOpts)
- }
- })
- }
- packet = {
- cmd: 'subscribe',
- subscriptions: subs,
- qos: 1,
- retain: false,
- dup: false,
- messageId: this._nextId()
- }
- if (opts.properties) {
- packet.properties = opts.properties
- }
- if (!subs.length) {
- callback(null, [])
- return
- }
- // subscriptions to resubscribe to in case of disconnect
- if (this.options.resubscribe) {
- var topics = []
- subs.forEach(function (sub) {
- if (that.options.reconnectPeriod > 0) {
- var topic = { qos: sub.qos }
- if (version === 5) {
- topic.nl = sub.nl || false
- topic.rap = sub.rap || false
- topic.rh = sub.rh || 0
- topic.properties = sub.properties
- }
- that._resubscribeTopics[sub.topic] = topic
- topics.push(sub.topic)
- }
- })
- that.messageIdToTopic[packet.messageId] = topics
- }
- this.outgoing[packet.messageId] = {
- volatile: true,
- cb: function (err, packet) {
- if (!err) {
- var granted = packet.granted
- for (var i = 0; i < granted.length; i += 1) {
- subs[i].qos = granted[i]
- }
- }
- callback(err, subs)
- }
- }
- this._sendPacket(packet)
- return this
- }
- /**
- * unsubscribe - unsubscribe from topic(s)
- *
- * @param {String, Array} topic - topics to unsubscribe from
- * @param {Object} [opts] - optional subscription options, includes:
- * {Object} properties - properties of unsubscribe packet
- * @param {Function} [callback] - callback fired on unsuback
- * @returns {MqttClient} this - for chaining
- * @api public
- * @example client.unsubscribe('topic');
- * @example client.unsubscribe('topic', console.log);
- */
- MqttClient.prototype.unsubscribe = function () {
- var packet = {
- cmd: 'unsubscribe',
- qos: 1,
- messageId: this._nextId()
- }
- var that = this
- var args = new Array(arguments.length)
- for (var i = 0; i < arguments.length; i++) {
- args[i] = arguments[i]
- }
- var topic = args.shift()
- var callback = args.pop() || nop
- var opts = args.pop()
- if (typeof topic === 'string') {
- topic = [topic]
- }
- if (typeof callback !== 'function') {
- opts = callback
- callback = nop
- }
- if (this._checkDisconnecting(callback)) {
- return this
- }
- if (typeof topic === 'string') {
- packet.unsubscriptions = [topic]
- } else if (typeof topic === 'object' && topic.length) {
- packet.unsubscriptions = topic
- }
- if (this.options.resubscribe) {
- packet.unsubscriptions.forEach(function (topic) {
- delete that._resubscribeTopics[topic]
- })
- }
- if (typeof opts === 'object' && opts.properties) {
- packet.properties = opts.properties
- }
- this.outgoing[packet.messageId] = {
- volatile: true,
- cb: callback
- }
- this._sendPacket(packet)
- return this
- }
- /**
- * end - close connection
- *
- * @returns {MqttClient} this - for chaining
- * @param {Boolean} force - do not wait for all in-flight messages to be acked
- * @param {Function} cb - called when the client has been closed
- *
- * @api public
- */
- MqttClient.prototype.end = function () {
- var that = this
- var force = arguments[0]
- var opts = arguments[1]
- var cb = arguments[2]
- if (force == null || typeof force !== 'boolean') {
- cb = opts || nop
- opts = force
- force = false
- if (typeof opts !== 'object') {
- cb = opts
- opts = null
- if (typeof cb !== 'function') {
- cb = nop
- }
- }
- }
- if (typeof opts !== 'object') {
- cb = opts
- opts = null
- }
- cb = cb || nop
- function closeStores () {
- that.disconnected = true
- that.incomingStore.close(function () {
- that.outgoingStore.close(function () {
- if (cb) {
- cb.apply(null, arguments)
- }
- that.emit('end')
- })
- })
- if (that._deferredReconnect) {
- that._deferredReconnect()
- }
- }
- function finish () {
- // defer closesStores of an I/O cycle,
- // just to make sure things are
- // ok for websockets
- that._cleanUp(force, setImmediate.bind(null, closeStores), opts)
- }
- if (this.disconnecting) {
- return this
- }
- this._clearReconnect()
- this.disconnecting = true
- if (!force && Object.keys(this.outgoing).length > 0) {
- // wait 10ms, just to be sure we received all of it
- this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
- } else {
- finish()
- }
- return this
- }
- /**
- * removeOutgoingMessage - remove a message in outgoing store
- * the outgoing callback will be called withe Error('Message removed') if the message is removed
- *
- * @param {Number} mid - messageId to remove message
- * @returns {MqttClient} this - for chaining
- * @api public
- *
- * @example client.removeOutgoingMessage(client.getLastMessageId());
- */
- MqttClient.prototype.removeOutgoingMessage = function (mid) {
- var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
- delete this.outgoing[mid]
- this.outgoingStore.del({messageId: mid}, function () {
- cb(new Error('Message removed'))
- })
- return this
- }
- /**
- * reconnect - connect again using the same options as connect()
- *
- * @param {Object} [opts] - optional reconnect options, includes:
- * {Store} incomingStore - a store for the incoming packets
- * {Store} outgoingStore - a store for the outgoing packets
- * if opts is not given, current stores are used
- * @returns {MqttClient} this - for chaining
- *
- * @api public
- */
- MqttClient.prototype.reconnect = function (opts) {
- var that = this
- var f = function () {
- if (opts) {
- that.options.incomingStore = opts.incomingStore
- that.options.outgoingStore = opts.outgoingStore
- } else {
- that.options.incomingStore = null
- that.options.outgoingStore = null
- }
- that.incomingStore = that.options.incomingStore || new Store()
- that.outgoingStore = that.options.outgoingStore || new Store()
- that.disconnecting = false
- that.disconnected = false
- that._deferredReconnect = null
- that._reconnect()
- }
- if (this.disconnecting && !this.disconnected) {
- this._deferredReconnect = f
- } else {
- f()
- }
- return this
- }
- /**
- * _reconnect - implement reconnection
- * @api privateish
- */
- MqttClient.prototype._reconnect = function () {
- this.emit('reconnect')
- this._setupStream()
- }
- /**
- * _setupReconnect - setup reconnect timer
- */
- MqttClient.prototype._setupReconnect = function () {
- var that = this
- if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) {
- if (!this.reconnecting) {
- this.emit('offline')
- this.reconnecting = true
- }
- that.reconnectTimer = setInterval(function () {
- that._reconnect()
- }, that.options.reconnectPeriod)
- }
- }
- /**
- * _clearReconnect - clear the reconnect timer
- */
- MqttClient.prototype._clearReconnect = function () {
- if (this.reconnectTimer) {
- clearInterval(this.reconnectTimer)
- this.reconnectTimer = null
- }
- }
- /**
- * _cleanUp - clean up on connection end
- * @api private
- */
- MqttClient.prototype._cleanUp = function (forced, done) {
- var opts = arguments[2]
- if (done) {
- this.stream.on('close', done)
- }
- if (forced) {
- if ((this.options.reconnectPeriod === 0) && this.options.clean) {
- flush(this.outgoing)
- }
- this.stream.destroy()
- } else {
- var packet = xtend({ cmd: 'disconnect' }, opts)
- this._sendPacket(
- packet,
- setImmediate.bind(
- null,
- this.stream.end.bind(this.stream)
- )
- )
- }
- if (!this.disconnecting) {
- this._clearReconnect()
- this._setupReconnect()
- }
- if (this.pingTimer !== null) {
- this.pingTimer.clear()
- this.pingTimer = null
- }
- if (done && !this.connected) {
- this.stream.removeListener('close', done)
- done()
- }
- }
- /**
- * _sendPacket - send or queue a packet
- * @param {String} type - packet type (see `protocol`)
- * @param {Object} packet - packet options
- * @param {Function} cb - callback when the packet is sent
- * @param {Function} cbStorePut - called when message is put into outgoingStore
- * @api private
- */
- MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
- cbStorePut = cbStorePut || nop
- if (!this.connected) {
- this._storePacket(packet, cb, cbStorePut)
- return
- }
- // When sending a packet, reschedule the ping timer
- this._shiftPingInterval()
- switch (packet.cmd) {
- case 'publish':
- break
- case 'pubrel':
- storeAndSend(this, packet, cb, cbStorePut)
- return
- default:
- sendPacket(this, packet, cb)
- return
- }
- switch (packet.qos) {
- case 2:
- case 1:
- storeAndSend(this, packet, cb, cbStorePut)
- break
- /**
- * no need of case here since it will be caught by default
- * and jshint comply that before default it must be a break
- * anyway it will result in -1 evaluation
- */
- case 0:
- /* falls through */
- default:
- sendPacket(this, packet, cb)
- break
- }
- }
- /**
- * _storePacket - queue a packet
- * @param {String} type - packet type (see `protocol`)
- * @param {Object} packet - packet options
- * @param {Function} cb - callback when the packet is sent
- * @param {Function} cbStorePut - called when message is put into outgoingStore
- * @api private
- */
- MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
- cbStorePut = cbStorePut || nop
- if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
- this.queue.push({ packet: packet, cb: cb })
- } else if (packet.qos > 0) {
- cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
- this.outgoingStore.put(packet, function (err) {
- if (err) {
- return cb && cb(err)
- }
- cbStorePut()
- })
- } else if (cb) {
- cb(new Error('No connection to broker'))
- }
- }
- /**
- * _setupPingTimer - setup the ping timer
- *
- * @api private
- */
- MqttClient.prototype._setupPingTimer = function () {
- var that = this
- if (!this.pingTimer && this.options.keepalive) {
- this.pingResp = true
- this.pingTimer = reInterval(function () {
- that._checkPing()
- }, this.options.keepalive * 1000)
- }
- }
- /**
- * _shiftPingInterval - reschedule the ping interval
- *
- * @api private
- */
- MqttClient.prototype._shiftPingInterval = function () {
- if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) {
- this.pingTimer.reschedule(this.options.keepalive * 1000)
- }
- }
- /**
- * _checkPing - check if a pingresp has come back, and ping the server again
- *
- * @api private
- */
- MqttClient.prototype._checkPing = function () {
- if (this.pingResp) {
- this.pingResp = false
- this._sendPacket({ cmd: 'pingreq' })
- } else {
- // do a forced cleanup since socket will be in bad shape
- this._cleanUp(true)
- }
- }
- /**
- * _handlePingresp - handle a pingresp
- *
- * @api private
- */
- MqttClient.prototype._handlePingresp = function () {
- this.pingResp = true
- }
- /**
- * _handleConnack
- *
- * @param {Object} packet
- * @api private
- */
- MqttClient.prototype._handleConnack = function (packet) {
- var options = this.options
- var version = options.protocolVersion
- var rc = version === 5 ? packet.reasonCode : packet.returnCode
- clearTimeout(this.connackTimer)
- if (packet.properties) {
- if (packet.properties.topicAliasMaximum) {
- if (!options.properties) { options.properties = {} }
- options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum
- }
- if (packet.properties.serverKeepAlive && options.keepalive) {
- options.keepalive = packet.properties.serverKeepAlive
- this._shiftPingInterval()
- }
- if (packet.properties.maximumPacketSize) {
- if (!options.properties) { options.properties = {} }
- options.properties.maximumPacketSize = packet.properties.maximumPacketSize
- }
- }
- if (rc === 0) {
- this.reconnecting = false
- this._onConnect(packet)
- } else if (rc > 0) {
- var err = new Error('Connection refused: ' + errors[rc])
- err.code = rc
- this.emit('error', err)
- }
- }
- /**
- * _handlePublish
- *
- * @param {Object} packet
- * @api private
- */
- /*
- those late 2 case should be rewrite to comply with coding style:
- case 1:
- case 0:
- // do not wait sending a puback
- // no callback passed
- if (1 === qos) {
- this._sendPacket({
- cmd: 'puback',
- messageId: mid
- });
- }
- // emit the message event for both qos 1 and 0
- this.emit('message', topic, message, packet);
- this.handleMessage(packet, done);
- break;
- default:
- // do nothing but every switch mus have a default
- // log or throw an error about unknown qos
- break;
- for now i just suppressed the warnings
- */
- MqttClient.prototype._handlePublish = function (packet, done) {
- done = typeof done !== 'undefined' ? done : nop
- var topic = packet.topic.toString()
- var message = packet.payload
- var qos = packet.qos
- var mid = packet.messageId
- var that = this
- var options = this.options
- var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
- switch (qos) {
- case 2: {
- options.customHandleAcks(topic, message, packet, function (error, code) {
- if (!(error instanceof Error)) {
- code = error
- error = null
- }
- if (error) { return that.emit('error', error) }
- if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) }
- if (code) {
- that._sendPacket({cmd: 'pubrec', messageId: mid, reasonCode: code}, done)
- } else {
- that.incomingStore.put(packet, function () {
- that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
- })
- }
- })
- break
- }
- case 1: {
- // emit the message event
- options.customHandleAcks(topic, message, packet, function (error, code) {
- if (!(error instanceof Error)) {
- code = error
- error = null
- }
- if (error) { return that.emit('error', error) }
- if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) }
- if (!code) { that.emit('message', topic, message, packet) }
- that.handleMessage(packet, function (err) {
- if (err) {
- return done && done(err)
- }
- that._sendPacket({cmd: 'puback', messageId: mid, reasonCode: code}, done)
- })
- })
- break
- }
- case 0:
- // emit the message event
- this.emit('message', topic, message, packet)
- this.handleMessage(packet, done)
- break
- default:
- // do nothing
- // log or throw an error about unknown qos
- break
- }
- }
- /**
- * Handle messages with backpressure support, one at a time.
- * Override at will.
- *
- * @param Packet packet the packet
- * @param Function callback call when finished
- * @api public
- */
- MqttClient.prototype.handleMessage = function (packet, callback) {
- callback()
- }
- /**
- * _handleAck
- *
- * @param {Object} packet
- * @api private
- */
- MqttClient.prototype._handleAck = function (packet) {
- /* eslint no-fallthrough: "off" */
- var mid = packet.messageId
- var type = packet.cmd
- var response = null
- var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
- var that = this
- var err
- if (!cb) {
- // Server sent an ack in error, ignore it.
- return
- }
- // Process
- switch (type) {
- case 'pubcomp':
- // same thing as puback for QoS 2
- case 'puback':
- var pubackRC = packet.reasonCode
- // Callback - we're done
- if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
- err = new Error('Publish error: ' + errors[pubackRC])
- err.code = pubackRC
- cb(err, packet)
- }
- delete this.outgoing[mid]
- this.outgoingStore.del(packet, cb)
- break
- case 'pubrec':
- response = {
- cmd: 'pubrel',
- qos: 2,
- messageId: mid
- }
- var pubrecRC = packet.reasonCode
- if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
- err = new Error('Publish error: ' + errors[pubrecRC])
- err.code = pubrecRC
- cb(err, packet)
- } else {
- this._sendPacket(response)
- }
- break
- case 'suback':
- delete this.outgoing[mid]
- for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) {
- if ((packet.granted[grantedI] & 0x80) !== 0) {
- // suback with Failure status
- var topics = this.messageIdToTopic[mid]
- if (topics) {
- topics.forEach(function (topic) {
- delete that._resubscribeTopics[topic]
- })
- }
- }
- }
- cb(null, packet)
- break
- case 'unsuback':
- delete this.outgoing[mid]
- cb(null)
- break
- default:
- that.emit('error', new Error('unrecognized packet type'))
- }
- if (this.disconnecting &&
- Object.keys(this.outgoing).length === 0) {
- this.emit('outgoingEmpty')
- }
- }
- /**
- * _handlePubrel
- *
- * @param {Object} packet
- * @api private
- */
- MqttClient.prototype._handlePubrel = function (packet, callback) {
- callback = typeof callback !== 'undefined' ? callback : nop
- var mid = packet.messageId
- var that = this
- var comp = {cmd: 'pubcomp', messageId: mid}
- that.incomingStore.get(packet, function (err, pub) {
- if (!err) {
- that.emit('message', pub.topic, pub.payload, pub)
- that.handleMessage(pub, function (err) {
- if (err) {
- return callback(err)
- }
- that.incomingStore.del(pub, nop)
- that._sendPacket(comp, callback)
- })
- } else {
- that._sendPacket(comp, callback)
- }
- })
- }
- /**
- * _handleDisconnect
- *
- * @param {Object} packet
- * @api private
- */
- MqttClient.prototype._handleDisconnect = function (packet) {
- this.emit('disconnect', packet)
- }
- /**
- * _nextId
- * @return unsigned int
- */
- MqttClient.prototype._nextId = function () {
- // id becomes current state of this.nextId and increments afterwards
- var id = this.nextId++
- // Ensure 16 bit unsigned int (max 65535, nextId got one higher)
- if (this.nextId === 65536) {
- this.nextId = 1
- }
- return id
- }
- /**
- * getLastMessageId
- * @return unsigned int
- */
- MqttClient.prototype.getLastMessageId = function () {
- return (this.nextId === 1) ? 65535 : (this.nextId - 1)
- }
- /**
- * _resubscribe
- * @api private
- */
- MqttClient.prototype._resubscribe = function (connack) {
- var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
- if (!this._firstConnection &&
- (this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) &&
- _resubscribeTopicsKeys.length > 0) {
- if (this.options.resubscribe) {
- if (this.options.protocolVersion === 5) {
- for (var topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
- var resubscribeTopic = {}
- resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]]
- resubscribeTopic.resubscribe = true
- this.subscribe(resubscribeTopic, {properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties})
- }
- } else {
- this._resubscribeTopics.resubscribe = true
- this.subscribe(this._resubscribeTopics)
- }
- } else {
- this._resubscribeTopics = {}
- }
- }
- this._firstConnection = false
- }
- /**
- * _onConnect
- *
- * @api private
- */
- MqttClient.prototype._onConnect = function (packet) {
- if (this.disconnected) {
- this.emit('connect', packet)
- return
- }
- var that = this
- this._setupPingTimer()
- this._resubscribe(packet)
- this.connected = true
- function startStreamProcess () {
- var outStore = that.outgoingStore.createStream()
- function clearStoreProcessing () {
- that._storeProcessing = false
- that._packetIdsDuringStoreProcessing = {}
- }
- that.once('close', remove)
- outStore.on('error', function (err) {
- clearStoreProcessing()
- that.removeListener('close', remove)
- that.emit('error', err)
- })
- function remove () {
- outStore.destroy()
- outStore = null
- clearStoreProcessing()
- }
- function storeDeliver () {
- // edge case, we wrapped this twice
- if (!outStore) {
- return
- }
- that._storeProcessing = true
- var packet = outStore.read(1)
- var cb
- if (!packet) {
- // read when data is available in the future
- outStore.once('readable', storeDeliver)
- return
- }
- // Skip already processed store packets
- if (that._packetIdsDuringStoreProcessing[packet.messageId]) {
- storeDeliver()
- return
- }
- // Avoid unnecessary stream read operations when disconnected
- if (!that.disconnecting && !that.reconnectTimer) {
- cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
- that.outgoing[packet.messageId] = {
- volatile: false,
- cb: function (err, status) {
- // Ensure that the original callback passed in to publish gets invoked
- if (cb) {
- cb(err, status)
- }
- storeDeliver()
- }
- }
- that._packetIdsDuringStoreProcessing[packet.messageId] = true
- that._sendPacket(packet)
- } else if (outStore.destroy) {
- outStore.destroy()
- }
- }
- outStore.on('end', function () {
- var allProcessed = true
- for (var id in that._packetIdsDuringStoreProcessing) {
- if (!that._packetIdsDuringStoreProcessing[id]) {
- allProcessed = false
- break
- }
- }
- if (allProcessed) {
- clearStoreProcessing()
- that.removeListener('close', remove)
- that.emit('connect', packet)
- } else {
- startStreamProcess()
- }
- })
- storeDeliver()
- }
- // start flowing
- startStreamProcess()
- }
- module.exports = MqttClient
|