1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129 |
- 'use strict'
- var mqtt = require('..')
- var should = require('should')
- var fork = require('child_process').fork
- var path = require('path')
- var abstractClientTests = require('./abstract_client')
- var net = require('net')
- var eos = require('end-of-stream')
- var mqttPacket = require('mqtt-packet')
- var Buffer = require('safe-buffer').Buffer
- var Duplex = require('readable-stream').Duplex
- var Connection = require('mqtt-connection')
- var Server = require('./server')
- var FastServer = require('./server').FastMqttServer
- var port = 9876
- var server
- function connOnlyServer () {
- return new Server(function (client) {
- client.on('connect', function (packet) {
- client.connack({returnCode: 0})
- })
- })
- }
- /**
- * Test server
- */
- function buildServer (fastFlag) {
- var handler = function (client) {
- client.on('auth', function (packet) {
- var rc = 'reasonCode'
- var connack = {}
- connack[rc] = 0
- client.connack(connack)
- })
- client.on('connect', function (packet) {
- var rc = 'returnCode'
- var connack = {}
- if (client.options && client.options.protocolVersion === 5) {
- rc = 'reasonCode'
- if (packet.clientId === 'invalid') {
- connack[rc] = 128
- } else {
- connack[rc] = 0
- }
- } else {
- if (packet.clientId === 'invalid') {
- connack[rc] = 2
- } else {
- connack[rc] = 0
- }
- }
- if (packet.properties && packet.properties.authenticationMethod) {
- return false
- } else {
- client.connack(connack)
- }
- })
- client.on('publish', function (packet) {
- setImmediate(function () {
- switch (packet.qos) {
- case 0:
- break
- case 1:
- client.puback(packet)
- break
- case 2:
- client.pubrec(packet)
- break
- }
- })
- })
- client.on('pubrel', function (packet) {
- client.pubcomp(packet)
- })
- client.on('pubrec', function (packet) {
- client.pubrel(packet)
- })
- client.on('pubcomp', function () {
- // Nothing to be done
- })
- client.on('subscribe', function (packet) {
- client.suback({
- messageId: packet.messageId,
- granted: packet.subscriptions.map(function (e) {
- return e.qos
- })
- })
- })
- client.on('unsubscribe', function (packet) {
- packet.granted = packet.unsubscriptions.map(function () { return 0 })
- client.unsuback(packet)
- })
- client.on('pingreq', function () {
- client.pingresp()
- })
- }
- if (fastFlag) {
- return new FastServer(handler)
- } else {
- return new Server(handler)
- }
- }
- server = buildServer().listen(port)
- describe('MqttClient', function () {
- describe('creating', function () {
- it('should allow instantiation of MqttClient without the \'new\' operator', function (done) {
- should(function () {
- var client
- try {
- client = mqtt.MqttClient(function () {
- throw Error('break')
- }, {})
- client.end()
- } catch (err) {
- if (err.message !== 'break') {
- throw err
- }
- done()
- }
- }).not.throw('Object #<Object> has no method \'_setupStream\'')
- })
- })
- var config = { protocol: 'mqtt', port: port }
- abstractClientTests(server, config)
- describe('message ids', function () {
- it('should increment the message id', function () {
- var client = mqtt.connect(config)
- var currentId = client._nextId()
- client._nextId().should.equal(currentId + 1)
- client.end()
- })
- it('should return 1 once the internal counter reached limit', function () {
- var client = mqtt.connect(config)
- client.nextId = 65535
- client._nextId().should.equal(65535)
- client._nextId().should.equal(1)
- client.end()
- })
- it('should return 65535 for last message id once the internal counter reached limit', function () {
- var client = mqtt.connect(config)
- client.nextId = 65535
- client._nextId().should.equal(65535)
- client.getLastMessageId().should.equal(65535)
- client._nextId().should.equal(1)
- client.getLastMessageId().should.equal(1)
- client.end()
- })
- it('should not throw an error if packet\'s messageId is not found when receiving a pubrel packet', function (done) {
- var server2 = new Server(function (c) {
- c.on('connect', function (packet) {
- c.connack({returnCode: 0})
- c.pubrel({ messageId: Math.floor(Math.random() * 9000) + 1000 })
- })
- })
- server2.listen(port + 49, function () {
- var client = mqtt.connect({
- port: port + 49,
- host: 'localhost'
- })
- client.on('packetsend', function (packet) {
- if (packet.cmd === 'pubcomp') {
- client.end()
- server2.close()
- done()
- }
- })
- })
- })
- it('should not go overflow if the TCP frame contains a lot of PUBLISH packets', function (done) {
- var parser = mqttPacket.parser()
- var count = 0
- var max = 1000
- var duplex = new Duplex({
- read: function (n) {},
- write: function (chunk, enc, cb) {
- parser.parse(chunk)
- cb() // nothing to do
- }
- })
- var client = new mqtt.MqttClient(function () {
- return duplex
- }, {})
- client.on('message', function (t, p, packet) {
- if (++count === max) {
- done()
- }
- })
- parser.on('packet', function (packet) {
- var packets = []
- if (packet.cmd === 'connect') {
- duplex.push(mqttPacket.generate({
- cmd: 'connack',
- sessionPresent: false,
- returnCode: 0
- }))
- for (var i = 0; i < max; i++) {
- packets.push(mqttPacket.generate({
- cmd: 'publish',
- topic: Buffer.from('hello'),
- payload: Buffer.from('world'),
- retain: false,
- dup: false,
- messageId: i + 1,
- qos: 1
- }))
- }
- duplex.push(Buffer.concat(packets))
- }
- })
- })
- })
- describe('flushing', function () {
- it('should attempt to complete pending unsub and send on ping timeout', function (done) {
- this.timeout(10000)
- var server3 = connOnlyServer().listen(port + 72)
- var pubCallbackCalled = false
- var unsubscribeCallbackCalled = false
- var client = mqtt.connect({
- port: port + 72,
- host: 'localhost',
- keepalive: 1,
- connectTimeout: 350,
- reconnectPeriod: 0
- })
- client.once('connect', () => {
- client.publish('fakeTopic', 'fakeMessage', {qos: 1}, (err, result) => {
- should.exist(err)
- pubCallbackCalled = true
- })
- client.unsubscribe('fakeTopic', (err, result) => {
- should.exist(err)
- unsubscribeCallbackCalled = true
- })
- setTimeout(() => {
- client.end(() => {
- should.equal(pubCallbackCalled && unsubscribeCallbackCalled, true, 'callbacks not invoked')
- server3.close()
- done()
- })
- }, 5000)
- })
- })
- })
- describe('reconnecting', function () {
- it('should attempt to reconnect once server is down', function (done) {
- this.timeout(15000)
- var innerServer = fork(path.join(__dirname, 'helpers', 'server_process.js'))
- var client = mqtt.connect({ port: 3000, host: 'localhost', keepalive: 1 })
- client.once('connect', function () {
- innerServer.kill('SIGINT') // mocks server shutdown
- client.once('close', function () {
- should.exist(client.reconnectTimer)
- client.end()
- done()
- })
- })
- })
- it('should reconnect to multiple host-ports-protocol combinations if servers is passed', function (done) {
- this.timeout(15000)
- var server = buildServer(true).listen(port + 41)
- var server2 = buildServer(true).listen(port + 42)
- server2.on('listening', function () {
- var client = mqtt.connect({
- protocol: 'wss',
- servers: [
- { port: port + 42, host: 'localhost', protocol: 'ws' },
- { port: port + 41, host: 'localhost' }
- ],
- keepalive: 50
- })
- server2.on('client', function (c) {
- should.equal(client.stream.socket.url, 'ws://localhost:9918/', 'Protocol for first connection should use ws.')
- c.stream.destroy()
- server2.close()
- })
- server.once('client', function () {
- should.equal(client.stream.socket.url, 'wss://localhost:9917/', 'Protocol for second client should use the default protocol: wss, on port: port + 42.')
- client.end()
- done()
- })
- client.once('connect', function () {
- client.stream.destroy()
- })
- })
- })
- it('should reconnect if a connack is not received in an interval', function (done) {
- this.timeout(2000)
- var server2 = net.createServer().listen(port + 43)
- server2.on('connection', function (c) {
- eos(c, function () {
- server2.close()
- })
- })
- server2.on('listening', function () {
- var client = mqtt.connect({
- servers: [
- { port: port + 43, host: 'localhost_fake' },
- { port: port, host: 'localhost' }
- ],
- connectTimeout: 500
- })
- server.once('client', function () {
- client.end()
- done()
- })
- client.once('connect', function () {
- client.stream.destroy()
- })
- })
- })
- it('should not be cleared by the connack timer', function (done) {
- this.timeout(4000)
- var server2 = net.createServer().listen(port + 44)
- server2.on('connection', function (c) {
- c.destroy()
- })
- server2.once('listening', function () {
- var reconnects = 0
- var connectTimeout = 1000
- var reconnectPeriod = 100
- var expectedReconnects = Math.floor(connectTimeout / reconnectPeriod)
- var client = mqtt.connect({
- port: port + 44,
- host: 'localhost',
- connectTimeout: connectTimeout,
- reconnectPeriod: reconnectPeriod
- })
- client.on('reconnect', function () {
- reconnects++
- if (reconnects >= expectedReconnects) {
- client.end()
- done()
- }
- })
- })
- })
- it('should not keep requeueing the first message when offline', function (done) {
- this.timeout(2500)
- var server2 = buildServer().listen(port + 45)
- var client = mqtt.connect({
- port: port + 45,
- host: 'localhost',
- connectTimeout: 350,
- reconnectPeriod: 300
- })
- server2.on('client', function (c) {
- client.publish('hello', 'world', { qos: 1 }, function () {
- c.destroy()
- server2.close()
- client.publish('hello', 'world', { qos: 1 })
- })
- })
- setTimeout(function () {
- if (client.queue.length === 0) {
- client.end(true)
- done()
- } else {
- client.end(true)
- }
- }, 2000)
- })
- it('should not send the same subscribe multiple times on a flaky connection', function (done) {
- this.timeout(3500)
- var KILL_COUNT = 4
- var killedConnections = 0
- var subIds = {}
- var client = mqtt.connect({
- port: port + 46,
- host: 'localhost',
- connectTimeout: 350,
- reconnectPeriod: 300
- })
- var server2 = new Server(function (client) {
- client.on('error', function () {})
- client.on('connect', function (packet) {
- if (packet.clientId === 'invalid') {
- client.connack({returnCode: 2})
- } else {
- client.connack({returnCode: 0})
- }
- })
- }).listen(port + 46)
- server2.on('client', function (c) {
- client.subscribe('topic', function () {
- done()
- client.end()
- c.destroy()
- server2.close()
- })
- c.on('subscribe', function (packet) {
- if (killedConnections < KILL_COUNT) {
- // Kill the first few sub attempts to simulate a flaky connection
- killedConnections++
- c.destroy()
- } else {
- // Keep track of acks
- if (!subIds[packet.messageId]) {
- subIds[packet.messageId] = 0
- }
- subIds[packet.messageId]++
- if (subIds[packet.messageId] > 1) {
- done(new Error('Multiple duplicate acked subscriptions received for messageId ' + packet.messageId))
- client.end(true)
- c.destroy()
- server2.destroy()
- }
- c.suback({
- messageId: packet.messageId,
- granted: packet.subscriptions.map(function (e) {
- return e.qos
- })
- })
- }
- })
- })
- })
- it('should not fill the queue of subscribes if it cannot connect', function (done) {
- this.timeout(2500)
- var port2 = port + 48
- var server2 = net.createServer(function (stream) {
- var client = new Connection(stream)
- client.on('error', function () {})
- client.on('connect', function (packet) {
- client.connack({returnCode: 0})
- client.destroy()
- })
- })
- server2.listen(port2, function () {
- var client = mqtt.connect({
- port: port2,
- host: 'localhost',
- connectTimeout: 350,
- reconnectPeriod: 300
- })
- client.subscribe('hello')
- setTimeout(function () {
- client.queue.length.should.equal(1)
- client.end()
- done()
- }, 1000)
- })
- })
- it('should not send the same publish multiple times on a flaky connection', function (done) {
- this.timeout(3500)
- var KILL_COUNT = 4
- var killedConnections = 0
- var pubIds = {}
- var client = mqtt.connect({
- port: port + 47,
- host: 'localhost',
- connectTimeout: 350,
- reconnectPeriod: 300
- })
- var server2 = net.createServer(function (stream) {
- var client = new Connection(stream)
- client.on('error', function () {})
- client.on('connect', function (packet) {
- if (packet.clientId === 'invalid') {
- client.connack({returnCode: 2})
- } else {
- client.connack({returnCode: 0})
- }
- })
- this.emit('client', client)
- }).listen(port + 47)
- server2.on('client', function (c) {
- client.publish('topic', 'data', { qos: 1 }, function () {
- done()
- client.end()
- c.destroy()
- server2.destroy()
- })
- c.on('publish', function onPublish (packet) {
- if (killedConnections < KILL_COUNT) {
- // Kill the first few pub attempts to simulate a flaky connection
- killedConnections++
- c.destroy()
- // to avoid receiving inflight messages
- c.removeListener('publish', onPublish)
- } else {
- // Keep track of acks
- if (!pubIds[packet.messageId]) {
- pubIds[packet.messageId] = 0
- }
- pubIds[packet.messageId]++
- if (pubIds[packet.messageId] > 1) {
- done(new Error('Multiple duplicate acked publishes received for messageId ' + packet.messageId))
- client.end(true)
- c.destroy()
- server2.destroy()
- }
- c.puback(packet)
- }
- })
- })
- })
- })
- it('check emit error on checkDisconnection w/o callback', function (done) {
- this.timeout(15000)
- var server118 = new Server(function (client) {
- client.on('connect', function (packet) {
- client.connack({
- reasonCode: 0
- })
- })
- client.on('publish', function (packet) {
- setImmediate(function () {
- packet.reasonCode = 0
- client.puback(packet)
- })
- })
- }).listen(port + 118)
- var opts = {
- host: 'localhost',
- port: port + 118,
- protocolVersion: 5
- }
- var client = mqtt.connect(opts)
- client.on('error', function (error) {
- should(error.message).be.equal('client disconnecting')
- server118.close()
- done()
- })
- client.on('connect', function () {
- client.end(function () {
- client._checkDisconnecting()
- })
- server118.close()
- })
- })
- describe('MQTT 5.0', function () {
- var server = buildServer().listen(port + 115)
- var config = { protocol: 'mqtt', port: port + 115, protocolVersion: 5, properties: { maximumPacketSize: 200 } }
- abstractClientTests(server, config)
- it('should has Auth method with Auth data', function (done) {
- this.timeout(5000)
- var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { authenticationData: Buffer.from([1, 2, 3, 4]) }}
- try {
- mqtt.connect(opts)
- } catch (error) {
- should(error.message).be.equal('Packet has no Authentication Method')
- }
- done()
- })
- it('auth packet', function (done) {
- this.timeout(15000)
- server.once('client', function (client) {
- client.on('auth', function (packet) {
- done()
- })
- })
- var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { authenticationMethod: 'json' }, authPacket: {}}
- mqtt.connect(opts)
- })
- it('Maximum Packet Size', function (done) {
- this.timeout(15000)
- var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { maximumPacketSize: 1 }}
- var client = mqtt.connect(opts)
- client.on('error', function (error) {
- should(error.message).be.equal('exceeding packets size connack')
- done()
- })
- })
- describe('Topic Alias', function () {
- it('topicAlias > topicAliasMaximum', function (done) {
- this.timeout(15000)
- var maximum = 15
- var current = 22
- server.once('client', function (client) {
- client.on('publish', function (packet) {
- if (packet.properties && packet.properties.topicAlias) {
- done(new Error('Packet should not have topicAlias'))
- return false
- }
- done()
- })
- })
- var opts = {host: 'localhost', port: port + 115, protocolVersion: 5, properties: { topicAliasMaximum: maximum }}
- var client = mqtt.connect(opts)
- client.publish('t/h', 'Message', { properties: { topicAlias: current } })
- })
- it('topicAlias w/o topicAliasMaximum in settings', function (done) {
- this.timeout(15000)
- server.once('client', function (client) {
- client.on('publish', function (packet) {
- if (packet.properties && packet.properties.topicAlias) {
- done(new Error('Packet should not have topicAlias'))
- return false
- }
- done()
- })
- })
- var opts = {host: 'localhost', port: port + 115, protocolVersion: 5}
- var client = mqtt.connect(opts)
- client.publish('t/h', 'Message', { properties: { topicAlias: 22 } })
- })
- })
- it('Change values of some properties by server response', function (done) {
- this.timeout(15000)
- var server116 = new Server(function (client) {
- client.on('connect', function (packet) {
- client.connack({
- reasonCode: 0,
- properties: {
- topicAliasMaximum: 15,
- serverKeepAlive: 16,
- maximumPacketSize: 95
- }
- })
- })
- }).listen(port + 116)
- var opts = {
- host: 'localhost',
- port: port + 116,
- protocolVersion: 5,
- properties: {
- topicAliasMaximum: 10,
- serverKeepAlive: 11,
- maximumPacketSize: 100
- }
- }
- var client = mqtt.connect(opts)
- client.on('connect', function () {
- should(client.options.keepalive).be.equal(16)
- should(client.options.properties.topicAliasMaximum).be.equal(15)
- should(client.options.properties.maximumPacketSize).be.equal(95)
- server116.close()
- done()
- })
- })
- it('should resubscribe when reconnecting with protocolVersion 5 and Session Present flag is false', function (done) {
- this.timeout(15000)
- var tryReconnect = true
- var reconnectEvent = false
- var server316 = new Server(function (client) {
- client.on('connect', function (packet) {
- client.connack({
- reasonCode: 0,
- sessionPresent: false
- })
- client.on('subscribe', function () {
- if (!tryReconnect) {
- client.end()
- server316.close()
- done()
- }
- })
- })
- }).listen(port + 316)
- var opts = {
- host: 'localhost',
- port: port + 316,
- protocolVersion: 5
- }
- var client = mqtt.connect(opts)
- client.on('reconnect', function () {
- reconnectEvent = true
- })
- client.on('connect', function (connack) {
- should(connack.sessionPresent).be.equal(false)
- if (tryReconnect) {
- client.subscribe('hello', function () {
- client.stream.end()
- })
- tryReconnect = false
- } else {
- reconnectEvent.should.equal(true)
- }
- })
- })
- it('should resubscribe when reconnecting with protocolVersion 5 and properties', function (done) {
- this.timeout(15000)
- var tryReconnect = true
- var reconnectEvent = false
- var server326 = new Server(function (client) {
- client.on('connect', function (packet) {
- client.on('subscribe', function (packet) {
- if (!reconnectEvent) {
- client.suback({
- messageId: packet.messageId,
- granted: packet.subscriptions.map(function (e) {
- return e.qos
- })
- })
- } else {
- if (!tryReconnect) {
- should(packet.properties.userProperties.test).be.equal('test')
- client.end()
- server326.close()
- done()
- }
- }
- })
- client.connack({
- reasonCode: 0,
- sessionPresent: false
- })
- })
- }).listen(port + 326)
- var opts = {
- host: 'localhost',
- port: port + 326,
- protocolVersion: 5
- }
- var client = mqtt.connect(opts)
- client.on('reconnect', function () {
- reconnectEvent = true
- })
- client.on('connect', function (connack) {
- should(connack.sessionPresent).be.equal(false)
- if (tryReconnect) {
- client.subscribe('hello', { properties: { userProperties: { test: 'test' } } }, function () {
- client.stream.end()
- })
- tryReconnect = false
- } else {
- reconnectEvent.should.equal(true)
- }
- })
- })
- var serverErr = new Server(function (client) {
- client.on('connect', function (packet) {
- client.connack({
- reasonCode: 0
- })
- })
- client.on('publish', function (packet) {
- setImmediate(function () {
- switch (packet.qos) {
- case 0:
- break
- case 1:
- packet.reasonCode = 142
- delete packet.cmd
- client.puback(packet)
- break
- case 2:
- packet.reasonCode = 142
- delete packet.cmd
- client.pubrec(packet)
- break
- }
- })
- })
- client.on('pubrel', function (packet) {
- packet.reasonCode = 142
- delete packet.cmd
- client.pubcomp(packet)
- })
- })
- it('Subscribe properties', function (done) {
- this.timeout(15000)
- var opts = {
- host: 'localhost',
- port: port + 119,
- protocolVersion: 5
- }
- var subOptions = { properties: { subscriptionIdentifier: 1234 } }
- var server119 = new Server(function (client) {
- client.on('connect', function (packet) {
- client.connack({
- reasonCode: 0
- })
- })
- client.on('subscribe', function (packet) {
- should(packet.properties.subscriptionIdentifier).be.equal(subOptions.properties.subscriptionIdentifier)
- server119.close()
- done()
- })
- }).listen(port + 119)
- var client = mqtt.connect(opts)
- client.on('connect', function () {
- client.subscribe('a/b', subOptions)
- })
- })
- it('puback handling errors check', function (done) {
- this.timeout(15000)
- serverErr.listen(port + 117)
- var opts = {
- host: 'localhost',
- port: port + 117,
- protocolVersion: 5
- }
- var client = mqtt.connect(opts)
- client.once('connect', () => {
- client.publish('a/b', 'message', {qos: 1}, function (err, packet) {
- should(err.message).be.equal('Publish error: Session taken over')
- should(err.code).be.equal(142)
- })
- serverErr.close()
- done()
- })
- })
- it('pubrec handling errors check', function (done) {
- this.timeout(15000)
- serverErr.listen(port + 118)
- var opts = {
- host: 'localhost',
- port: port + 118,
- protocolVersion: 5
- }
- var client = mqtt.connect(opts)
- client.once('connect', () => {
- client.publish('a/b', 'message', {qos: 2}, function (err, packet) {
- should(err.message).be.equal('Publish error: Session taken over')
- should(err.code).be.equal(142)
- })
- serverErr.close()
- done()
- })
- })
- it('puback handling custom reason code', function (done) {
- this.timeout(15000)
- serverErr.listen(port + 117)
- var opts = {
- host: 'localhost',
- port: port + 117,
- protocolVersion: 5,
- customHandleAcks: function (topic, message, packet, cb) {
- var code = 0
- if (topic === 'a/b') {
- code = 128
- }
- cb(code)
- }
- }
- serverErr.once('client', function (c) {
- c.once('subscribe', function () {
- c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
- })
- c.on('puback', function (packet) {
- should(packet.reasonCode).be.equal(128)
- client.end()
- c.destroy()
- serverErr.close()
- done()
- })
- })
- var client = mqtt.connect(opts)
- client.once('connect', function () {
- client.subscribe('a/b', {qos: 1})
- })
- })
- it('server side disconnect', function (done) {
- this.timeout(15000)
- var server327 = new Server(function (client) {
- client.on('connect', function (packet) {
- client.connack({
- reasonCode: 0
- })
- client.disconnect({reasonCode: 128})
- server327.close()
- })
- })
- server327.listen(port + 327)
- var opts = {
- host: 'localhost',
- port: port + 327,
- protocolVersion: 5
- }
- var client = mqtt.connect(opts)
- client.once('disconnect', function (disconnectPacket) {
- should(disconnectPacket.reasonCode).be.equal(128)
- done()
- })
- })
- it('pubrec handling custom reason code', function (done) {
- this.timeout(15000)
- serverErr.listen(port + 117)
- var opts = {
- host: 'localhost',
- port: port + 117,
- protocolVersion: 5,
- customHandleAcks: function (topic, message, packet, cb) {
- var code = 0
- if (topic === 'a/b') {
- code = 128
- }
- cb(code)
- }
- }
- serverErr.once('client', function (c) {
- c.once('subscribe', function () {
- c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
- })
- c.on('pubrec', function (packet) {
- should(packet.reasonCode).be.equal(128)
- client.end()
- c.destroy()
- serverErr.close()
- done()
- })
- })
- var client = mqtt.connect(opts)
- client.once('connect', function () {
- client.subscribe('a/b', {qos: 1})
- })
- })
- it('puback handling custom reason code with error', function (done) {
- this.timeout(15000)
- serverErr.listen(port + 117)
- var opts = {
- host: 'localhost',
- port: port + 117,
- protocolVersion: 5,
- customHandleAcks: function (topic, message, packet, cb) {
- var code = 0
- if (topic === 'a/b') {
- cb(new Error('a/b is not valid'))
- }
- cb(code)
- }
- }
- serverErr.once('client', function (c) {
- c.once('subscribe', function () {
- c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
- })
- })
- var client = mqtt.connect(opts)
- client.on('error', function (error) {
- should(error.message).be.equal('a/b is not valid')
- client.end()
- serverErr.close()
- done()
- })
- client.once('connect', function () {
- client.subscribe('a/b', {qos: 1})
- })
- })
- it('pubrec handling custom reason code with error', function (done) {
- this.timeout(15000)
- serverErr.listen(port + 117)
- var opts = {
- host: 'localhost',
- port: port + 117,
- protocolVersion: 5,
- customHandleAcks: function (topic, message, packet, cb) {
- var code = 0
- if (topic === 'a/b') {
- cb(new Error('a/b is not valid'))
- }
- cb(code)
- }
- }
- serverErr.once('client', function (c) {
- c.once('subscribe', function () {
- c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
- })
- })
- var client = mqtt.connect(opts)
- client.on('error', function (error) {
- should(error.message).be.equal('a/b is not valid')
- client.end()
- serverErr.close()
- done()
- })
- client.once('connect', function () {
- client.subscribe('a/b', {qos: 1})
- })
- })
- it('puback handling custom invalid reason code', function (done) {
- this.timeout(15000)
- serverErr.listen(port + 117)
- var opts = {
- host: 'localhost',
- port: port + 117,
- protocolVersion: 5,
- customHandleAcks: function (topic, message, packet, cb) {
- var code = 0
- if (topic === 'a/b') {
- code = 124124
- }
- cb(code)
- }
- }
- serverErr.once('client', function (c) {
- c.once('subscribe', function () {
- c.publish({ topic: 'a/b', payload: 'payload', qos: 1, messageId: 1 })
- })
- })
- var client = mqtt.connect(opts)
- client.on('error', function (error) {
- should(error.message).be.equal('Wrong reason code for puback')
- client.end()
- serverErr.close()
- done()
- })
- client.once('connect', function () {
- client.subscribe('a/b', {qos: 1})
- })
- })
- it('pubrec handling custom invalid reason code', function (done) {
- this.timeout(15000)
- serverErr.listen(port + 117)
- var opts = {
- host: 'localhost',
- port: port + 117,
- protocolVersion: 5,
- customHandleAcks: function (topic, message, packet, cb) {
- var code = 0
- if (topic === 'a/b') {
- code = 34535
- }
- cb(code)
- }
- }
- serverErr.once('client', function (c) {
- c.once('subscribe', function () {
- c.publish({ topic: 'a/b', payload: 'payload', qos: 2, messageId: 1 })
- })
- })
- var client = mqtt.connect(opts)
- client.on('error', function (error) {
- should(error.message).be.equal('Wrong reason code for pubrec')
- client.end()
- serverErr.close()
- done()
- })
- client.once('connect', function () {
- client.subscribe('a/b', {qos: 1})
- })
- })
- })
- })
|