// client.js
'use strict'

/**
 * Module dependencies
 */
var events = require('events')
var Store = require('./store')
var mqttPacket = require('mqtt-packet')
var Writable = require('readable-stream').Writable
var inherits = require('inherits')
var reInterval = require('reinterval')
var validations = require('./validations')
var xtend = require('xtend')
  13. var setImmediate = global.setImmediate || function (callback) {
  14. // works in node v0.8
  15. process.nextTick(callback)
  16. }
  17. var defaultConnectOptions = {
  18. keepalive: 60,
  19. reschedulePings: true,
  20. protocolId: 'MQTT',
  21. protocolVersion: 4,
  22. reconnectPeriod: 1000,
  23. connectTimeout: 30 * 1000,
  24. clean: true,
  25. resubscribe: true
  26. }
  27. var errors = {
  28. 0: '',
  29. 1: 'Unacceptable protocol version',
  30. 2: 'Identifier rejected',
  31. 3: 'Server unavailable',
  32. 4: 'Bad username or password',
  33. 5: 'Not authorized',
  34. 16: 'No matching subscribers',
  35. 17: 'No subscription existed',
  36. 128: 'Unspecified error',
  37. 129: 'Malformed Packet',
  38. 130: 'Protocol Error',
  39. 131: 'Implementation specific error',
  40. 132: 'Unsupported Protocol Version',
  41. 133: 'Client Identifier not valid',
  42. 134: 'Bad User Name or Password',
  43. 135: 'Not authorized',
  44. 136: 'Server unavailable',
  45. 137: 'Server busy',
  46. 138: 'Banned',
  47. 139: 'Server shutting down',
  48. 140: 'Bad authentication method',
  49. 141: 'Keep Alive timeout',
  50. 142: 'Session taken over',
  51. 143: 'Topic Filter invalid',
  52. 144: 'Topic Name invalid',
  53. 145: 'Packet identifier in use',
  54. 146: 'Packet Identifier not found',
  55. 147: 'Receive Maximum exceeded',
  56. 148: 'Topic Alias invalid',
  57. 149: 'Packet too large',
  58. 150: 'Message rate too high',
  59. 151: 'Quota exceeded',
  60. 152: 'Administrative action',
  61. 153: 'Payload format invalid',
  62. 154: 'Retain not supported',
  63. 155: 'QoS not supported',
  64. 156: 'Use another server',
  65. 157: 'Server moved',
  66. 158: 'Shared Subscriptions not supported',
  67. 159: 'Connection rate exceeded',
  68. 160: 'Maximum connect time',
  69. 161: 'Subscription Identifiers not supported',
  70. 162: 'Wildcard Subscriptions not supported'
  71. }
  72. function defaultId () {
  73. return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
  74. }
  75. function sendPacket (client, packet, cb) {
  76. client.emit('packetsend', packet)
  77. var result = mqttPacket.writeToStream(packet, client.stream, client.options)
  78. if (!result && cb) {
  79. client.stream.once('drain', cb)
  80. } else if (cb) {
  81. cb()
  82. }
  83. }
  84. function flush (queue) {
  85. if (queue) {
  86. Object.keys(queue).forEach(function (messageId) {
  87. if (typeof queue[messageId].cb === 'function') {
  88. queue[messageId].cb(new Error('Connection closed'))
  89. delete queue[messageId]
  90. }
  91. })
  92. }
  93. }
  94. function flushVolatile (queue) {
  95. if (queue) {
  96. Object.keys(queue).forEach(function (messageId) {
  97. if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') {
  98. queue[messageId].cb(new Error('Connection closed'))
  99. delete queue[messageId]
  100. }
  101. })
  102. }
  103. }
  104. function storeAndSend (client, packet, cb, cbStorePut) {
  105. client.outgoingStore.put(packet, function storedPacket (err) {
  106. if (err) {
  107. return cb && cb(err)
  108. }
  109. cbStorePut()
  110. sendPacket(client, packet, cb)
  111. })
  112. }
  113. function nop () {}
  114. /**
  115. * MqttClient constructor
  116. *
  117. * @param {Stream} stream - stream
  118. * @param {Object} [options] - connection options
  119. * (see Connection#connect)
  120. */
  121. function MqttClient (streamBuilder, options) {
  122. var k
  123. var that = this
  124. if (!(this instanceof MqttClient)) {
  125. return new MqttClient(streamBuilder, options)
  126. }
  127. this.options = options || {}
  128. // Defaults
  129. for (k in defaultConnectOptions) {
  130. if (typeof this.options[k] === 'undefined') {
  131. this.options[k] = defaultConnectOptions[k]
  132. } else {
  133. this.options[k] = options[k]
  134. }
  135. }
  136. this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
  137. this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }
  138. this.streamBuilder = streamBuilder
  139. // Inflight message storages
  140. this.outgoingStore = options.outgoingStore || new Store()
  141. this.incomingStore = options.incomingStore || new Store()
  142. // Should QoS zero messages be queued when the connection is broken?
  143. this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero
  144. // map of subscribed topics to support reconnection
  145. this._resubscribeTopics = {}
  146. // map of a subscribe messageId and a topic
  147. this.messageIdToTopic = {}
  148. // Ping timer, setup in _setupPingTimer
  149. this.pingTimer = null
  150. // Is the client connected?
  151. this.connected = false
  152. // Are we disconnecting?
  153. this.disconnecting = false
  154. // Packet queue
  155. this.queue = []
  156. // connack timer
  157. this.connackTimer = null
  158. // Reconnect timer
  159. this.reconnectTimer = null
  160. // Is processing store?
  161. this._storeProcessing = false
  162. // Packet Ids are put into the store during store processing
  163. this._packetIdsDuringStoreProcessing = {}
  164. /**
  165. * MessageIDs starting with 1
  166. * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
  167. */
  168. this.nextId = Math.max(1, Math.floor(Math.random() * 65535))
  169. // Inflight callbacks
  170. this.outgoing = {}
  171. // True if connection is first time.
  172. this._firstConnection = true
  173. // Mark disconnected on stream close
  174. this.on('close', function () {
  175. this.connected = false
  176. clearTimeout(this.connackTimer)
  177. })
  178. // Send queued packets
  179. this.on('connect', function () {
  180. var queue = this.queue
  181. function deliver () {
  182. var entry = queue.shift()
  183. var packet = null
  184. if (!entry) {
  185. return
  186. }
  187. packet = entry.packet
  188. that._sendPacket(
  189. packet,
  190. function (err) {
  191. if (entry.cb) {
  192. entry.cb(err)
  193. }
  194. deliver()
  195. }
  196. )
  197. }
  198. deliver()
  199. })
  200. // Clear ping timer
  201. this.on('close', function () {
  202. if (that.pingTimer !== null) {
  203. that.pingTimer.clear()
  204. that.pingTimer = null
  205. }
  206. })
  207. // Setup reconnect timer on disconnect
  208. this.on('close', this._setupReconnect)
  209. events.EventEmitter.call(this)
  210. this._setupStream()
  211. }
  212. inherits(MqttClient, events.EventEmitter)
  213. /**
  214. * setup the event handlers in the inner stream.
  215. *
  216. * @api private
  217. */
  218. MqttClient.prototype._setupStream = function () {
  219. var connectPacket
  220. var that = this
  221. var writable = new Writable()
  222. var parser = mqttPacket.parser(this.options)
  223. var completeParse = null
  224. var packets = []
  225. this._clearReconnect()
  226. this.stream = this.streamBuilder(this)
  227. parser.on('packet', function (packet) {
  228. packets.push(packet)
  229. })
  230. function nextTickWork () {
  231. if (packets.length) {
  232. process.nextTick(work)
  233. } else {
  234. var done = completeParse
  235. completeParse = null
  236. done()
  237. }
  238. }
  239. function work () {
  240. var packet = packets.shift()
  241. if (packet) {
  242. that._handlePacket(packet, nextTickWork)
  243. } else {
  244. var done = completeParse
  245. completeParse = null
  246. if (done) done()
  247. }
  248. }
  249. writable._write = function (buf, enc, done) {
  250. completeParse = done
  251. parser.parse(buf)
  252. work()
  253. }
  254. this.stream.pipe(writable)
  255. // Suppress connection errors
  256. this.stream.on('error', nop)
  257. // Echo stream close
  258. this.stream.on('close', function () {
  259. flushVolatile(that.outgoing)
  260. that.emit('close')
  261. })
  262. // Send a connect packet
  263. connectPacket = Object.create(this.options)
  264. connectPacket.cmd = 'connect'
  265. // avoid message queue
  266. sendPacket(this, connectPacket)
  267. // Echo connection errors
  268. parser.on('error', this.emit.bind(this, 'error'))
  269. // auth
  270. if (this.options.properties) {
  271. if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) {
  272. this.emit('error', new Error('Packet has no Authentication Method'))
  273. return this
  274. }
  275. if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') {
  276. var authPacket = xtend({cmd: 'auth', reasonCode: 0}, this.options.authPacket)
  277. sendPacket(this, authPacket)
  278. }
  279. }
  280. // many drain listeners are needed for qos 1 callbacks if the connection is intermittent
  281. this.stream.setMaxListeners(1000)
  282. clearTimeout(this.connackTimer)
  283. this.connackTimer = setTimeout(function () {
  284. that._cleanUp(true)
  285. }, this.options.connectTimeout)
  286. }
  287. MqttClient.prototype._handlePacket = function (packet, done) {
  288. var options = this.options
  289. if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) {
  290. this.emit('error', new Error('exceeding packets size ' + packet.cmd))
  291. this.end({reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' }})
  292. return this
  293. }
  294. this.emit('packetreceive', packet)
  295. switch (packet.cmd) {
  296. case 'publish':
  297. this._handlePublish(packet, done)
  298. break
  299. case 'puback':
  300. case 'pubrec':
  301. case 'pubcomp':
  302. case 'suback':
  303. case 'unsuback':
  304. this._handleAck(packet)
  305. done()
  306. break
  307. case 'pubrel':
  308. this._handlePubrel(packet, done)
  309. break
  310. case 'connack':
  311. this._handleConnack(packet)
  312. done()
  313. break
  314. case 'pingresp':
  315. this._handlePingresp(packet)
  316. done()
  317. break
  318. case 'disconnect':
  319. this._handleDisconnect(packet)
  320. done()
  321. break
  322. default:
  323. // do nothing
  324. // maybe we should do an error handling
  325. // or just log it
  326. break
  327. }
  328. }
  329. MqttClient.prototype._checkDisconnecting = function (callback) {
  330. if (this.disconnecting) {
  331. if (callback) {
  332. callback(new Error('client disconnecting'))
  333. } else {
  334. this.emit('error', new Error('client disconnecting'))
  335. }
  336. }
  337. return this.disconnecting
  338. }
  339. /**
  340. * publish - publish <message> to <topic>
  341. *
  342. * @param {String} topic - topic to publish to
  343. * @param {String, Buffer} message - message to publish
  344. * @param {Object} [opts] - publish options, includes:
  345. * {Number} qos - qos level to publish on
  346. * {Boolean} retain - whether or not to retain the message
  347. * {Boolean} dup - whether or not mark a message as duplicate
  348. * {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
  349. * @param {Function} [callback] - function(err){}
  350. * called when publish succeeds or fails
  351. * @returns {MqttClient} this - for chaining
  352. * @api public
  353. *
  354. * @example client.publish('topic', 'message');
  355. * @example
  356. * client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
  357. * @example client.publish('topic', 'message', console.log);
  358. */
  359. MqttClient.prototype.publish = function (topic, message, opts, callback) {
  360. var packet
  361. var options = this.options
  362. // .publish(topic, payload, cb);
  363. if (typeof opts === 'function') {
  364. callback = opts
  365. opts = null
  366. }
  367. // default opts
  368. var defaultOpts = {qos: 0, retain: false, dup: false}
  369. opts = xtend(defaultOpts, opts)
  370. if (this._checkDisconnecting(callback)) {
  371. return this
  372. }
  373. packet = {
  374. cmd: 'publish',
  375. topic: topic,
  376. payload: message,
  377. qos: opts.qos,
  378. retain: opts.retain,
  379. messageId: this._nextId(),
  380. dup: opts.dup
  381. }
  382. if (options.protocolVersion === 5) {
  383. packet.properties = opts.properties
  384. if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) &&
  385. ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) ||
  386. (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) {
  387. /*
  388. if we are don`t setup topic alias or
  389. topic alias maximum less than topic alias or
  390. server don`t give topic alias maximum,
  391. we are removing topic alias from packet
  392. */
  393. delete packet.properties.topicAlias
  394. }
  395. }
  396. switch (opts.qos) {
  397. case 1:
  398. case 2:
  399. // Add to callbacks
  400. this.outgoing[packet.messageId] = {
  401. volatile: false,
  402. cb: callback || nop
  403. }
  404. if (this._storeProcessing) {
  405. this._packetIdsDuringStoreProcessing[packet.messageId] = false
  406. this._storePacket(packet, undefined, opts.cbStorePut)
  407. } else {
  408. this._sendPacket(packet, undefined, opts.cbStorePut)
  409. }
  410. break
  411. default:
  412. if (this._storeProcessing) {
  413. this._storePacket(packet, callback, opts.cbStorePut)
  414. } else {
  415. this._sendPacket(packet, callback, opts.cbStorePut)
  416. }
  417. break
  418. }
  419. return this
  420. }
  421. /**
  422. * subscribe - subscribe to <topic>
  423. *
  424. * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
  425. * @param {Object} [opts] - optional subscription options, includes:
  426. * {Number} qos - subscribe qos level
  427. * @param {Function} [callback] - function(err, granted){} where:
  428. * {Error} err - subscription error (none at the moment!)
  429. * {Array} granted - array of {topic: 't', qos: 0}
  430. * @returns {MqttClient} this - for chaining
  431. * @api public
  432. * @example client.subscribe('topic');
  433. * @example client.subscribe('topic', {qos: 1});
  434. * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
  435. * @example client.subscribe('topic', console.log);
  436. */
  437. MqttClient.prototype.subscribe = function () {
  438. var packet
  439. var args = new Array(arguments.length)
  440. for (var i = 0; i < arguments.length; i++) {
  441. args[i] = arguments[i]
  442. }
  443. var subs = []
  444. var obj = args.shift()
  445. var resubscribe = obj.resubscribe
  446. var callback = args.pop() || nop
  447. var opts = args.pop()
  448. var invalidTopic
  449. var that = this
  450. var version = this.options.protocolVersion
  451. delete obj.resubscribe
  452. if (typeof obj === 'string') {
  453. obj = [obj]
  454. }
  455. if (typeof callback !== 'function') {
  456. opts = callback
  457. callback = nop
  458. }
  459. invalidTopic = validations.validateTopics(obj)
  460. if (invalidTopic !== null) {
  461. setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
  462. return this
  463. }
  464. if (this._checkDisconnecting(callback)) {
  465. return this
  466. }
  467. var defaultOpts = {
  468. qos: 0
  469. }
  470. if (version === 5) {
  471. defaultOpts.nl = false
  472. defaultOpts.rap = false
  473. defaultOpts.rh = 0
  474. }
  475. opts = xtend(defaultOpts, opts)
  476. if (Array.isArray(obj)) {
  477. obj.forEach(function (topic) {
  478. if (!that._resubscribeTopics.hasOwnProperty(topic) ||
  479. that._resubscribeTopics[topic].qos < opts.qos ||
  480. resubscribe) {
  481. var currentOpts = {
  482. topic: topic,
  483. qos: opts.qos
  484. }
  485. if (version === 5) {
  486. currentOpts.nl = opts.nl
  487. currentOpts.rap = opts.rap
  488. currentOpts.rh = opts.rh
  489. currentOpts.properties = opts.properties
  490. }
  491. subs.push(currentOpts)
  492. }
  493. })
  494. } else {
  495. Object
  496. .keys(obj)
  497. .forEach(function (k) {
  498. if (!that._resubscribeTopics.hasOwnProperty(k) ||
  499. that._resubscribeTopics[k].qos < obj[k].qos ||
  500. resubscribe) {
  501. var currentOpts = {
  502. topic: k,
  503. qos: obj[k].qos
  504. }
  505. if (version === 5) {
  506. currentOpts.nl = obj[k].nl
  507. currentOpts.rap = obj[k].rap
  508. currentOpts.rh = obj[k].rh
  509. currentOpts.properties = opts.properties
  510. }
  511. subs.push(currentOpts)
  512. }
  513. })
  514. }
  515. packet = {
  516. cmd: 'subscribe',
  517. subscriptions: subs,
  518. qos: 1,
  519. retain: false,
  520. dup: false,
  521. messageId: this._nextId()
  522. }
  523. if (opts.properties) {
  524. packet.properties = opts.properties
  525. }
  526. if (!subs.length) {
  527. callback(null, [])
  528. return
  529. }
  530. // subscriptions to resubscribe to in case of disconnect
  531. if (this.options.resubscribe) {
  532. var topics = []
  533. subs.forEach(function (sub) {
  534. if (that.options.reconnectPeriod > 0) {
  535. var topic = { qos: sub.qos }
  536. if (version === 5) {
  537. topic.nl = sub.nl || false
  538. topic.rap = sub.rap || false
  539. topic.rh = sub.rh || 0
  540. topic.properties = sub.properties
  541. }
  542. that._resubscribeTopics[sub.topic] = topic
  543. topics.push(sub.topic)
  544. }
  545. })
  546. that.messageIdToTopic[packet.messageId] = topics
  547. }
  548. this.outgoing[packet.messageId] = {
  549. volatile: true,
  550. cb: function (err, packet) {
  551. if (!err) {
  552. var granted = packet.granted
  553. for (var i = 0; i < granted.length; i += 1) {
  554. subs[i].qos = granted[i]
  555. }
  556. }
  557. callback(err, subs)
  558. }
  559. }
  560. this._sendPacket(packet)
  561. return this
  562. }
  563. /**
  564. * unsubscribe - unsubscribe from topic(s)
  565. *
  566. * @param {String, Array} topic - topics to unsubscribe from
  567. * @param {Object} [opts] - optional subscription options, includes:
  568. * {Object} properties - properties of unsubscribe packet
  569. * @param {Function} [callback] - callback fired on unsuback
  570. * @returns {MqttClient} this - for chaining
  571. * @api public
  572. * @example client.unsubscribe('topic');
  573. * @example client.unsubscribe('topic', console.log);
  574. */
  575. MqttClient.prototype.unsubscribe = function () {
  576. var packet = {
  577. cmd: 'unsubscribe',
  578. qos: 1,
  579. messageId: this._nextId()
  580. }
  581. var that = this
  582. var args = new Array(arguments.length)
  583. for (var i = 0; i < arguments.length; i++) {
  584. args[i] = arguments[i]
  585. }
  586. var topic = args.shift()
  587. var callback = args.pop() || nop
  588. var opts = args.pop()
  589. if (typeof topic === 'string') {
  590. topic = [topic]
  591. }
  592. if (typeof callback !== 'function') {
  593. opts = callback
  594. callback = nop
  595. }
  596. if (this._checkDisconnecting(callback)) {
  597. return this
  598. }
  599. if (typeof topic === 'string') {
  600. packet.unsubscriptions = [topic]
  601. } else if (typeof topic === 'object' && topic.length) {
  602. packet.unsubscriptions = topic
  603. }
  604. if (this.options.resubscribe) {
  605. packet.unsubscriptions.forEach(function (topic) {
  606. delete that._resubscribeTopics[topic]
  607. })
  608. }
  609. if (typeof opts === 'object' && opts.properties) {
  610. packet.properties = opts.properties
  611. }
  612. this.outgoing[packet.messageId] = {
  613. volatile: true,
  614. cb: callback
  615. }
  616. this._sendPacket(packet)
  617. return this
  618. }
  619. /**
  620. * end - close connection
  621. *
  622. * @returns {MqttClient} this - for chaining
  623. * @param {Boolean} force - do not wait for all in-flight messages to be acked
  624. * @param {Function} cb - called when the client has been closed
  625. *
  626. * @api public
  627. */
  628. MqttClient.prototype.end = function () {
  629. var that = this
  630. var force = arguments[0]
  631. var opts = arguments[1]
  632. var cb = arguments[2]
  633. if (force == null || typeof force !== 'boolean') {
  634. cb = opts || nop
  635. opts = force
  636. force = false
  637. if (typeof opts !== 'object') {
  638. cb = opts
  639. opts = null
  640. if (typeof cb !== 'function') {
  641. cb = nop
  642. }
  643. }
  644. }
  645. if (typeof opts !== 'object') {
  646. cb = opts
  647. opts = null
  648. }
  649. cb = cb || nop
  650. function closeStores () {
  651. that.disconnected = true
  652. that.incomingStore.close(function () {
  653. that.outgoingStore.close(function () {
  654. if (cb) {
  655. cb.apply(null, arguments)
  656. }
  657. that.emit('end')
  658. })
  659. })
  660. if (that._deferredReconnect) {
  661. that._deferredReconnect()
  662. }
  663. }
  664. function finish () {
  665. // defer closesStores of an I/O cycle,
  666. // just to make sure things are
  667. // ok for websockets
  668. that._cleanUp(force, setImmediate.bind(null, closeStores), opts)
  669. }
  670. if (this.disconnecting) {
  671. return this
  672. }
  673. this._clearReconnect()
  674. this.disconnecting = true
  675. if (!force && Object.keys(this.outgoing).length > 0) {
  676. // wait 10ms, just to be sure we received all of it
  677. this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
  678. } else {
  679. finish()
  680. }
  681. return this
  682. }
  683. /**
  684. * removeOutgoingMessage - remove a message in outgoing store
  685. * the outgoing callback will be called withe Error('Message removed') if the message is removed
  686. *
  687. * @param {Number} mid - messageId to remove message
  688. * @returns {MqttClient} this - for chaining
  689. * @api public
  690. *
  691. * @example client.removeOutgoingMessage(client.getLastMessageId());
  692. */
  693. MqttClient.prototype.removeOutgoingMessage = function (mid) {
  694. var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
  695. delete this.outgoing[mid]
  696. this.outgoingStore.del({messageId: mid}, function () {
  697. cb(new Error('Message removed'))
  698. })
  699. return this
  700. }
  701. /**
  702. * reconnect - connect again using the same options as connect()
  703. *
  704. * @param {Object} [opts] - optional reconnect options, includes:
  705. * {Store} incomingStore - a store for the incoming packets
  706. * {Store} outgoingStore - a store for the outgoing packets
  707. * if opts is not given, current stores are used
  708. * @returns {MqttClient} this - for chaining
  709. *
  710. * @api public
  711. */
  712. MqttClient.prototype.reconnect = function (opts) {
  713. var that = this
  714. var f = function () {
  715. if (opts) {
  716. that.options.incomingStore = opts.incomingStore
  717. that.options.outgoingStore = opts.outgoingStore
  718. } else {
  719. that.options.incomingStore = null
  720. that.options.outgoingStore = null
  721. }
  722. that.incomingStore = that.options.incomingStore || new Store()
  723. that.outgoingStore = that.options.outgoingStore || new Store()
  724. that.disconnecting = false
  725. that.disconnected = false
  726. that._deferredReconnect = null
  727. that._reconnect()
  728. }
  729. if (this.disconnecting && !this.disconnected) {
  730. this._deferredReconnect = f
  731. } else {
  732. f()
  733. }
  734. return this
  735. }
  736. /**
  737. * _reconnect - implement reconnection
  738. * @api privateish
  739. */
  740. MqttClient.prototype._reconnect = function () {
  741. this.emit('reconnect')
  742. this._setupStream()
  743. }
  744. /**
  745. * _setupReconnect - setup reconnect timer
  746. */
  747. MqttClient.prototype._setupReconnect = function () {
  748. var that = this
  749. if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) {
  750. if (!this.reconnecting) {
  751. this.emit('offline')
  752. this.reconnecting = true
  753. }
  754. that.reconnectTimer = setInterval(function () {
  755. that._reconnect()
  756. }, that.options.reconnectPeriod)
  757. }
  758. }
  759. /**
  760. * _clearReconnect - clear the reconnect timer
  761. */
  762. MqttClient.prototype._clearReconnect = function () {
  763. if (this.reconnectTimer) {
  764. clearInterval(this.reconnectTimer)
  765. this.reconnectTimer = null
  766. }
  767. }
  768. /**
  769. * _cleanUp - clean up on connection end
  770. * @api private
  771. */
  772. MqttClient.prototype._cleanUp = function (forced, done) {
  773. var opts = arguments[2]
  774. if (done) {
  775. this.stream.on('close', done)
  776. }
  777. if (forced) {
  778. if ((this.options.reconnectPeriod === 0) && this.options.clean) {
  779. flush(this.outgoing)
  780. }
  781. this.stream.destroy()
  782. } else {
  783. var packet = xtend({ cmd: 'disconnect' }, opts)
  784. this._sendPacket(
  785. packet,
  786. setImmediate.bind(
  787. null,
  788. this.stream.end.bind(this.stream)
  789. )
  790. )
  791. }
  792. if (!this.disconnecting) {
  793. this._clearReconnect()
  794. this._setupReconnect()
  795. }
  796. if (this.pingTimer !== null) {
  797. this.pingTimer.clear()
  798. this.pingTimer = null
  799. }
  800. if (done && !this.connected) {
  801. this.stream.removeListener('close', done)
  802. done()
  803. }
  804. }
  805. /**
  806. * _sendPacket - send or queue a packet
  807. * @param {String} type - packet type (see `protocol`)
  808. * @param {Object} packet - packet options
  809. * @param {Function} cb - callback when the packet is sent
  810. * @param {Function} cbStorePut - called when message is put into outgoingStore
  811. * @api private
  812. */
  813. MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
  814. cbStorePut = cbStorePut || nop
  815. if (!this.connected) {
  816. this._storePacket(packet, cb, cbStorePut)
  817. return
  818. }
  819. // When sending a packet, reschedule the ping timer
  820. this._shiftPingInterval()
  821. switch (packet.cmd) {
  822. case 'publish':
  823. break
  824. case 'pubrel':
  825. storeAndSend(this, packet, cb, cbStorePut)
  826. return
  827. default:
  828. sendPacket(this, packet, cb)
  829. return
  830. }
  831. switch (packet.qos) {
  832. case 2:
  833. case 1:
  834. storeAndSend(this, packet, cb, cbStorePut)
  835. break
  836. /**
  837. * no need of case here since it will be caught by default
  838. * and jshint comply that before default it must be a break
  839. * anyway it will result in -1 evaluation
  840. */
  841. case 0:
  842. /* falls through */
  843. default:
  844. sendPacket(this, packet, cb)
  845. break
  846. }
  847. }
  /**
   * _storePacket - keep a packet that cannot be sent right now
   *
   * - Non-publish packets, and QoS 0 publishes when `queueQoSZero` is set,
   *   are pushed onto `this.queue` together with `cb`.
   * - Publishes with QoS > 0 are put into `outgoingStore`; a store error is
   *   reported to the callback registered in `this.outgoing` for that
   *   message id (not to `cb`), success calls `cbStorePut`.
   * - Anything else (a QoS 0 publish without queueing) fails `cb` with
   *   'No connection to broker'.
   *
   * @param {Object} packet - packet options
   * @param {Function} cb - callback when the packet is sent
   * @param {Function} cbStorePut - called when message is put into outgoingStore
   * @api private
   */
  MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
    var onStored = cbStorePut || nop
    var isPublish = packet.cmd === 'publish'
    var isQoSZero = (packet.qos || 0) === 0

    if (!isPublish || (isQoSZero && this.queueQoSZero)) {
      this.queue.push({ packet: packet, cb: cb })
      return
    }

    if (packet.qos > 0) {
      var pending = this.outgoing[packet.messageId]
      var publishCb = pending ? pending.cb : null
      this.outgoingStore.put(packet, function (err) {
        if (err) {
          return publishCb && publishCb(err)
        }
        onStored()
      })
      return
    }

    if (cb) {
      cb(new Error('No connection to broker'))
    }
  }
  /**
   * _setupPingTimer - start the keepalive timer if it is not running yet
   *
   * Does nothing when a timer already exists or `options.keepalive` is falsy.
   * Otherwise marks the last ping as answered and runs `_checkPing` every
   * `keepalive` seconds.
   *
   * @api private
   */
  MqttClient.prototype._setupPingTimer = function () {
    if (this.pingTimer || !this.options.keepalive) {
      return
    }

    var client = this
    this.pingResp = true
    this.pingTimer = reInterval(function () {
      client._checkPing()
    }, this.options.keepalive * 1000)
  }
  /**
   * _shiftPingInterval - push the next ping back by a full keepalive period
   *
   * Only acts when a ping timer exists and both `options.keepalive` and
   * `options.reschedulePings` are truthy.
   *
   * @api private
   */
  MqttClient.prototype._shiftPingInterval = function () {
    var opts = this.options
    var canReschedule = this.pingTimer && opts.keepalive && opts.reschedulePings

    if (!canReschedule) {
      return
    }
    this.pingTimer.reschedule(opts.keepalive * 1000)
  }
  /**
   * _checkPing - keepalive tick: verify the previous ping was answered,
   * then send the next `pingreq`
   *
   * If no `pingresp` arrived since the last tick the connection is torn
   * down with a forced `_cleanUp`.
   *
   * @api private
   */
  MqttClient.prototype._checkPing = function () {
    if (!this.pingResp) {
      // do a forced cleanup since socket will be in bad shape
      this._cleanUp(true)
      return
    }

    // Cleared here; `_handlePingresp` sets it again when the reply arrives.
    this.pingResp = false
    this._sendPacket({ cmd: 'pingreq' })
  }
  /**
   * _handlePingresp - record that the outstanding ping was answered
   *
   * `_checkPing` reads this flag on the next keepalive tick.
   *
   * @api private
   */
  MqttClient.prototype._handlePingresp = function () {
    this.pingResp = true
  }
  /**
   * _handleConnack - react to the broker's CONNACK
   *
   * Clears the connack timeout, copies selected CONNACK properties
   * (`topicAliasMaximum`, `serverKeepAlive`, `maximumPacketSize`) into
   * `this.options`, then either completes the connection (code 0) or emits
   * an 'error' whose `code` is the refusal code (code > 0).
   *
   * @param {Object} packet - the connack packet; the result code is read from
   *   `reasonCode` for protocol version 5 and from `returnCode` otherwise
   * @api private
   */
  MqttClient.prototype._handleConnack = function (packet) {
    var options = this.options
    var rc = options.protocolVersion === 5 ? packet.reasonCode : packet.returnCode
    var props = packet.properties

    clearTimeout(this.connackTimer)

    if (props) {
      if (props.topicAliasMaximum) {
        if (!options.properties) { options.properties = {} }
        options.properties.topicAliasMaximum = props.topicAliasMaximum
      }
      // The server keepalive only replaces ours when keepalive is enabled.
      if (props.serverKeepAlive && options.keepalive) {
        options.keepalive = props.serverKeepAlive
        this._shiftPingInterval()
      }
      if (props.maximumPacketSize) {
        if (!options.properties) { options.properties = {} }
        options.properties.maximumPacketSize = props.maximumPacketSize
      }
    }

    if (rc === 0) {
      this.reconnecting = false
      this._onConnect(packet)
      return
    }

    if (rc > 0) {
      var refusal = new Error('Connection refused: ' + errors[rc])
      refusal.code = rc
      this.emit('error', refusal)
    }
  }
  /**
   * _handlePublish - process an incoming PUBLISH according to its QoS
   *
   * - QoS 0: emit 'message' and pass the packet to `handleMessage`.
   * - QoS 1: ask `options.customHandleAcks` for a reason code; emit 'message'
   *   only when the code is 0, run `handleMessage`, then send `puback`.
   * - QoS 2: ask `options.customHandleAcks` for a reason code; a non-zero
   *   code is sent back in `pubrec` directly, otherwise the packet is put
   *   into `incomingStore` first and then `pubrec` is sent.
   * - Any other QoS value is ignored and `done` is not invoked.
   *
   * The `customHandleAcks` callback may be called as `(code)` or as
   * `(error, code)`; an Error or an unknown reason code is emitted as
   * 'error' and no acknowledgement is sent.
   *
   * TODO: an unknown QoS should be logged or reported rather than dropped.
   *
   * @param {Object} packet
   * @param {Function} [done] - called once the packet has been handled
   * @api private
   */
  MqttClient.prototype._handlePublish = function (packet, done) {
    done = typeof done !== 'undefined' ? done : nop
    var that = this
    var options = this.options
    var topic = packet.topic.toString()
    var message = packet.payload
    var qos = packet.qos
    var mid = packet.messageId
    var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]

    // Builds the callback handed to customHandleAcks: normalises the
    // (code) / (error, code) call forms, rejects errors and unknown codes,
    // and forwards an accepted code to `onCode`.
    function ackHandler (badCodeMessage, onCode) {
      return function (first, second) {
        var failure = first instanceof Error ? first : null
        var code = failure ? second : first

        if (failure) { return that.emit('error', failure) }
        if (validReasonCodes.indexOf(code) === -1) {
          return that.emit('error', new Error(badCodeMessage))
        }
        onCode(code)
      }
    }

    if (qos === 2) {
      options.customHandleAcks(topic, message, packet, ackHandler('Wrong reason code for pubrec', function (code) {
        if (code) {
          that._sendPacket({cmd: 'pubrec', messageId: mid, reasonCode: code}, done)
          return
        }
        that.incomingStore.put(packet, function () {
          that._sendPacket({cmd: 'pubrec', messageId: mid}, done)
        })
      }))
      return
    }

    if (qos === 1) {
      options.customHandleAcks(topic, message, packet, ackHandler('Wrong reason code for puback', function (code) {
        // emit the message event
        if (!code) { that.emit('message', topic, message, packet) }
        that.handleMessage(packet, function (err) {
          if (err) {
            return done && done(err)
          }
          that._sendPacket({cmd: 'puback', messageId: mid, reasonCode: code}, done)
        })
      }))
      return
    }

    if (qos === 0) {
      // emit the message event
      this.emit('message', topic, message, packet)
      this.handleMessage(packet, done)
    }
    // any other qos: do nothing
  }
  /**
   * Hook for processing incoming messages one at a time (backpressure).
   * The default implementation acknowledges immediately; override it and
   * invoke `callback` once the message has been dealt with.
   *
   * @param Packet packet the packet
   * @param Function callback call when finished
   * @api public
   */
  MqttClient.prototype.handleMessage = function (packet, callback) {
    callback()
  }
  /**
   * _handleAck - process an acknowledgement (puback, pubcomp, pubrec, suback
   * or unsuback) for a packet this client sent earlier
   *
   * The callback registered in `this.outgoing[messageId]` is looked up first;
   * acks without one are ignored.
   *
   * - puback / pubcomp: the in-flight entry and the stored packet are removed
   *   and the callback is invoked exactly once, with an Error carrying `code`
   *   when the broker refused the publish.
   * - pubrec: a refusal ends the QoS 2 flow, so it is cleaned up and reported
   *   the same way; otherwise the matching `pubrel` is sent.
   * - suback: topics of a subscribe with a failed grant are dropped from
   *   `_resubscribeTopics`, then the callback receives the packet.
   * - unsuback: the callback is invoked without arguments of interest.
   * - anything else emits an 'error'.
   *
   * While disconnecting, 'outgoingEmpty' is emitted once nothing is in flight.
   *
   * @param {Object} packet
   * @api private
   */
  MqttClient.prototype._handleAck = function (packet) {
    /* eslint no-fallthrough: "off" */
    var mid = packet.messageId
    var type = packet.cmd
    var entry = this.outgoing[mid]
    var cb = entry ? entry.cb : null
    var that = this

    if (!cb) {
      // Server sent an ack in error, ignore it.
      return
    }

    // A reason code is a refusal when it is set, positive and not 16
    // (16 is "No matching subscribers" in MQTT 5, which is still a success).
    function isRefusal (reasonCode) {
      return Boolean(reasonCode && reasonCode > 0 && reasonCode !== 16)
    }

    // Ends a refused publish: forget the in-flight entry, remove the stored
    // packet and only then report the error, so the callback fires once and
    // the packet cannot be replayed from the store on reconnect.
    function rejectPublish (reasonCode) {
      var err = new Error('Publish error: ' + errors[reasonCode])
      err.code = reasonCode
      delete that.outgoing[mid]
      that.outgoingStore.del(packet, function () {
        cb(err, packet)
      })
    }

    // Process
    switch (type) {
      case 'pubcomp':
        // same thing as puback for QoS 2
      case 'puback':
        if (isRefusal(packet.reasonCode)) {
          rejectPublish(packet.reasonCode)
        } else {
          // Callback - we're done
          delete this.outgoing[mid]
          this.outgoingStore.del(packet, cb)
        }
        break
      case 'pubrec':
        if (isRefusal(packet.reasonCode)) {
          rejectPublish(packet.reasonCode)
        } else {
          this._sendPacket({
            cmd: 'pubrel',
            qos: 2,
            messageId: mid
          })
        }
        break
      case 'suback':
        delete this.outgoing[mid]
        for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) {
          if ((packet.granted[grantedI] & 0x80) !== 0) {
            // suback with Failure status
            var topics = this.messageIdToTopic[mid]
            if (topics) {
              topics.forEach(function (topic) {
                delete that._resubscribeTopics[topic]
              })
            }
          }
        }
        cb(null, packet)
        break
      case 'unsuback':
        delete this.outgoing[mid]
        cb(null)
        break
      default:
        that.emit('error', new Error('unrecognized packet type'))
    }

    if (this.disconnecting &&
        Object.keys(this.outgoing).length === 0) {
      this.emit('outgoingEmpty')
    }
  }
  /**
   * _handlePubrel - finish the receiving side of a QoS 2 exchange
   *
   * Looks the original publish up in `incomingStore`. When found, it is
   * emitted as 'message' and passed to `handleMessage`; on success it is
   * removed from the store and `pubcomp` is sent. When the lookup fails,
   * `pubcomp` is sent right away.
   *
   * @param {Object} packet
   * @param {Function} [callback] - receives the `handleMessage` error, or is
   *   passed on to `_sendPacket` with the `pubcomp`
   * @api private
   */
  MqttClient.prototype._handlePubrel = function (packet, callback) {
    callback = typeof callback !== 'undefined' ? callback : nop
    var that = this
    var comp = {cmd: 'pubcomp', messageId: packet.messageId}

    function sendComp () {
      that._sendPacket(comp, callback)
    }

    that.incomingStore.get(packet, function (err, pub) {
      if (err) {
        sendComp()
        return
      }

      that.emit('message', pub.topic, pub.payload, pub)
      that.handleMessage(pub, function (handleErr) {
        if (handleErr) {
          return callback(handleErr)
        }
        that.incomingStore.del(pub, nop)
        sendComp()
      })
    })
  }
  /**
   * _handleDisconnect - forward a DISCONNECT received from the broker to
   * listeners as a 'disconnect' event
   *
   * @param {Object} packet
   * @api private
   */
  MqttClient.prototype._handleDisconnect = function (packet) {
    this.emit('disconnect', packet)
  }
  /**
   * _nextId - hand out the current message id and advance the counter
   *
   * Ids are 16 bit unsigned; after 65535 the counter wraps to 1.
   *
   * @return unsigned int
   */
  MqttClient.prototype._nextId = function () {
    // The id handed out is the counter value before the increment.
    var issued = this.nextId++
    var overflowed = this.nextId === 65536

    if (overflowed) {
      this.nextId = 1
    }
    return issued
  }
  /**
   * getLastMessageId - the id most recently returned by `_nextId`
   *
   * Accounts for the wrap from 65535 back to 1.
   *
   * @return unsigned int
   */
  MqttClient.prototype.getLastMessageId = function () {
    var upcoming = this.nextId

    if (upcoming === 1) {
      return 65535
    }
    return upcoming - 1
  }
  /**
   * _resubscribe - restore remembered subscriptions after a reconnect
   *
   * Runs only when this is not the first connection, there are remembered
   * topics, and either `options.clean` is set or (protocol version 5) the
   * connack reports no session. With `options.resubscribe` enabled the
   * topics are subscribed again - one `subscribe` call per topic for
   * version 5 so each keeps its own properties, a single call otherwise.
   * With it disabled the remembered topics are discarded.
   *
   * @param {Object} connack - the connack packet of the new connection
   * @api private
   */
  MqttClient.prototype._resubscribe = function (connack) {
    var opts = this.options
    var topicNames = Object.keys(this._resubscribeTopics)

    // Evaluated lazily, left to right: `connack` is not touched on the
    // first connection or when `clean` is set.
    var mustRestore = !this._firstConnection &&
      (opts.clean || (opts.protocolVersion === 5 && !connack.sessionPresent)) &&
      topicNames.length > 0

    if (mustRestore) {
      if (!opts.resubscribe) {
        this._resubscribeTopics = {}
      } else if (opts.protocolVersion === 5) {
        topicNames.forEach(function (name) {
          var single = {}
          single[name] = this._resubscribeTopics[name]
          single.resubscribe = true
          this.subscribe(single, {properties: single[name].properties})
        }, this)
      } else {
        this._resubscribeTopics.resubscribe = true
        this.subscribe(this._resubscribeTopics)
      }
    }

    this._firstConnection = false
  }
  /**
   * _onConnect - finish a successful connection
   *
   * If the client is flagged `disconnected`, 'connect' is emitted and nothing
   * else happens. Otherwise the ping timer is started, subscriptions are
   * restored, the client is marked connected and the packets held in
   * `outgoingStore` are re-sent one at a time: each packet is sent after the
   * previous one's callback fired. 'connect' is emitted when the store stream
   * ends and every id recorded in `_packetIdsDuringStoreProcessing` is
   * truthy; otherwise the store is streamed again.
   *
   * @param {Object} packet - the connack packet, passed on with 'connect'
   * @api private
   */
  MqttClient.prototype._onConnect = function (packet) {
    if (this.disconnected) {
      this.emit('connect', packet)
      return
    }

    var that = this

    this._setupPingTimer()
    this._resubscribe(packet)
    this.connected = true

    function startStreamProcess () {
      var outStore = that.outgoingStore.createStream()

      function clearStoreProcessing () {
        that._storeProcessing = false
        that._packetIdsDuringStoreProcessing = {}
      }

      // Stop replaying as soon as the connection closes.
      function remove () {
        outStore.destroy()
        outStore = null
        clearStoreProcessing()
      }

      function everyIdProcessed () {
        for (var id in that._packetIdsDuringStoreProcessing) {
          if (!that._packetIdsDuringStoreProcessing[id]) {
            return false
          }
        }
        return true
      }

      function storeDeliver () {
        // edge case, we wrapped this twice
        if (!outStore) {
          return
        }
        that._storeProcessing = true

        var stored = outStore.read(1)
        if (!stored) {
          // read when data is available in the future
          outStore.once('readable', storeDeliver)
          return
        }

        // Skip already processed store packets
        if (that._packetIdsDuringStoreProcessing[stored.messageId]) {
          storeDeliver()
          return
        }

        // Avoid unnecessary stream read operations when disconnected
        if (that.disconnecting || that.reconnectTimer) {
          if (outStore.destroy) {
            outStore.destroy()
          }
          return
        }

        var pending = that.outgoing[stored.messageId]
        var userCb = pending ? pending.cb : null
        that.outgoing[stored.messageId] = {
          volatile: false,
          cb: function (err, status) {
            // Ensure that the original callback passed in to publish gets invoked
            if (userCb) {
              userCb(err, status)
            }
            // Move on to the next stored packet only after this one is done.
            storeDeliver()
          }
        }
        that._packetIdsDuringStoreProcessing[stored.messageId] = true
        that._sendPacket(stored)
      }

      that.once('close', remove)

      outStore.on('error', function (err) {
        clearStoreProcessing()
        that.removeListener('close', remove)
        that.emit('error', err)
      })

      outStore.on('end', function () {
        if (!everyIdProcessed()) {
          startStreamProcess()
          return
        }
        clearStoreProcessing()
        that.removeListener('close', remove)
        that.emit('connect', packet)
      })

      storeDeliver()
    }

    // start flowing
    startStreamProcess()
  }
  1295. module.exports = MqttClient