client.js 32 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129
  1. 'use strict'
  2. var mqtt = require('..')
  3. var should = require('should')
  4. var fork = require('child_process').fork
  5. var path = require('path')
  6. var abstractClientTests = require('./abstract_client')
  7. var net = require('net')
  8. var eos = require('end-of-stream')
  9. var mqttPacket = require('mqtt-packet')
  10. var Buffer = require('safe-buffer').Buffer
  11. var Duplex = require('readable-stream').Duplex
  12. var Connection = require('mqtt-connection')
  13. var Server = require('./server')
  14. var FastServer = require('./server').FastMqttServer
  15. var port = 9876
  16. var server
  17. function connOnlyServer () {
  18. return new Server(function (client) {
  19. client.on('connect', function (packet) {
  20. client.connack({returnCode: 0})
  21. })
  22. })
  23. }
  24. /**
  25. * Test server
  26. */
  27. function buildServer (fastFlag) {
  28. var handler = function (client) {
  29. client.on('auth', function (packet) {
  30. var rc = 'reasonCode'
  31. var connack = {}
  32. connack[rc] = 0
  33. client.connack(connack)
  34. })
  35. client.on('connect', function (packet) {
  36. var rc = 'returnCode'
  37. var connack = {}
  38. if (client.options && client.options.protocolVersion === 5) {
  39. rc = 'reasonCode'
  40. if (packet.clientId === 'invalid') {
  41. connack[rc] = 128
  42. } else {
  43. connack[rc] = 0
  44. }
  45. } else {
  46. if (packet.clientId === 'invalid') {
  47. connack[rc] = 2
  48. } else {
  49. connack[rc] = 0
  50. }
  51. }
  52. if (packet.properties && packet.properties.authenticationMethod) {
  53. return false
  54. } else {
  55. client.connack(connack)
  56. }
  57. })
  58. client.on('publish', function (packet) {
  59. setImmediate(function () {
  60. switch (packet.qos) {
  61. case 0:
  62. break
  63. case 1:
  64. client.puback(packet)
  65. break
  66. case 2:
  67. client.pubrec(packet)
  68. break
  69. }
  70. })
  71. })
  72. client.on('pubrel', function (packet) {
  73. client.pubcomp(packet)
  74. })
  75. client.on('pubrec', function (packet) {
  76. client.pubrel(packet)
  77. })
  78. client.on('pubcomp', function () {
  79. // Nothing to be done
  80. })
  81. client.on('subscribe', function (packet) {
  82. client.suback({
  83. messageId: packet.messageId,
  84. granted: packet.subscriptions.map(function (e) {
  85. return e.qos
  86. })
  87. })
  88. })
  89. client.on('unsubscribe', function (packet) {
  90. packet.granted = packet.unsubscriptions.map(function () { return 0 })
  91. client.unsuback(packet)
  92. })
  93. client.on('pingreq', function () {
  94. client.pingresp()
  95. })
  96. }
  97. if (fastFlag) {
  98. return new FastServer(handler)
  99. } else {
  100. return new Server(handler)
  101. }
  102. }
  103. server = buildServer().listen(port)
  104. describe('MqttClient', function () {
  105. describe('creating', function () {
  106. it('should allow instantiation of MqttClient without the \'new\' operator', function (done) {
  107. should(function () {
  108. var client
  109. try {
  110. client = mqtt.MqttClient(function () {
  111. throw Error('break')
  112. }, {})
  113. client.end()
  114. } catch (err) {
  115. if (err.message !== 'break') {
  116. throw err
  117. }
  118. done()
  119. }
  120. }).not.throw('Object #<Object> has no method \'_setupStream\'')
  121. })
  122. })
  123. var config = { protocol: 'mqtt', port: port }
  124. abstractClientTests(server, config)
  125. describe('message ids', function () {
  126. it('should increment the message id', function () {
  127. var client = mqtt.connect(config)
  128. var currentId = client._nextId()
  129. client._nextId().should.equal(currentId + 1)
  130. client.end()
  131. })
  132. it('should return 1 once the internal counter reached limit', function () {
  133. var client = mqtt.connect(config)
  134. client.nextId = 65535
  135. client._nextId().should.equal(65535)
  136. client._nextId().should.equal(1)
  137. client.end()
  138. })
  139. it('should return 65535 for last message id once the internal counter reached limit', function () {
  140. var client = mqtt.connect(config)
  141. client.nextId = 65535
  142. client._nextId().should.equal(65535)
  143. client.getLastMessageId().should.equal(65535)
  144. client._nextId().should.equal(1)
  145. client.getLastMessageId().should.equal(1)
  146. client.end()
  147. })
  148. it('should not throw an error if packet\'s messageId is not found when receiving a pubrel packet', function (done) {
  149. var server2 = new Server(function (c) {
  150. c.on('connect', function (packet) {
  151. c.connack({returnCode: 0})
  152. c.pubrel({ messageId: Math.floor(Math.random() * 9000) + 1000 })
  153. })
  154. })
  155. server2.listen(port + 49, function () {
  156. var client = mqtt.connect({
  157. port: port + 49,
  158. host: 'localhost'
  159. })
  160. client.on('packetsend', function (packet) {
  161. if (packet.cmd === 'pubcomp') {
  162. client.end()
  163. server2.close()
  164. done()
  165. }
  166. })
  167. })
  168. })
  169. it('should not go overflow if the TCP frame contains a lot of PUBLISH packets', function (done) {
  170. var parser = mqttPacket.parser()
  171. var count = 0
  172. var max = 1000
  173. var duplex = new Duplex({
  174. read: function (n) {},
  175. write: function (chunk, enc, cb) {
  176. parser.parse(chunk)
  177. cb() // nothing to do
  178. }
  179. })
  180. var client = new mqtt.MqttClient(function () {
  181. return duplex
  182. }, {})
  183. client.on('message', function (t, p, packet) {
  184. if (++count === max) {
  185. done()
  186. }
  187. })
  188. parser.on('packet', function (packet) {
  189. var packets = []
  190. if (packet.cmd === 'connect') {
  191. duplex.push(mqttPacket.generate({
  192. cmd: 'connack',
  193. sessionPresent: false,
  194. returnCode: 0
  195. }))
  196. for (var i = 0; i < max; i++) {
  197. packets.push(mqttPacket.generate({
  198. cmd: 'publish',
  199. topic: Buffer.from('hello'),
  200. payload: Buffer.from('world'),
  201. retain: false,
  202. dup: false,
  203. messageId: i + 1,
  204. qos: 1
  205. }))
  206. }
  207. duplex.push(Buffer.concat(packets))
  208. }
  209. })
  210. })
  211. })
  212. describe('flushing', function () {
  213. it('should attempt to complete pending unsub and send on ping timeout', function (done) {
  214. this.timeout(10000)
  215. var server3 = connOnlyServer().listen(port + 72)
  216. var pubCallbackCalled = false
  217. var unsubscribeCallbackCalled = false
  218. var client = mqtt.connect({
  219. port: port + 72,
  220. host: 'localhost',
  221. keepalive: 1,
  222. connectTimeout: 350,
  223. reconnectPeriod: 0
  224. })
  225. client.once('connect', () => {
  226. client.publish('fakeTopic', 'fakeMessage', {qos: 1}, (err, result) => {
  227. should.exist(err)
  228. pubCallbackCalled = true
  229. })
  230. client.unsubscribe('fakeTopic', (err, result) => {
  231. should.exist(err)
  232. unsubscribeCallbackCalled = true
  233. })
  234. setTimeout(() => {
  235. client.end(() => {
  236. should.equal(pubCallbackCalled && unsubscribeCallbackCalled, true, 'callbacks not invoked')
  237. server3.close()
  238. done()
  239. })
  240. }, 5000)
  241. })
  242. })
  243. })
  244. describe('reconnecting', function () {
  245. it('should attempt to reconnect once server is down', function (done) {
  246. this.timeout(15000)
  247. var innerServer = fork(path.join(__dirname, 'helpers', 'server_process.js'))
  248. var client = mqtt.connect({ port: 3000, host: 'localhost', keepalive: 1 })
  249. client.once('connect', function () {
  250. innerServer.kill('SIGINT') // mocks server shutdown
  251. client.once('close', function () {
  252. should.exist(client.reconnectTimer)
  253. client.end()
  254. done()
  255. })
  256. })
  257. })
  258. it('should reconnect to multiple host-ports-protocol combinations if servers is passed', function (done) {
  259. this.timeout(15000)
  260. var server = buildServer(true).listen(port + 41)
  261. var server2 = buildServer(true).listen(port + 42)
  262. server2.on('listening', function () {
  263. var client = mqtt.connect({
  264. protocol: 'wss',
  265. servers: [
  266. { port: port + 42, host: 'localhost', protocol: 'ws' },
  267. { port: port + 41, host: 'localhost' }
  268. ],
  269. keepalive: 50
  270. })
  271. server2.on('client', function (c) {
  272. should.equal(client.stream.socket.url, 'ws://localhost:9918/', 'Protocol for first connection should use ws.')
  273. c.stream.destroy()
  274. server2.close()
  275. })
  276. server.once('client', function () {
  277. should.equal(client.stream.socket.url, 'wss://localhost:9917/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.')
  278. client.end()
  279. done()
  280. })
  281. client.once('connect', function () {
  282. client.stream.destroy()
  283. })
  284. })
  285. })
  286. it('should reconnect if a connack is not received in an interval', function (done) {
  287. this.timeout(2000)
  288. var server2 = net.createServer().listen(port + 43)
  289. server2.on('connection', function (c) {
  290. eos(c, function () {
  291. server2.close()
  292. })
  293. })
  294. server2.on('listening', function () {
  295. var client = mqtt.connect({
  296. servers: [
  297. { port: port + 43, host: 'localhost_fake' },
  298. { port: port, host: 'localhost' }
  299. ],
  300. connectTimeout: 500
  301. })
  302. server.once('client', function () {
  303. client.end()
  304. done()
  305. })
  306. client.once('connect', function () {
  307. client.stream.destroy()
  308. })
  309. })
  310. })
  311. it('should not be cleared by the connack timer', function (done) {
  312. this.timeout(4000)
  313. var server2 = net.createServer().listen(port + 44)
  314. server2.on('connection', function (c) {
  315. c.destroy()
  316. })
  317. server2.once('listening', function () {
  318. var reconnects = 0
  319. var connectTimeout = 1000
  320. var reconnectPeriod = 100
  321. var expectedReconnects = Math.floor(connectTimeout / reconnectPeriod)
  322. var client = mqtt.connect({
  323. port: port + 44,
  324. host: 'localhost',
  325. connectTimeout: connectTimeout,
  326. reconnectPeriod: reconnectPeriod
  327. })
  328. client.on('reconnect', function () {
  329. reconnects++
  330. if (reconnects >= expectedReconnects) {
  331. client.end()
  332. done()
  333. }
  334. })
  335. })
  336. })
  337. it('should not keep requeueing the first message when offline', function (done) {
  338. this.timeout(2500)
  339. var server2 = buildServer().listen(port + 45)
  340. var client = mqtt.connect({
  341. port: port + 45,
  342. host: 'localhost',
  343. connectTimeout: 350,
  344. reconnectPeriod: 300
  345. })
  346. server2.on('client', function (c) {
  347. client.publish('hello', 'world', { qos: 1 }, function () {
  348. c.destroy()
  349. server2.close()
  350. client.publish('hello', 'world', { qos: 1 })
  351. })
  352. })
  353. setTimeout(function () {
  354. if (client.queue.length === 0) {
  355. client.end(true)
  356. done()
  357. } else {
  358. client.end(true)
  359. }
  360. }, 2000)
  361. })
  362. it('should not send the same subscribe multiple times on a flaky connection', function (done) {
  363. this.timeout(3500)
  364. var KILL_COUNT = 4
  365. var killedConnections = 0
  366. var subIds = {}
  367. var client = mqtt.connect({
  368. port: port + 46,
  369. host: 'localhost',
  370. connectTimeout: 350,
  371. reconnectPeriod: 300
  372. })
  373. var server2 = new Server(function (client) {
  374. client.on('error', function () {})
  375. client.on('connect', function (packet) {
  376. if (packet.clientId === 'invalid') {
  377. client.connack({returnCode: 2})
  378. } else {
  379. client.connack({returnCode: 0})
  380. }
  381. })
  382. }).listen(port + 46)
  383. server2.on('client', function (c) {
  384. client.subscribe('topic', function () {
  385. done()
  386. client.end()
  387. c.destroy()
  388. server2.close()
  389. })
  390. c.on('subscribe', function (packet) {
  391. if (killedConnections < KILL_COUNT) {
  392. // Kill the first few sub attempts to simulate a flaky connection
  393. killedConnections++
  394. c.destroy()
  395. } else {
  396. // Keep track of acks
  397. if (!subIds[packet.messageId]) {
  398. subIds[packet.messageId] = 0
  399. }
  400. subIds[packet.messageId]++
  401. if (subIds[packet.messageId] > 1) {
  402. done(new Error('Multiple duplicate acked subscriptions received for messageId ' + packet.messageId))
  403. client.end(true)
  404. c.destroy()
  405. server2.destroy()
  406. }
  407. c.suback({
  408. messageId: packet.messageId,
  409. granted: packet.subscriptions.map(function (e) {
  410. return e.qos
  411. })
  412. })
  413. }
  414. })
  415. })
  416. })
  417. it('should not fill the queue of subscribes if it cannot connect', function (done) {
  418. this.timeout(2500)
  419. var port2 = port + 48
  420. var server2 = net.createServer(function (stream) {
  421. var client = new Connection(stream)
  422. client.on('error', function () {})
  423. client.on('connect', function (packet) {
  424. client.connack({returnCode: 0})
  425. client.destroy()
  426. })
  427. })
  428. server2.listen(port2, function () {
  429. var client = mqtt.connect({
  430. port: port2,
  431. host: 'localhost',
  432. connectTimeout: 350,
  433. reconnectPeriod: 300
  434. })
  435. client.subscribe('hello')
  436. setTimeout(function () {
  437. client.queue.length.should.equal(1)
  438. client.end()
  439. done()
  440. }, 1000)
  441. })
  442. })
  443. it('should not send the same publish multiple times on a flaky connection', function (done) {
  444. this.timeout(3500)
  445. var KILL_COUNT = 4
  446. var killedConnections = 0
  447. var pubIds = {}
  448. var client = mqtt.connect({
  449. port: port + 47,
  450. host: 'localhost',
  451. connectTimeout: 350,
  452. reconnectPeriod: 300
  453. })
  454. var server2 = net.createServer(function (stream) {
  455. var client = new Connection(stream)
  456. client.on('error', function () {})
  457. client.on('connect', function (packet) {
  458. if (packet.clientId === 'invalid') {
  459. client.connack({returnCode: 2})
  460. } else {
  461. client.connack({returnCode: 0})
  462. }
  463. })
  464. this.emit('client', client)
  465. }).listen(port + 47)
  466. server2.on('client', function (c) {
  467. client.publish('topic', 'data', { qos: 1 }, function () {
  468. done()
  469. client.end()
  470. c.destroy()
  471. server2.destroy()
  472. })
  473. c.on('publish', function onPublish (packet) {
  474. if (killedConnections < KILL_COUNT) {
  475. // Kill the first few pub attempts to simulate a flaky connection
  476. killedConnections++
  477. c.destroy()
  478. // to avoid receiving inflight messages
  479. c.removeListener('publish', onPublish)
  480. } else {
  481. // Keep track of acks
  482. if (!pubIds[packet.messageId]) {
  483. pubIds[packet.messageId] = 0
  484. }
  485. pubIds[packet.messageId]++
  486. if (pubIds[packet.messageId] > 1) {
  487. done(new Error('Multiple duplicate acked publishes received for messageId ' + packet.messageId))
  488. client.end(true)
  489. c.destroy()
  490. server2.destroy()
  491. }
  492. c.puback(packet)
  493. }
  494. })
  495. })
  496. })
  497. })
  498. it('check emit error on checkDisconnection w/o callback', function (done) {
  499. this.timeout(15000)
  500. var server118 = new Server(function (client) {
  501. client.on('connect', function (packet) {
  502. client.connack({
  503. reasonCode: 0
  504. })
  505. })
  506. client.on('publish', function (packet) {
  507. setImmediate(function () {
  508. packet.reasonCode = 0
  509. client.puback(packet)
  510. })
  511. })
  512. }).listen(port + 118)
  513. var opts = {
  514. host: 'localhost',
  515. port: port + 118,
  516. protocolVersion: 5
  517. }
  518. var client = mqtt.connect(opts)
  519. client.on('error', function (error) {
  520. should(error.message).be.equal('client disconnecting')
  521. server118.close()
  522. done()
  523. })
  524. client.on('connect', function () {
  525. client.end(function () {
  526. client._checkDisconnecting()
  527. })
  528. server118.close()
  529. })
  530. })
  531. describe('MQTT 5.0', function () {
  532. var server = buildServer().listen(port + 115)
  533. var config = { protocol: 'mqtt', port: port + 115, protocolVersion: 5, properties: { maximumPacketSize: 200 } }
  534. abstractClientTests(server, config)
  535. it('should has Auth method with Auth data', function (done) {
  536. this.timeout(5000)
  537. var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { authenticationData: Buffer.from([1, 2, 3, 4]) }}
  538. try {
  539. mqtt.connect(opts)
  540. } catch (error) {
  541. should(error.message).be.equal('Packet has no Authentication Method')
  542. }
  543. done()
  544. })
  545. it('auth packet', function (done) {
  546. this.timeout(15000)
  547. server.once('client', function (client) {
  548. client.on('auth', function (packet) {
  549. done()
  550. })
  551. })
  552. var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { authenticationMethod: 'json' }, authPacket: {}}
  553. mqtt.connect(opts)
  554. })
  555. it('Maximum Packet Size', function (done) {
  556. this.timeout(15000)
  557. var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { maximumPacketSize: 1 }}
  558. var client = mqtt.connect(opts)
  559. client.on('error', function (error) {
  560. should(error.message).be.equal('exceeding packets size connack')
  561. done()
  562. })
  563. })
  564. describe('Topic Alias', function () {
  565. it('topicAlias > topicAliasMaximum', function (done) {
  566. this.timeout(15000)
  567. var maximum = 15
  568. var current = 22
  569. server.once('client', function (client) {
  570. client.on('publish', function (packet) {
  571. if (packet.properties && packet.properties.topicAlias) {
  572. done(new Error('Packet should not have topicAlias'))
  573. return false
  574. }
  575. done()
  576. })
  577. })
  578. var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { topicAliasMaximum: maximum }}
  579. var client = mqtt.connect(opts)
  580. client.publish('t/h', 'Message', { properties: { topicAlias: current } })
  581. })
  582. it('topicAlias w/o topicAliasMaximum in settings', function (done) {
  583. this.timeout(15000)
  584. server.once('client', function (client) {
  585. client.on('publish', function (packet) {
  586. if (packet.properties && packet.properties.topicAlias) {
  587. done(new Error('Packet should not have topicAlias'))
  588. return false
  589. }
  590. done()
  591. })
  592. })
  593. var opts = {host: 'localhost', port: port + 115, protocolVersion: 5}
  594. var client = mqtt.connect(opts)
  595. client.publish('t/h', 'Message', { properties: { topicAlias: 22 } })
  596. })
  597. })
  598. it('Change values of some properties by server response', function (done) {
  599. this.timeout(15000)
  600. var server116 = new Server(function (client) {
  601. client.on('connect', function (packet) {
  602. client.connack({
  603. reasonCode: 0,
  604. properties: {
  605. topicAliasMaximum: 15,
  606. serverKeepAlive: 16,
  607. maximumPacketSize: 95
  608. }
  609. })
  610. })
  611. }).listen(port + 116)
  612. var opts = {
  613. host: 'localhost',
  614. port: port + 116,
  615. protocolVersion: 5,
  616. properties: {
  617. topicAliasMaximum: 10,
  618. serverKeepAlive: 11,
  619. maximumPacketSize: 100
  620. }
  621. }
  622. var client = mqtt.connect(opts)
  623. client.on('connect', function () {
  624. should(client.options.keepalive).be.equal(16)
  625. should(client.options.properties.topicAliasMaximum).be.equal(15)
  626. should(client.options.properties.maximumPacketSize).be.equal(95)
  627. server116.close()
  628. done()
  629. })
  630. })
  631. it('should resubscribe when reconnecting with protocolVersion 5 and Session Present flag is false', function (done) {
  632. this.timeout(15000)
  633. var tryReconnect = true
  634. var reconnectEvent = false
  635. var server316 = new Server(function (client) {
  636. client.on('connect', function (packet) {
  637. client.connack({
  638. reasonCode: 0,
  639. sessionPresent: false
  640. })
  641. client.on('subscribe', function () {
  642. if (!tryReconnect) {
  643. client.end()
  644. server316.close()
  645. done()
  646. }
  647. })
  648. })
  649. }).listen(port + 316)
  650. var opts = {
  651. host: 'localhost',
  652. port: port + 316,
  653. protocolVersion: 5
  654. }
  655. var client = mqtt.connect(opts)
  656. client.on('reconnect', function () {
  657. reconnectEvent = true
  658. })
  659. client.on('connect', function (connack) {
  660. should(connack.sessionPresent).be.equal(false)
  661. if (tryReconnect) {
  662. client.subscribe('hello', function () {
  663. client.stream.end()
  664. })
  665. tryReconnect = false
  666. } else {
  667. reconnectEvent.should.equal(true)
  668. }
  669. })
  670. })
  671. it('should resubscribe when reconnecting with protocolVersion 5 and properties', function (done) {
  672. this.timeout(15000)
  673. var tryReconnect = true
  674. var reconnectEvent = false
  675. var server326 = new Server(function (client) {
  676. client.on('connect', function (packet) {
  677. client.on('subscribe', function (packet) {
  678. if (!reconnectEvent) {
  679. client.suback({
  680. messageId: packet.messageId,
  681. granted: packet.subscriptions.map(function (e) {
  682. return e.qos
  683. })
  684. })
  685. } else {
  686. if (!tryReconnect) {
  687. should(packet.properties.userProperties.test).be.equal('test')
  688. client.end()
  689. server326.close()
  690. done()
  691. }
  692. }
  693. })
  694. client.connack({
  695. reasonCode: 0,
  696. sessionPresent: false
  697. })
  698. })
  699. }).listen(port + 326)
  700. var opts = {
  701. host: 'localhost',
  702. port: port + 326,
  703. protocolVersion: 5
  704. }
  705. var client = mqtt.connect(opts)
  706. client.on('reconnect', function () {
  707. reconnectEvent = true
  708. })
  709. client.on('connect', function (connack) {
  710. should(connack.sessionPresent).be.equal(false)
  711. if (tryReconnect) {
  712. client.subscribe('hello', { properties: { userProperties: { test: 'test' } } }, function () {
  713. client.stream.end()
  714. })
  715. tryReconnect = false
  716. } else {
  717. reconnectEvent.should.equal(true)
  718. }
  719. })
  720. })
  721. var serverErr = new Server(function (client) {
  722. client.on('connect', function (packet) {
  723. client.connack({
  724. reasonCode: 0
  725. })
  726. })
  727. client.on('publish', function (packet) {
  728. setImmediate(function () {
  729. switch (packet.qos) {
  730. case 0:
  731. break
  732. case 1:
  733. packet.reasonCode = 142
  734. delete packet.cmd
  735. client.puback(packet)
  736. break
  737. case 2:
  738. packet.reasonCode = 142
  739. delete packet.cmd
  740. client.pubrec(packet)
  741. break
  742. }
  743. })
  744. })
  745. client.on('pubrel', function (packet) {
  746. packet.reasonCode = 142
  747. delete packet.cmd
  748. client.pubcomp(packet)
  749. })
  750. })
  751. it('Subscribe properties', function (done) {
  752. this.timeout(15000)
  753. var opts = {
  754. host: 'localhost',
  755. port: port + 119,
  756. protocolVersion: 5
  757. }
  758. var subOptions = { properties: { subscriptionIdentifier: 1234 } }
  759. var server119 = new Server(function (client) {
  760. client.on('connect', function (packet) {
  761. client.connack({
  762. reasonCode: 0
  763. })
  764. })
  765. client.on('subscribe', function (packet) {
  766. should(packet.properties.subscriptionIdentifier).be.equal(subOptions.properties.subscriptionIdentifier)
  767. server119.close()
  768. done()
  769. })
  770. }).listen(port + 119)
  771. var client = mqtt.connect(opts)
  772. client.on('connect', function () {
  773. client.subscribe('a/b', subOptions)
  774. })
  775. })
  776. it('puback handling errors check', function (done) {
  777. this.timeout(15000)
  778. serverErr.listen(port + 117)
  779. var opts = {
  780. host: 'localhost',
  781. port: port + 117,
  782. protocolVersion: 5
  783. }
  784. var client = mqtt.connect(opts)
  785. client.once('connect', () => {
  786. client.publish('a/b', 'message', {qos: 1}, function (err, packet) {
  787. should(err.message).be.equal('Publish error: Session taken over')
  788. should(err.code).be.equal(142)
  789. })
  790. serverErr.close()
  791. done()
  792. })
  793. })
  794. it('pubrec handling errors check', function (done) {
  795. this.timeout(15000)
  796. serverErr.listen(port + 118)
  797. var opts = {
  798. host: 'localhost',
  799. port: port + 118,
  800. protocolVersion: 5
  801. }
  802. var client = mqtt.connect(opts)
  803. client.once('connect', () => {
  804. client.publish('a/b', 'message', {qos: 2}, function (err, packet) {
  805. should(err.message).be.equal('Publish error: Session taken over')
  806. should(err.code).be.equal(142)
  807. })
  808. serverErr.close()
  809. done()
  810. })
  811. })
  812. it('puback handling custom reason code', function (done) {
  813. this.timeout(15000)
  814. serverErr.listen(port + 117)
  815. var opts = {
  816. host: 'localhost',
  817. port: port + 117,
  818. protocolVersion: 5,
  819. customHandleAcks: function (topic, message, packet, cb) {
  820. var code = 0
  821. if (topic === 'a/b') {
  822. code = 128
  823. }
  824. cb(code)
  825. }
  826. }
  827. serverErr.once('client', function (c) {
  828. c.once('subscribe', function () {
  829. c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
  830. })
  831. c.on('puback', function (packet) {
  832. should(packet.reasonCode).be.equal(128)
  833. client.end()
  834. c.destroy()
  835. serverErr.close()
  836. done()
  837. })
  838. })
  839. var client = mqtt.connect(opts)
  840. client.once('connect', function () {
  841. client.subscribe('a/b', {qos: 1})
  842. })
  843. })
  844. it('server side disconnect', function (done) {
  845. this.timeout(15000)
  846. var server327 = new Server(function (client) {
  847. client.on('connect', function (packet) {
  848. client.connack({
  849. reasonCode: 0
  850. })
  851. client.disconnect({reasonCode: 128})
  852. server327.close()
  853. })
  854. })
  855. server327.listen(port + 327)
  856. var opts = {
  857. host: 'localhost',
  858. port: port + 327,
  859. protocolVersion: 5
  860. }
  861. var client = mqtt.connect(opts)
  862. client.once('disconnect', function (disconnectPacket) {
  863. should(disconnectPacket.reasonCode).be.equal(128)
  864. done()
  865. })
  866. })
  867. it('pubrec handling custom reason code', function (done) {
  868. this.timeout(15000)
  869. serverErr.listen(port + 117)
  870. var opts = {
  871. host: 'localhost',
  872. port: port + 117,
  873. protocolVersion: 5,
  874. customHandleAcks: function (topic, message, packet, cb) {
  875. var code = 0
  876. if (topic === 'a/b') {
  877. code = 128
  878. }
  879. cb(code)
  880. }
  881. }
  882. serverErr.once('client', function (c) {
  883. c.once('subscribe', function () {
  884. c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
  885. })
  886. c.on('pubrec', function (packet) {
  887. should(packet.reasonCode).be.equal(128)
  888. client.end()
  889. c.destroy()
  890. serverErr.close()
  891. done()
  892. })
  893. })
  894. var client = mqtt.connect(opts)
  895. client.once('connect', function () {
  896. client.subscribe('a/b', {qos: 1})
  897. })
  898. })
  899. it('puback handling custom reason code with error', function (done) {
  900. this.timeout(15000)
  901. serverErr.listen(port + 117)
  902. var opts = {
  903. host: 'localhost',
  904. port: port + 117,
  905. protocolVersion: 5,
  906. customHandleAcks: function (topic, message, packet, cb) {
  907. var code = 0
  908. if (topic === 'a/b') {
  909. cb(new Error('a/b is not valid'))
  910. }
  911. cb(code)
  912. }
  913. }
  914. serverErr.once('client', function (c) {
  915. c.once('subscribe', function () {
  916. c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
  917. })
  918. })
  919. var client = mqtt.connect(opts)
  920. client.on('error', function (error) {
  921. should(error.message).be.equal('a/b is not valid')
  922. client.end()
  923. serverErr.close()
  924. done()
  925. })
  926. client.once('connect', function () {
  927. client.subscribe('a/b', {qos: 1})
  928. })
  929. })
  930. it('pubrec handling custom reason code with error', function (done) {
  931. this.timeout(15000)
  932. serverErr.listen(port + 117)
  933. var opts = {
  934. host: 'localhost',
  935. port: port + 117,
  936. protocolVersion: 5,
  937. customHandleAcks: function (topic, message, packet, cb) {
  938. var code = 0
  939. if (topic === 'a/b') {
  940. cb(new Error('a/b is not valid'))
  941. }
  942. cb(code)
  943. }
  944. }
  945. serverErr.once('client', function (c) {
  946. c.once('subscribe', function () {
  947. c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
  948. })
  949. })
  950. var client = mqtt.connect(opts)
  951. client.on('error', function (error) {
  952. should(error.message).be.equal('a/b is not valid')
  953. client.end()
  954. serverErr.close()
  955. done()
  956. })
  957. client.once('connect', function () {
  958. client.subscribe('a/b', {qos: 1})
  959. })
  960. })
  961. it('puback handling custom invalid reason code', function (done) {
  962. this.timeout(15000)
  963. serverErr.listen(port + 117)
  964. var opts = {
  965. host: 'localhost',
  966. port: port + 117,
  967. protocolVersion: 5,
  968. customHandleAcks: function (topic, message, packet, cb) {
  969. var code = 0
  970. if (topic === 'a/b') {
  971. code = 124124
  972. }
  973. cb(code)
  974. }
  975. }
  976. serverErr.once('client', function (c) {
  977. c.once('subscribe', function () {
  978. c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
  979. })
  980. })
  981. var client = mqtt.connect(opts)
  982. client.on('error', function (error) {
  983. should(error.message).be.equal('Wrong reason code for puback')
  984. client.end()
  985. serverErr.close()
  986. done()
  987. })
  988. client.once('connect', function () {
  989. client.subscribe('a/b', {qos: 1})
  990. })
  991. })
  992. it('pubrec handling custom invalid reason code', function (done) {
  993. this.timeout(15000)
  994. serverErr.listen(port + 117)
  995. var opts = {
  996. host: 'localhost',
  997. port: port + 117,
  998. protocolVersion: 5,
  999. customHandleAcks: function (topic, message, packet, cb) {
  1000. var code = 0
  1001. if (topic === 'a/b') {
  1002. code = 34535
  1003. }
  1004. cb(code)
  1005. }
  1006. }
  1007. serverErr.once('client', function (c) {
  1008. c.once('subscribe', function () {
  1009. c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
  1010. })
  1011. })
  1012. var client = mqtt.connect(opts)
  1013. client.on('error', function (error) {
  1014. should(error.message).be.equal('Wrong reason code for pubrec')
  1015. client.end()
  1016. serverErr.close()
  1017. done()
  1018. })
  1019. client.once('connect', function () {
  1020. client.subscribe('a/b', {qos: 1})
  1021. })
  1022. })
  1023. })
  1024. })