123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084 |
- 'use strict'
- /**
- * Testing dependencies
- */
- var should = require('should')
- var sinon = require('sinon')
- var mqtt = require('../')
- var xtend = require('xtend')
- var Server = require('./server')
- var Store = require('./../lib/store')
- var port = 9876
- module.exports = function (server, config) {
- var version = config.protocolVersion || 4
- function connect (opts) {
- opts = xtend(config, opts)
- return mqtt.connect(opts)
- }
- describe('closing', function () {
- it('should emit close if stream closes', function (done) {
- var client = connect()
- client.once('connect', function () {
- client.stream.end()
- })
- client.once('close', function () {
- client.end()
- done()
- })
- })
- it('should mark the client as disconnected', function (done) {
- var client = connect()
- client.once('close', function () {
- client.end()
- if (!client.connected) {
- done()
- } else {
- done(new Error('Not marked as disconnected'))
- }
- })
- client.once('connect', function () {
- client.stream.end()
- })
- })
- it('should stop ping timer if stream closes', function (done) {
- var client = connect()
- client.once('close', function () {
- should.not.exist(client.pingTimer)
- client.end()
- done()
- })
- client.once('connect', function () {
- should.exist(client.pingTimer)
- client.stream.end()
- })
- })
- it('should emit close after end called', function (done) {
- var client = connect()
- client.once('close', function () {
- done()
- })
- client.once('connect', function () {
- client.end()
- })
- })
- it('should emit end after end called and client must be disconnected', function (done) {
- var client = connect()
- client.once('end', function () {
- if (client.disconnected) {
- return done()
- }
- done(new Error('client must be disconnected'))
- })
- client.once('connect', function () {
- client.end()
- })
- })
- it('should pass store close error to end callback but not to end listeners', function (done) {
- var store = new Store()
- var client = connect({outgoingStore: store})
- store.close = function (cb) {
- cb(new Error('test'))
- }
- client.once('end', function () {
- if (arguments.length === 0) {
- return done()
- }
- throw new Error('no argument shoould be passed to event')
- })
- client.once('connect', function () {
- client.end(function (test) {
- if (test && test.message === 'test') {
- return
- }
- throw new Error('bad argument passed to callback')
- })
- })
- })
- it('should return `this` if end called twice', function (done) {
- var client = connect()
- client.once('connect', function () {
- client.end()
- var value = client.end()
- if (value === client) {
- done()
- } else {
- done(new Error('Not returning client.'))
- }
- })
- })
- it('should emit end only on first client end', function (done) {
- var client = connect()
- client.once('end', function () {
- var timeout = setTimeout(done.bind(null), 200)
- client.once('end', function () {
- clearTimeout(timeout)
- done(new Error('end was emitted twice'))
- })
- client.end()
- })
- client.once('connect', client.end.bind(client))
- })
- it('should stop ping timer after end called', function (done) {
- var client = connect()
- client.once('connect', function () {
- should.exist(client.pingTimer)
- client.end()
- should.not.exist(client.pingTimer)
- done()
- })
- })
- it('should be able to end even on a failed connection', function (done) {
- var client = connect({host: 'this_hostname_should_not_exist'})
- var timeout = setTimeout(function () {
- done(new Error('Failed to end a disconnected client'))
- }, 500)
- setTimeout(function () {
- client.end(function () {
- clearTimeout(timeout)
- done()
- })
- }, 200)
- })
- it('should emit end even on a failed connection', function (done) {
- var client = connect({host: 'this_hostname_should_not_exist'})
- var timeout = setTimeout(function () {
- done(new Error('Disconnected client has failed to emit end'))
- }, 500)
- client.once('end', function () {
- clearTimeout(timeout)
- done()
- })
- setTimeout(client.end.bind(client), 200)
- })
- it('should emit end only once for a reconnecting client', function (done) {
- var client = connect({host: 'this_hostname_should_not_exist', connectTimeout: 10, reconnectPeriod: 10})
- client.once('end', function () {
- var timeout = setTimeout(done.bind(null))
- client.once('end', function () {
- clearTimeout(timeout)
- done(new Error('end emitted twice'))
- })
- })
- setTimeout(client.end.bind(client), 300)
- })
- })
- describe('connecting', function () {
- it('should connect to the broker', function (done) {
- var client = connect()
- client.on('error', done)
- server.once('client', function () {
- client.end()
- done()
- })
- })
- it('should send a default client id', function (done) {
- var client = connect()
- client.on('error', done)
- server.once('client', function (serverClient) {
- serverClient.once('connect', function (packet) {
- packet.clientId.should.match(/mqttjs.*/)
- serverClient.disconnect()
- done()
- })
- })
- })
- it('should send be clean by default', function (done) {
- var client = connect()
- client.on('error', done)
- server.once('client', function (serverClient) {
- serverClient.once('connect', function (packet) {
- packet.clean.should.be.true()
- serverClient.disconnect()
- done()
- })
- })
- })
- it('should connect with the given client id', function (done) {
- var client = connect({clientId: 'testclient'})
- client.on('error', function (err) {
- throw err
- })
- server.once('client', function (serverClient) {
- serverClient.once('connect', function (packet) {
- packet.clientId.should.match(/testclient/)
- serverClient.disconnect()
- done()
- })
- })
- })
- it('should connect with the client id and unclean state', function (done) {
- var client = connect({clientId: 'testclient', clean: false})
- client.on('error', function (err) {
- throw err
- })
- server.once('client', function (serverClient) {
- serverClient.once('connect', function (packet) {
- packet.clientId.should.match(/testclient/)
- packet.clean.should.be.false()
- serverClient.disconnect()
- done()
- })
- })
- })
- it('should require a clientId with clean=false', function (done) {
- try {
- var client = connect({ clean: false })
- client.on('error', function (err) {
- done(err)
- // done(new Error('should have thrown'));
- })
- } catch (err) {
- done()
- }
- })
- it('should default to localhost', function (done) {
- var client = connect({clientId: 'testclient'})
- client.on('error', function (err) {
- throw err
- })
- server.once('client', function (serverClient) {
- serverClient.once('connect', function (packet) {
- packet.clientId.should.match(/testclient/)
- serverClient.disconnect()
- done()
- })
- })
- })
- it('should emit connect', function (done) {
- var client = connect()
- client.once('connect', function () {
- client.end()
- done()
- })
- client.once('error', done)
- })
- it('should provide connack packet with connect event', function (done) {
- var connack = version === 5 ? {reasonCode: 0} : {returnCode: 0}
- server.once('client', function (serverClient) {
- connack.sessionPresent = true
- serverClient.connack(connack)
- server.once('client', function (serverClient) {
- connack.sessionPresent = false
- serverClient.connack(connack)
- })
- })
- var client = connect()
- client.once('connect', function (packet) {
- should(packet.sessionPresent).be.equal(true)
- client.once('connect', function (packet) {
- should(packet.sessionPresent).be.equal(false)
- client.end()
- done()
- })
- })
- })
- it('should mark the client as connected', function (done) {
- var client = connect()
- client.once('connect', function () {
- client.end()
- if (client.connected) {
- done()
- } else {
- done(new Error('Not marked as connected'))
- }
- })
- })
- it('should emit error', function (done) {
- var client = connect({clientId: 'invalid'})
- client.once('connect', function () {
- done(new Error('Should not emit connect'))
- })
- client.once('error', function (error) {
- var value = version === 5 ? 128 : 2
- should(error.code).be.equal(value) // code for clientID identifer rejected
- client.end()
- done()
- })
- })
- it('should have different client ids', function (done) {
- var client1 = connect()
- var client2 = connect()
- client1.options.clientId.should.not.equal(client2.options.clientId)
- client1.end(true)
- client2.end(true)
- setImmediate(done)
- })
- })
- describe('handling offline states', function () {
- it('should emit offline events once when the client transitions from connected states to disconnected ones', function (done) {
- var client = connect({reconnectPeriod: 20})
- client.on('connect', function () {
- this.stream.end()
- })
- client.on('offline', function () {
- client.end(true, done)
- })
- })
- it('should emit offline events once when the client (at first) can NOT connect to servers', function (done) {
- // fake a port
- var client = connect({ reconnectPeriod: 20, port: 4557 })
- client.on('offline', function () {
- client.end(true, done)
- })
- })
- })
- describe('topic validations when subscribing', function () {
- it('should be ok for well-formated topics', function (done) {
- var client = connect()
- client.subscribe(
- [
- '+', '+/event', 'event/+', '#', 'event/#', 'system/event/+',
- 'system/+/event', 'system/registry/event/#', 'system/+/event/#',
- 'system/registry/event/new_device', 'system/+/+/new_device'
- ],
- function (err) {
- client.end(function () {
- if (err) {
- return done(new Error(err))
- }
- done()
- })
- }
- )
- })
- it('should return an error (via callbacks) for topic #/event', function (done) {
- var client = connect()
- client.subscribe(['#/event', 'event#', 'event+'], function (err) {
- client.end(false, function () {
- if (err) {
- return done()
- }
- done(new Error('Validations do NOT work'))
- })
- })
- })
- it('should return an empty array for duplicate subs', function (done) {
- var client = connect()
- client.subscribe('event', function (err, granted1) {
- if (err) {
- return done(err)
- }
- client.subscribe('event', function (err, granted2) {
- if (err) {
- return done(err)
- }
- granted2.should.Array()
- granted2.should.be.empty()
- done()
- })
- })
- })
- it('should return an error (via callbacks) for topic #/event', function (done) {
- var client = connect()
- client.subscribe('#/event', function (err) {
- client.end(function () {
- if (err) {
- return done()
- }
- done(new Error('Validations do NOT work'))
- })
- })
- })
- it('should return an error (via callbacks) for topic event#', function (done) {
- var client = connect()
- client.subscribe('event#', function (err) {
- client.end(function () {
- if (err) {
- return done()
- }
- done(new Error('Validations do NOT work'))
- })
- })
- })
- it('should return an error (via callbacks) for topic system/#/event', function (done) {
- var client = connect()
- client.subscribe('system/#/event', function (err) {
- client.end(function () {
- if (err) {
- return done()
- }
- done(new Error('Validations do NOT work'))
- })
- })
- })
- it('should return an error (via callbacks) for empty topic list', function (done) {
- var client = connect()
- client.subscribe([], function (err) {
- client.end()
- if (err) {
- return done()
- }
- done(new Error('Validations do NOT work'))
- })
- })
- it('should return an error (via callbacks) for topic system/+/#/event', function (done) {
- var client = connect()
- client.subscribe('system/+/#/event', function (err) {
- client.end(true, function () {
- if (err) {
- return done()
- }
- done(new Error('Validations do NOT work'))
- })
- })
- })
- })
- describe('offline messages', function () {
- it('should queue message until connected', function (done) {
- var client = connect()
- client.publish('test', 'test')
- client.subscribe('test')
- client.unsubscribe('test')
- client.queue.length.should.equal(3)
- client.once('connect', function () {
- client.queue.length.should.equal(0)
- setTimeout(function () {
- client.end(true, done)
- }, 10)
- })
- })
- it('should not queue qos 0 messages if queueQoSZero is false', function (done) {
- var client = connect({queueQoSZero: false})
- client.publish('test', 'test', {qos: 0})
- client.queue.length.should.equal(0)
- client.on('connect', function () {
- setTimeout(function () {
- client.end(true, done)
- }, 10)
- })
- })
- it('should queue qos != 0 messages', function (done) {
- var client = connect({queueQoSZero: false})
- client.publish('test', 'test', {qos: 1})
- client.publish('test', 'test', {qos: 2})
- client.subscribe('test')
- client.unsubscribe('test')
- client.queue.length.should.equal(2)
- client.on('connect', function () {
- setTimeout(function () {
- client.end(true, done)
- }, 10)
- })
- })
- it('should not interrupt messages', function (done) {
- var client = null
- var incomingStore = new mqtt.Store({ clean: false })
- var outgoingStore = new mqtt.Store({ clean: false })
- var publishCount = 0
- var server2 = new Server(function (c) {
- c.on('connect', function () {
- c.connack({returnCode: 0})
- })
- c.on('publish', function (packet) {
- if (packet.qos !== 0) {
- c.puback({messageId: packet.messageId})
- }
- switch (publishCount++) {
- case 0:
- packet.payload.toString().should.equal('payload1')
- break
- case 1:
- packet.payload.toString().should.equal('payload2')
- break
- case 2:
- packet.payload.toString().should.equal('payload3')
- break
- case 3:
- packet.payload.toString().should.equal('payload4')
- server2.close()
- done()
- break
- }
- })
- })
- server2.listen(port + 50, function () {
- client = mqtt.connect({
- port: port + 50,
- host: 'localhost',
- clean: false,
- clientId: 'cid1',
- reconnectPeriod: 0,
- incomingStore: incomingStore,
- outgoingStore: outgoingStore,
- queueQoSZero: true
- })
- client.on('packetreceive', function (packet) {
- if (packet.cmd === 'connack') {
- setImmediate(
- function () {
- client.publish('test', 'payload3', {qos: 1})
- client.publish('test', 'payload4', {qos: 0})
- }
- )
- }
- })
- client.publish('test', 'payload1', {qos: 2})
- client.publish('test', 'payload2', {qos: 2})
- })
- })
- it('should call cb if an outgoing QoS 0 message is not sent', function (done) {
- var client = connect({queueQoSZero: false})
- var called = false
- client.publish('test', 'test', {qos: 0}, function () {
- called = true
- })
- client.on('connect', function () {
- called.should.equal(true)
- setTimeout(function () {
- client.end(true, done)
- }, 10)
- })
- })
- it('should delay ending up until all inflight messages are delivered', function (done) {
- var client = connect()
- var subscribeCalled = false
- client.on('connect', function () {
- client.subscribe('test', function () {
- subscribeCalled = true
- })
- client.publish('test', 'test', function () {
- client.end(false, function () {
- subscribeCalled.should.be.equal(true)
- done()
- })
- })
- })
- })
- it('wait QoS 1 publish messages', function (done) {
- var client = connect()
- var messageReceived = false
- client.on('connect', function () {
- client.subscribe('test')
- client.publish('test', 'test', { qos: 1 }, function () {
- client.end(false, function () {
- messageReceived.should.equal(true)
- done()
- })
- })
- client.on('message', function () {
- messageReceived = true
- })
- })
- server.once('client', function (serverClient) {
- serverClient.on('subscribe', function () {
- serverClient.on('publish', function (packet) {
- serverClient.publish(packet)
- })
- })
- })
- })
- it('does not wait acks when force-closing', function (done) {
- // non-running broker
- var client = connect('mqtt://localhost:8993')
- client.publish('test', 'test', { qos: 1 })
- client.end(true, done)
- })
- it('should call cb if store.put fails', function (done) {
- const store = new Store()
- store.put = function (packet, cb) {
- process.nextTick(cb, new Error('oops there is an error'))
- }
- var client = connect({ incomingStore: store, outgoingStore: store })
- client.publish('test', 'test', { qos: 2 }, function (err) {
- if (err) {
- client.end(true, done)
- }
- })
- })
- })
- describe('publishing', function () {
- it('should publish a message (offline)', function (done) {
- var client = connect()
- var payload = 'test'
- var topic = 'test'
- client.publish(topic, payload)
- server.on('client', onClient)
- function onClient (serverClient) {
- serverClient.once('connect', function () {
- server.removeListener('client', onClient)
- })
- serverClient.once('publish', function (packet) {
- packet.topic.should.equal(topic)
- packet.payload.toString().should.equal(payload)
- packet.qos.should.equal(0)
- packet.retain.should.equal(false)
- client.end(true, done)
- })
- }
- })
- it('should publish a message (online)', function (done) {
- var client = connect()
- var payload = 'test'
- var topic = 'test'
- client.on('connect', function () {
- client.publish(topic, payload)
- })
- server.once('client', function (serverClient) {
- serverClient.once('publish', function (packet) {
- packet.topic.should.equal(topic)
- packet.payload.toString().should.equal(payload)
- packet.qos.should.equal(0)
- packet.retain.should.equal(false)
- client.end()
- done()
- })
- })
- })
- it('should publish a message (retain, offline)', function (done) {
- var client = connect({ queueQoSZero: true })
- var payload = 'test'
- var topic = 'test'
- var called = false
- client.publish(topic, payload, { retain: true }, function () {
- called = true
- })
- server.once('client', function (serverClient) {
- serverClient.once('publish', function (packet) {
- packet.topic.should.equal(topic)
- packet.payload.toString().should.equal(payload)
- packet.qos.should.equal(0)
- packet.retain.should.equal(true)
- called.should.equal(true)
- client.end()
- done()
- })
- })
- })
- it('should emit a packetsend event', function (done) {
- var client = connect()
- var payload = 'test_payload'
- var testTopic = 'testTopic'
- client.on('packetsend', function (packet) {
- if (packet.cmd === 'publish') {
- packet.qos.should.equal(0)
- packet.topic.should.equal(testTopic)
- packet.payload.should.equal(payload)
- packet.retain.should.equal(false)
- client.end()
- done()
- }
- })
- client.publish(testTopic, payload)
- })
- it('should accept options', function (done) {
- var client = connect()
- var payload = 'test'
- var topic = 'test'
- var opts = {
- retain: true,
- qos: 1
- }
- client.once('connect', function () {
- client.publish(topic, payload, opts)
- })
- server.once('client', function (serverClient) {
- serverClient.once('publish', function (packet) {
- packet.topic.should.equal(topic)
- packet.payload.toString().should.equal(payload)
- packet.qos.should.equal(opts.qos, 'incorrect qos')
- packet.retain.should.equal(opts.retain, 'incorrect ret')
- packet.dup.should.equal(false, 'incorrect dup')
- client.end()
- done()
- })
- })
- })
- it('should publish with the default options for an empty parameter', function (done) {
- var client = connect()
- var payload = 'test'
- var topic = 'test'
- var defaultOpts = {qos: 0, retain: false, dup: false}
- client.once('connect', function () {
- client.publish(topic, payload, {})
- })
- server.once('client', function (serverClient) {
- serverClient.once('publish', function (packet) {
- packet.topic.should.equal(topic)
- packet.payload.toString().should.equal(payload)
- packet.qos.should.equal(defaultOpts.qos, 'incorrect qos')
- packet.retain.should.equal(defaultOpts.retain, 'incorrect ret')
- packet.dup.should.equal(defaultOpts.dup, 'incorrect dup')
- client.end()
- done()
- })
- })
- })
- it('should mark a message as duplicate when "dup" option is set', function (done) {
- var client = connect()
- var payload = 'duplicated-test'
- var topic = 'test'
- var opts = {
- retain: true,
- qos: 1,
- dup: true
- }
- client.once('connect', function () {
- client.publish(topic, payload, opts)
- })
- server.once('client', function (serverClient) {
- serverClient.once('publish', function (packet) {
- packet.topic.should.equal(topic)
- packet.payload.toString().should.equal(payload)
- packet.dup.should.equal(opts.dup, 'incorrect dup')
- client.end()
- done()
- })
- })
- })
- it('should fire a callback (qos 0)', function (done) {
- var client = connect()
- client.once('connect', function () {
- client.publish('a', 'b', function () {
- client.end()
- done()
- })
- })
- })
- it('should fire a callback (qos 1)', function (done) {
- var client = connect()
- var opts = { qos: 1 }
- client.once('connect', function () {
- client.publish('a', 'b', opts, function () {
- client.end()
- done()
- })
- })
- })
- it('should fire a callback (qos 2)', function (done) {
- var client = connect()
- var opts = { qos: 2 }
- client.once('connect', function () {
- client.publish('a', 'b', opts, function () {
- client.end()
- done()
- })
- })
- })
- it('should support UTF-8 characters in topic', function (done) {
- var client = connect()
- client.once('connect', function () {
- client.publish('中国', 'hello', function () {
- client.end()
- done()
- })
- })
- })
- it('should support UTF-8 characters in payload', function (done) {
- var client = connect()
- client.once('connect', function () {
- client.publish('hello', '中国', function () {
- client.end()
- done()
- })
- })
- })
- it('should publish 10 QoS 2 and receive them', function (done) {
- var client = connect()
- var count = 0
- client.on('connect', function () {
- client.subscribe('test')
- client.publish('test', 'test', { qos: 2 })
- })
- client.on('message', function () {
- if (count >= 10) {
- client.end()
- done()
- } else {
- client.publish('test', 'test', { qos: 2 })
- }
- })
- server.once('client', function (serverClient) {
- serverClient.on('offline', function () {
- client.end()
- done('error went offline... didnt see this happen')
- })
- serverClient.on('subscribe', function () {
- serverClient.on('publish', function (packet) {
- serverClient.publish(packet)
- })
- })
- serverClient.on('pubrel', function () {
- count++
- })
- })
- })
- function testQosHandleMessage (qos, done) {
- var client = connect()
- var messageEventCount = 0
- var handleMessageCount = 0
- client.handleMessage = function (packet, callback) {
- setTimeout(function () {
- handleMessageCount++
- // next message event should not emit until handleMessage completes
- handleMessageCount.should.equal(messageEventCount)
- if (handleMessageCount === 10) {
- setTimeout(function () {
- client.end()
- done()
- })
- }
- callback()
- }, 100)
- }
- client.on('message', function (topic, message, packet) {
- messageEventCount++
- })
- client.on('connect', function () {
- client.subscribe('test')
- })
- server.once('client', function (serverClient) {
- serverClient.on('offline', function () {
- client.end()
- done('error went offline... didnt see this happen')
- })
- serverClient.on('subscribe', function () {
- for (var i = 0; i < 10; i++) {
- serverClient.publish({
- messageId: i,
- topic: 'test',
- payload: 'test' + i,
- qos: qos
- })
- }
- })
- })
- }
- it('should publish 10 QoS 0 and receive them only when `handleMessage` finishes', function (done) {
- testQosHandleMessage(0, done)
- })
- it('should publish 10 QoS 1 and receive them only when `handleMessage` finishes', function (done) {
- testQosHandleMessage(1, done)
- })
- it('should publish 10 QoS 2 and receive them only when `handleMessage` finishes', function (done) {
- testQosHandleMessage(2, done)
- })
- it('should not send a `puback` if the execution of `handleMessage` fails for messages with QoS `1`', function (done) {
- var client = connect()
- client.handleMessage = function (packet, callback) {
- callback(new Error('Error thrown by the application'))
- }
- client._sendPacket = sinon.spy()
- client._handlePublish({
- messageId: Math.floor(65535 * Math.random()),
- topic: 'test',
- payload: 'test',
- qos: 1
- }, function (err) {
- should.exist(err)
- })
- client._sendPacket.callCount.should.equal(0)
- client.end()
- client.on('connect', function () { done() })
- })
- it('should silently ignore errors thrown by `handleMessage` and return when no callback is passed ' +
- 'into `handlePublish` method', function (done) {
- var client = connect()
- client.handleMessage = function (packet, callback) {
- callback(new Error('Error thrown by the application'))
- }
- try {
- client._handlePublish({
- messageId: Math.floor(65535 * Math.random()),
- topic: 'test',
- payload: 'test',
- qos: 1
- })
- done()
- } catch (err) {
- done(err)
- } finally {
- client.end()
- }
- })
- it('should handle error with async incoming store in QoS 2 `handlePublish` method', function (done) {
- function AsyncStore () {
- if (!(this instanceof AsyncStore)) {
- return new AsyncStore()
- }
- }
- AsyncStore.prototype.put = function (packet, cb) {
- process.nextTick(function () {
- cb(new Error('Error'))
- })
- }
- var store = new AsyncStore()
- var client = connect({incomingStore: store})
- client._handlePublish({
- messageId: 1,
- topic: 'test',
- payload: 'test',
- qos: 2
- }, function () {
- done()
- client.end()
- })
- })
- it('should handle error with async incoming store in QoS 2 `handlePubrel` method', function (done) {
- function AsyncStore () {
- if (!(this instanceof AsyncStore)) {
- return new AsyncStore()
- }
- }
- AsyncStore.prototype.del = function (packet, cb) {
- process.nextTick(function () {
- cb(new Error('Error'))
- })
- }
- AsyncStore.prototype.get = function (packet, cb) {
- process.nextTick(function () {
- cb(null, {cmd: 'publish'})
- })
- }
- var store = new AsyncStore()
- var client = connect({incomingStore: store})
- client._handlePubrel({
- messageId: 1,
- qos: 2
- }, function () {
- done()
- client.end()
- })
- })
- it('should handle success with async incoming store in QoS 2 `handlePubrel` method', function (done) {
- var delComplete = false
- function AsyncStore () {
- if (!(this instanceof AsyncStore)) {
- return new AsyncStore()
- }
- }
- AsyncStore.prototype.del = function (packet, cb) {
- process.nextTick(function () {
- delComplete = true
- cb(null)
- })
- }
- AsyncStore.prototype.get = function (packet, cb) {
- process.nextTick(function () {
- cb(null, {cmd: 'publish'})
- })
- }
- var store = new AsyncStore()
- var client = connect({incomingStore: store})
- client._handlePubrel({
- messageId: 1,
- qos: 2
- }, function () {
- delComplete.should.equal(true)
- done()
- client.end()
- })
- })
- it('should handle error with async incoming store in QoS 1 `handlePublish` method', function (done) {
- function AsyncStore () {
- if (!(this instanceof AsyncStore)) {
- return new AsyncStore()
- }
- }
- AsyncStore.prototype.put = function (packet, cb) {
- process.nextTick(function () {
- cb(null, 'Error')
- })
- }
- var store = new AsyncStore()
- var client = connect({incomingStore: store})
- client._handlePublish({
- messageId: 1,
- topic: 'test',
- payload: 'test',
- qos: 1
- }, function () {
- done()
- client.end()
- })
- })
- it('should not send a `pubcomp` if the execution of `handleMessage` fails for messages with QoS `2`', function (done) {
- var store = new Store()
- var client = connect({incomingStore: store})
- var messageId = Math.floor(65535 * Math.random())
- var topic = 'test'
- var payload = 'test'
- var qos = 2
- client.handleMessage = function (packet, callback) {
- callback(new Error('Error thrown by the application'))
- }
- client.once('connect', function () {
- client.subscribe(topic, {qos: 2})
- store.put({
- messageId: messageId,
- topic: topic,
- payload: payload,
- qos: qos,
- cmd: 'publish'
- }, function () {
- // cleans up the client
- client.end()
- client._sendPacket = sinon.spy()
- client._handlePubrel({cmd: 'pubrel', messageId: messageId}, function (err) {
- should.exist(err)
- })
- client._sendPacket.callCount.should.equal(0)
- done()
- })
- })
- })
- it('should silently ignore errors thrown by `handleMessage` and return when no callback is passed ' +
- 'into `handlePubrel` method', function (done) {
- var store = new Store()
- var client = connect({incomingStore: store})
- var messageId = Math.floor(65535 * Math.random())
- var topic = 'test'
- var payload = 'test'
- var qos = 2
- client.handleMessage = function (packet, callback) {
- callback(new Error('Error thrown by the application'))
- }
- client.once('connect', function () {
- client.subscribe(topic, {qos: 2})
- store.put({
- messageId: messageId,
- topic: topic,
- payload: payload,
- qos: qos,
- cmd: 'publish'
- }, function () {
- try {
- client._handlePubrel({cmd: 'pubrel', messageId: messageId})
- done()
- } catch (err) {
- done(err)
- } finally {
- client.end()
- }
- })
- })
- })
- it('should keep message order', function (done) {
- var publishCount = 0
- var reconnect = false
- var client = {}
- var incomingStore = new mqtt.Store({ clean: false })
- var outgoingStore = new mqtt.Store({ clean: false })
- var server2 = new Server(function (c) {
- // errors are not interesting for this test
- // but they might happen on some platforms
- c.on('error', function () {})
- c.on('connect', function (packet) {
- c.connack({returnCode: 0})
- })
- c.on('publish', function (packet) {
- c.puback({messageId: packet.messageId})
- if (reconnect) {
- switch (publishCount++) {
- case 0:
- packet.payload.toString().should.equal('payload1')
- break
- case 1:
- packet.payload.toString().should.equal('payload2')
- break
- case 2:
- packet.payload.toString().should.equal('payload3')
- server2.close()
- done()
- break
- }
- }
- })
- })
- server2.listen(port + 50, function () {
- client = mqtt.connect({
- port: port + 50,
- host: 'localhost',
- clean: false,
- clientId: 'cid1',
- reconnectPeriod: 0,
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- client.on('connect', function () {
- if (!reconnect) {
- client.publish('topic', 'payload1', {qos: 1})
- client.publish('topic', 'payload2', {qos: 1})
- client.end(true)
- } else {
- client.publish('topic', 'payload3', {qos: 1})
- }
- })
- client.on('close', function () {
- if (!reconnect) {
- client.reconnect({
- clean: false,
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- reconnect = true
- }
- })
- })
- })
- function testCallbackStorePutByQoS (qos, clean, expected, done) {
- var client = connect({
- clean: clean,
- clientId: 'testId'
- })
- var callbacks = []
- function cbStorePut () {
- callbacks.push('storeput')
- }
- client.on('connect', function () {
- client.publish('test', 'test', {qos: qos, cbStorePut: cbStorePut}, function (err) {
- if (err) done(err)
- callbacks.push('publish')
- should.deepEqual(callbacks, expected)
- done()
- })
- client.end()
- })
- }
- it('should not call cbStorePut when publishing message with QoS `0` and clean `true`', function (done) {
- testCallbackStorePutByQoS(0, true, ['publish'], done)
- })
- it('should not call cbStorePut when publishing message with QoS `0` and clean `false`', function (done) {
- testCallbackStorePutByQoS(0, false, ['publish'], done)
- })
- it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `true`', function (done) {
- testCallbackStorePutByQoS(1, true, ['storeput', 'publish'], done)
- })
- it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `false`', function (done) {
- testCallbackStorePutByQoS(1, false, ['storeput', 'publish'], done)
- })
- it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `true`', function (done) {
- testCallbackStorePutByQoS(2, true, ['storeput', 'publish'], done)
- })
- it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `false`', function (done) {
- testCallbackStorePutByQoS(2, false, ['storeput', 'publish'], done)
- })
- })
- describe('unsubscribing', function () {
- it('should send an unsubscribe packet (offline)', function (done) {
- var client = connect()
- client.unsubscribe('test')
- server.once('client', function (serverClient) {
- serverClient.once('unsubscribe', function (packet) {
- packet.unsubscriptions.should.containEql('test')
- client.end()
- done()
- })
- })
- })
- it('should send an unsubscribe packet', function (done) {
- var client = connect()
- var topic = 'topic'
- client.once('connect', function () {
- client.unsubscribe(topic)
- })
- server.once('client', function (serverClient) {
- serverClient.once('unsubscribe', function (packet) {
- packet.unsubscriptions.should.containEql(topic)
- client.end()
- done()
- })
- })
- })
- it('should emit a packetsend event', function (done) {
- var client = connect()
- var testTopic = 'testTopic'
- client.once('connect', function () {
- client.subscribe(testTopic)
- })
- client.on('packetsend', function (packet) {
- if (packet.cmd === 'subscribe') {
- client.end()
- done()
- }
- })
- })
- it('should emit a packetreceive event', function (done) {
- var client = connect()
- var testTopic = 'testTopic'
- client.once('connect', function () {
- client.subscribe(testTopic)
- })
- client.on('packetreceive', function (packet) {
- if (packet.cmd === 'suback') {
- client.end()
- done()
- }
- })
- })
- it('should accept an array of unsubs', function (done) {
- var client = connect()
- var topics = ['topic1', 'topic2']
- client.once('connect', function () {
- client.unsubscribe(topics)
- })
- server.once('client', function (serverClient) {
- serverClient.once('unsubscribe', function (packet) {
- packet.unsubscriptions.should.eql(topics)
- done()
- })
- })
- })
- it('should fire a callback on unsuback', function (done) {
- var client = connect()
- var topic = 'topic'
- client.once('connect', function () {
- client.unsubscribe(topic, done)
- })
- server.once('client', function (serverClient) {
- serverClient.once('unsubscribe', function (packet) {
- serverClient.unsuback(packet)
- client.end()
- })
- })
- })
- it('should unsubscribe from a chinese topic', function (done) {
- var client = connect()
- var topic = '中国'
- client.once('connect', function () {
- client.unsubscribe(topic)
- })
- server.once('client', function (serverClient) {
- serverClient.once('unsubscribe', function (packet) {
- packet.unsubscriptions.should.containEql(topic)
- client.end()
- done()
- })
- })
- })
- })
- describe('keepalive', function () {
- var clock
- beforeEach(function () {
- clock = sinon.useFakeTimers()
- })
- afterEach(function () {
- clock.restore()
- })
- it('should checkPing at keepalive interval', function (done) {
- var interval = 3
- var client = connect({ keepalive: interval })
- client._checkPing = sinon.spy()
- client.once('connect', function () {
- clock.tick(interval * 1000)
- client._checkPing.callCount.should.equal(1)
- clock.tick(interval * 1000)
- client._checkPing.callCount.should.equal(2)
- clock.tick(interval * 1000)
- client._checkPing.callCount.should.equal(3)
- client.end()
- done()
- })
- })
- it('should not checkPing if publishing at a higher rate than keepalive', function (done) {
- var intervalMs = 3000
- var client = connect({keepalive: intervalMs / 1000})
- client._checkPing = sinon.spy()
- client.once('connect', function () {
- client.publish('foo', 'bar')
- clock.tick(intervalMs - 1)
- client.publish('foo', 'bar')
- clock.tick(2)
- client._checkPing.callCount.should.equal(0)
- client.end()
- done()
- })
- })
- it('should checkPing if publishing at a higher rate than keepalive and reschedulePings===false', function (done) {
- var intervalMs = 3000
- var client = connect({
- keepalive: intervalMs / 1000,
- reschedulePings: false
- })
- client._checkPing = sinon.spy()
- client.once('connect', function () {
- client.publish('foo', 'bar')
- clock.tick(intervalMs - 1)
- client.publish('foo', 'bar')
- clock.tick(2)
- client._checkPing.callCount.should.equal(1)
- client.end()
- done()
- })
- })
- })
- describe('pinging', function () {
- it('should set a ping timer', function (done) {
- var client = connect({keepalive: 3})
- client.once('connect', function () {
- should.exist(client.pingTimer)
- client.end()
- done()
- })
- })
- it('should not set a ping timer keepalive=0', function (done) {
- var client = connect({keepalive: 0})
- client.on('connect', function () {
- should.not.exist(client.pingTimer)
- client.end()
- done()
- })
- })
- it('should reconnect if pingresp is not sent', function (done) {
- var client = connect({keepalive: 1, reconnectPeriod: 100})
- // Fake no pingresp being send by stubbing the _handlePingresp function
- client._handlePingresp = function () {}
- client.once('connect', function () {
- client.once('connect', function () {
- client.end()
- done()
- })
- })
- })
- it('should not reconnect if pingresp is successful', function (done) {
- var client = connect({keepalive: 100})
- client.once('close', function () {
- done(new Error('Client closed connection'))
- })
- setTimeout(done, 1000)
- })
- it('should defer the next ping when sending a control packet', function (done) {
- var client = connect({keepalive: 1})
- client.once('connect', function () {
- client._checkPing = sinon.spy()
- client.publish('foo', 'bar')
- setTimeout(function () {
- client._checkPing.callCount.should.equal(0)
- client.publish('foo', 'bar')
- setTimeout(function () {
- client._checkPing.callCount.should.equal(0)
- client.publish('foo', 'bar')
- setTimeout(function () {
- client._checkPing.callCount.should.equal(0)
- done()
- }, 75)
- }, 75)
- }, 75)
- })
- })
- })
- describe('subscribing', function () {
- it('should send a subscribe message (offline)', function (done) {
- var client = connect()
- client.subscribe('test')
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function () {
- done()
- })
- })
- })
- it('should send a subscribe message', function (done) {
- var client = connect()
- var topic = 'test'
- client.once('connect', function () {
- client.subscribe(topic)
- })
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function (packet) {
- var result = {
- topic: topic,
- qos: 0
- }
- if (version === 5) {
- result.nl = false
- result.rap = false
- result.rh = 0
- }
- packet.subscriptions.should.containEql(result)
- done()
- })
- })
- })
- it('should emit a packetsend event', function (done) {
- var client = connect()
- var testTopic = 'testTopic'
- client.once('connect', function () {
- client.subscribe(testTopic)
- })
- client.on('packetsend', function (packet) {
- if (packet.cmd === 'subscribe') {
- done()
- }
- })
- })
- it('should emit a packetreceive event', function (done) {
- var client = connect()
- var testTopic = 'testTopic'
- client.once('connect', function () {
- client.subscribe(testTopic)
- })
- client.on('packetreceive', function (packet) {
- if (packet.cmd === 'suback') {
- done()
- }
- })
- })
- it('should accept an array of subscriptions', function (done) {
- var client = connect()
- var subs = ['test1', 'test2']
- client.once('connect', function () {
- client.subscribe(subs)
- })
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function (packet) {
- // i.e. [{topic: 'a', qos: 0}, {topic: 'b', qos: 0}]
- var expected = subs.map(function (i) {
- var result = {topic: i, qos: 0}
- if (version === 5) {
- result.nl = false
- result.rap = false
- result.rh = 0
- }
- return result
- })
- packet.subscriptions.should.eql(expected)
- done()
- })
- })
- })
- it('should accept an hash of subscriptions', function (done) {
- var client = connect()
- var topics = {
- test1: {qos: 0},
- test2: {qos: 1}
- }
- client.once('connect', function () {
- client.subscribe(topics)
- })
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function (packet) {
- var k
- var expected = []
- for (k in topics) {
- if (topics.hasOwnProperty(k)) {
- var result = {
- topic: k,
- qos: topics[k].qos
- }
- if (version === 5) {
- result.nl = false
- result.rap = false
- result.rh = 0
- }
- expected.push(result)
- }
- }
- packet.subscriptions.should.eql(expected)
- done()
- })
- })
- })
- it('should accept an options parameter', function (done) {
- var client = connect()
- var topic = 'test'
- var opts = {qos: 1}
- client.once('connect', function () {
- client.subscribe(topic, opts)
- })
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function (packet) {
- var expected = [{
- topic: topic,
- qos: 1
- }]
- if (version === 5) {
- expected[0].nl = false
- expected[0].rap = false
- expected[0].rh = 0
- }
- packet.subscriptions.should.eql(expected)
- done()
- })
- })
- })
- it('should subscribe with the default options for an empty options parameter', function (done) {
- var client = connect()
- var topic = 'test'
- var defaultOpts = {qos: 0}
- client.once('connect', function () {
- client.subscribe(topic, {})
- })
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function (packet) {
- var result = {
- topic: topic,
- qos: defaultOpts.qos
- }
- if (version === 5) {
- result.nl = false
- result.rap = false
- result.rh = 0
- }
- packet.subscriptions.should.containEql(result)
- done()
- })
- })
- })
- it('should fire a callback on suback', function (done) {
- var client = connect()
- var topic = 'test'
- client.once('connect', function () {
- client.subscribe(topic, { qos: 2 }, function (err, granted) {
- if (err) {
- done(err)
- } else {
- should.exist(granted, 'granted not given')
- var result = {topic: 'test', qos: 2}
- if (version === 5) {
- result.nl = false
- result.rap = false
- result.rh = 0
- result.properties = undefined
- }
- granted.should.containEql(result)
- done()
- }
- })
- })
- })
- it('should fire a callback with error if disconnected (options provided)', function (done) {
- var client = connect()
- var topic = 'test'
- client.once('connect', function () {
- client.end(true, function () {
- client.subscribe(topic, {qos: 2}, function (err, granted) {
- should.not.exist(granted, 'granted given')
- should.exist(err, 'no error given')
- done()
- })
- })
- })
- })
- it('should fire a callback with error if disconnected (options not provided)', function (done) {
- var client = connect()
- var topic = 'test'
- client.once('connect', function () {
- client.end(true, function () {
- client.subscribe(topic, function (err, granted) {
- should.not.exist(granted, 'granted given')
- should.exist(err, 'no error given')
- done()
- })
- })
- })
- })
- it('should subscribe with a chinese topic', function (done) {
- var client = connect()
- var topic = '中国'
- client.once('connect', function () {
- client.subscribe(topic)
- })
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function (packet) {
- var result = {
- topic: topic,
- qos: 0
- }
- if (version === 5) {
- result.nl = false
- result.rap = false
- result.rh = 0
- }
- packet.subscriptions.should.containEql(result)
- done()
- })
- })
- })
- })
- describe('receiving messages', function () {
- it('should fire the message event', function (done) {
- var client = connect()
- var testPacket = {
- topic: 'test',
- payload: 'message',
- retain: true,
- qos: 1,
- messageId: 5
- }
- client.subscribe(testPacket.topic)
- client.once('message', function (topic, message, packet) {
- topic.should.equal(testPacket.topic)
- message.toString().should.equal(testPacket.payload)
- packet.should.equal(packet)
- client.end()
- done()
- })
- server.once('client', function (serverClient) {
- serverClient.on('subscribe', function () {
- serverClient.publish(testPacket)
- })
- })
- })
- it('should emit a packetreceive event', function (done) {
- var client = connect()
- var testPacket = {
- topic: 'test',
- payload: 'message',
- retain: true,
- qos: 1,
- messageId: 5
- }
- client.subscribe(testPacket.topic)
- client.on('packetreceive', function (packet) {
- if (packet.cmd === 'publish') {
- packet.qos.should.equal(1)
- packet.topic.should.equal(testPacket.topic)
- packet.payload.toString().should.equal(testPacket.payload)
- packet.retain.should.equal(true)
- client.end()
- done()
- }
- })
- server.once('client', function (serverClient) {
- serverClient.on('subscribe', function () {
- serverClient.publish(testPacket)
- })
- })
- })
- it('should support binary data', function (done) {
- var client = connect({ encoding: 'binary' })
- var testPacket = {
- topic: 'test',
- payload: 'message',
- retain: true,
- qos: 1,
- messageId: 5
- }
- client.subscribe(testPacket.topic)
- client.once('message', function (topic, message, packet) {
- topic.should.equal(testPacket.topic)
- message.should.be.an.instanceOf(Buffer)
- message.toString().should.equal(testPacket.payload)
- packet.should.equal(packet)
- done()
- })
- server.once('client', function (serverClient) {
- serverClient.on('subscribe', function () {
- serverClient.publish(testPacket)
- })
- })
- })
- it('should emit a message event (qos=2)', function (done) {
- var client = connect()
- var testPacket = {
- topic: 'test',
- payload: 'message',
- retain: true,
- qos: 2,
- messageId: 5
- }
- server.testPublish = testPacket
- client.subscribe(testPacket.topic)
- client.once('message', function (topic, message, packet) {
- topic.should.equal(testPacket.topic)
- message.toString().should.equal(testPacket.payload)
- packet.should.equal(packet)
- done()
- })
- server.once('client', function (serverClient) {
- serverClient.on('subscribe', function () {
- serverClient.publish(testPacket)
- })
- })
- })
- it('should emit a message event (qos=2) - repeated publish', function (done) {
- var client = connect()
- var testPacket = {
- topic: 'test',
- payload: 'message',
- retain: true,
- qos: 2,
- messageId: 5
- }
- server.testPublish = testPacket
- client.subscribe(testPacket.topic)
- client.on('message', function (topic, message, packet) {
- topic.should.equal(testPacket.topic)
- message.toString().should.equal(testPacket.payload)
- packet.should.equal(packet)
- done()
- })
- server.once('client', function (serverClient) {
- serverClient.on('subscribe', function () {
- serverClient.publish(testPacket)
- // twice, should be ignored
- serverClient.publish(testPacket)
- })
- })
- })
- it('should support chinese topic', function (done) {
- var client = connect({ encoding: 'binary' })
- var testPacket = {
- topic: '国',
- payload: 'message',
- retain: true,
- qos: 1,
- messageId: 5
- }
- client.subscribe(testPacket.topic)
- client.once('message', function (topic, message, packet) {
- topic.should.equal(testPacket.topic)
- message.should.be.an.instanceOf(Buffer)
- message.toString().should.equal(testPacket.payload)
- packet.should.equal(packet)
- done()
- })
- server.once('client', function (serverClient) {
- serverClient.on('subscribe', function () {
- serverClient.publish(testPacket)
- })
- })
- })
- })
- describe('qos handling', function () {
- it('should follow qos 0 semantics (trivial)', function (done) {
- var client = connect()
- var testTopic = 'test'
- var testMessage = 'message'
- client.once('connect', function () {
- client.subscribe(testTopic, {qos: 0})
- })
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function () {
- serverClient.publish({
- topic: testTopic,
- payload: testMessage,
- qos: 0,
- retain: false
- })
- done()
- })
- })
- })
- it('should follow qos 1 semantics', function (done) {
- var client = connect()
- var testTopic = 'test'
- var testMessage = 'message'
- var mid = 50
- client.once('connect', function () {
- client.subscribe(testTopic, {qos: 1})
- })
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function () {
- serverClient.publish({
- topic: testTopic,
- payload: testMessage,
- messageId: mid,
- qos: 1
- })
- })
- serverClient.once('puback', function (packet) {
- packet.messageId.should.equal(mid)
- done()
- })
- })
- })
- it('should follow qos 2 semantics', function (done) {
- var client = connect()
- var testTopic = 'test'
- var testMessage = 'message'
- var mid = 253
- var publishReceived = false
- var pubrecReceived = false
- var pubrelReceived = false
- client.once('connect', function () {
- client.subscribe(testTopic, {qos: 2})
- })
- client.on('packetreceive', (packet) => {
- switch (packet.cmd) {
- case 'connack':
- case 'suback':
- // expected, but not specifically part of QOS 2 semantics
- break
- case 'publish':
- pubrecReceived.should.be.false()
- pubrelReceived.should.be.false()
- publishReceived = true
- break
- case 'pubrel':
- publishReceived.should.be.true()
- pubrecReceived.should.be.true()
- pubrelReceived = true
- break
- default:
- should.fail()
- }
- })
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function () {
- serverClient.publish({
- topic: testTopic,
- payload: testMessage,
- qos: 2,
- messageId: mid
- })
- })
- serverClient.on('pubrec', function () {
- publishReceived.should.be.true()
- pubrelReceived.should.be.false()
- pubrecReceived = true
- })
- serverClient.once('pubcomp', function () {
- client.removeAllListeners()
- serverClient.removeAllListeners()
- publishReceived.should.be.true()
- pubrecReceived.should.be.true()
- pubrelReceived.should.be.true()
- done()
- })
- })
- })
- it('should should empty the incoming store after a qos 2 handshake is completed', function (done) {
- var client = connect()
- var testTopic = 'test'
- var testMessage = 'message'
- var mid = 253
- client.once('connect', function () {
- client.subscribe(testTopic, {qos: 2})
- })
- client.on('packetreceive', (packet) => {
- if (packet.cmd === 'pubrel') {
- should(client.incomingStore._inflights.size).be.equal(1)
- }
- })
- server.once('client', function (serverClient) {
- serverClient.once('subscribe', function () {
- serverClient.publish({
- topic: testTopic,
- payload: testMessage,
- qos: 2,
- messageId: mid
- })
- })
- serverClient.once('pubcomp', function () {
- should(client.incomingStore._inflights.size).be.equal(0)
- client.removeAllListeners()
- done()
- })
- })
- })
- function testMultiplePubrel (shouldSendPubcompFail, done) {
- var client = connect()
- var testTopic = 'test'
- var testMessage = 'message'
- var mid = 253
- var pubcompCount = 0
- var pubrelCount = 0
- var handleMessageCount = 0
- var emitMessageCount = 0
- var origSendPacket = client._sendPacket
- var shouldSendFail
- client.handleMessage = function (packet, callback) {
- handleMessageCount++
- callback()
- }
- client.on('message', function () {
- emitMessageCount++
- })
- client._sendPacket = function (packet, sendDone) {
- shouldSendFail = packet.cmd === 'pubcomp' && shouldSendPubcompFail
- if (sendDone) {
- sendDone(shouldSendFail ? new Error('testing pubcomp failure') : undefined)
- }
- // send the mocked response
- switch (packet.cmd) {
- case 'subscribe':
- const suback = {cmd: 'suback', messageId: packet.messageId, granted: [2]}
- client._handlePacket(suback, function (err) {
- should(err).not.be.ok()
- })
- break
- case 'pubrec':
- case 'pubcomp':
- // for both pubrec and pubcomp, reply with pubrel, simulating the server not receiving the pubcomp
- if (packet.cmd === 'pubcomp') {
- pubcompCount++
- if (pubcompCount === 2) {
- // end the test once the client has gone through two rounds of replying to pubrel messages
- pubrelCount.should.be.exactly(2)
- handleMessageCount.should.be.exactly(1)
- emitMessageCount.should.be.exactly(1)
- client._sendPacket = origSendPacket
- done()
- break
- }
- }
- // simulate the pubrel message, either in response to pubrec or to mock pubcomp failing to be received
- const pubrel = {cmd: 'pubrel', messageId: mid}
- pubrelCount++
- client._handlePacket(pubrel, function (err) {
- if (shouldSendFail) {
- should(err).be.ok()
- } else {
- should(err).not.be.ok()
- }
- })
- break
- }
- }
- client.once('connect', function () {
- client.subscribe(testTopic, {qos: 2})
- const publish = {cmd: 'publish', topic: testTopic, payload: testMessage, qos: 2, messageId: mid}
- client._handlePacket(publish, function (err) {
- should(err).not.be.ok()
- })
- })
- }
- it('handle qos 2 messages exactly once when multiple pubrel received', function (done) {
- testMultiplePubrel(false, done)
- })
- it('handle qos 2 messages exactly once when multiple pubrel received and sending pubcomp fails on client', function (done) {
- testMultiplePubrel(true, done)
- })
- })
- describe('auto reconnect', function () {
- it('should mark the client disconnecting if #end called', function () {
- var client = connect()
- client.end()
- client.disconnecting.should.eql(true)
- })
- it('should reconnect after stream disconnect', function (done) {
- var client = connect()
- var tryReconnect = true
- client.on('connect', function () {
- if (tryReconnect) {
- client.stream.end()
- tryReconnect = false
- } else {
- client.end()
- done()
- }
- })
- })
- it('should emit \'reconnect\' when reconnecting', function (done) {
- var client = connect()
- var tryReconnect = true
- var reconnectEvent = false
- client.on('reconnect', function () {
- reconnectEvent = true
- })
- client.on('connect', function () {
- if (tryReconnect) {
- client.stream.end()
- tryReconnect = false
- } else {
- reconnectEvent.should.equal(true)
- client.end()
- done()
- }
- })
- })
- it('should emit \'offline\' after going offline', function (done) {
- var client = connect()
- var tryReconnect = true
- var offlineEvent = false
- client.on('offline', function () {
- offlineEvent = true
- })
- client.on('connect', function () {
- if (tryReconnect) {
- client.stream.end()
- tryReconnect = false
- } else {
- offlineEvent.should.equal(true)
- client.end()
- done()
- }
- })
- })
- it('should not reconnect if it was ended by the user', function (done) {
- var client = connect()
- client.on('connect', function () {
- client.end()
- done() // it will raise an exception if called two times
- })
- })
- it('should setup a reconnect timer on disconnect', function (done) {
- var client = connect()
- client.once('connect', function () {
- should.not.exist(client.reconnectTimer)
- client.stream.end()
- })
- client.once('close', function () {
- should.exist(client.reconnectTimer)
- client.end()
- done()
- })
- })
- it('should allow specification of a reconnect period', function (done) {
- var end
- var period = 200
- var client = connect({reconnectPeriod: period})
- var reconnect = false
- var start = Date.now()
- client.on('connect', function () {
- if (!reconnect) {
- client.stream.end()
- reconnect = true
- } else {
- client.end()
- end = Date.now()
- if (end - start >= period) {
- // Connected in about 2 seconds, that's good enough
- done()
- } else {
- done(new Error('Strange reconnect period'))
- }
- }
- })
- })
- it('should always cleanup successfully on reconnection', function (done) {
- var client = connect({host: 'this_hostname_should_not_exist', connectTimeout: 0, reconnectPeriod: 1})
- setTimeout(client.end.bind(client, done), 50)
- })
- it('should resend in-flight QoS 1 publish messages from the client', function (done) {
- var client = connect({reconnectPeriod: 200})
- var serverPublished = false
- var clientCalledBack = false
- server.once('client', function (serverClient) {
- serverClient.on('connect', function () {
- setImmediate(function () {
- serverClient.stream.destroy()
- })
- })
- server.once('client', function (serverClientNew) {
- serverClientNew.on('publish', function () {
- serverPublished = true
- check()
- })
- })
- })
- client.publish('hello', 'world', { qos: 1 }, function () {
- clientCalledBack = true
- check()
- })
- function check () {
- if (serverPublished && clientCalledBack) {
- client.end()
- done()
- }
- }
- })
- it('should not resend in-flight publish messages if disconnecting', function (done) {
- var client = connect({reconnectPeriod: 200})
- var serverPublished = false
- var clientCalledBack = false
- server.once('client', function (serverClient) {
- serverClient.on('connect', function () {
- setImmediate(function () {
- serverClient.stream.destroy()
- client.end()
- serverPublished.should.be.false()
- clientCalledBack.should.be.false()
- done()
- })
- })
- server.once('client', function (serverClientNew) {
- serverClientNew.on('publish', function () {
- serverPublished = true
- })
- })
- })
- client.publish('hello', 'world', { qos: 1 }, function () {
- clientCalledBack = true
- })
- })
- it('should resend in-flight QoS 2 publish messages from the client', function (done) {
- var client = connect({reconnectPeriod: 200})
- var serverPublished = false
- var clientCalledBack = false
- server.once('client', function (serverClient) {
- // ignore errors
- serverClient.on('error', function () {})
- serverClient.on('publish', function () {
- setImmediate(function () {
- serverClient.stream.destroy()
- })
- })
- server.once('client', function (serverClientNew) {
- serverClientNew.on('pubrel', function () {
- serverPublished = true
- check()
- })
- })
- })
- client.publish('hello', 'world', { qos: 2 }, function () {
- clientCalledBack = true
- check()
- })
- function check () {
- if (serverPublished && clientCalledBack) {
- client.end()
- done()
- }
- }
- })
- it('should not resend in-flight QoS 1 removed publish messages from the client', function (done) {
- var client = connect({reconnectPeriod: 200})
- var clientCalledBack = false
- server.once('client', function (serverClient) {
- serverClient.on('connect', function () {
- setImmediate(function () {
- serverClient.stream.destroy()
- })
- })
- server.once('client', function (serverClientNew) {
- serverClientNew.on('publish', function () {
- should.fail()
- done()
- })
- })
- })
- client.publish('hello', 'world', { qos: 1 }, function (err) {
- clientCalledBack = true
- should(err.message).be.equal('Message removed')
- })
- should(Object.keys(client.outgoing).length).be.equal(1)
- should(client.outgoingStore._inflights.size).be.equal(1)
- client.removeOutgoingMessage(client.getLastMessageId())
- should(Object.keys(client.outgoing).length).be.equal(0)
- should(client.outgoingStore._inflights.size).be.equal(0)
- clientCalledBack.should.be.true()
- client.end()
- done()
- })
- it('should not resend in-flight QoS 2 removed publish messages from the client', function (done) {
- var client = connect({reconnectPeriod: 200})
- var clientCalledBack = false
- server.once('client', function (serverClient) {
- serverClient.on('connect', function () {
- setImmediate(function () {
- serverClient.stream.destroy()
- })
- })
- server.once('client', function (serverClientNew) {
- serverClientNew.on('publish', function () {
- should.fail()
- done()
- })
- })
- })
- client.publish('hello', 'world', { qos: 2 }, function (err) {
- clientCalledBack = true
- should(err.message).be.equal('Message removed')
- })
- should(Object.keys(client.outgoing).length).be.equal(1)
- should(client.outgoingStore._inflights.size).be.equal(1)
- client.removeOutgoingMessage(client.getLastMessageId())
- should(Object.keys(client.outgoing).length).be.equal(0)
- should(client.outgoingStore._inflights.size).be.equal(0)
- clientCalledBack.should.be.true()
- client.end()
- done()
- })
- it('should resubscribe when reconnecting', function (done) {
- var client = connect({ reconnectPeriod: 100 })
- var tryReconnect = true
- var reconnectEvent = false
- client.on('reconnect', function () {
- reconnectEvent = true
- })
- client.on('connect', function () {
- if (tryReconnect) {
- client.subscribe('hello', function () {
- client.stream.end()
- server.once('client', function (serverClient) {
- serverClient.on('subscribe', function () {
- client.end()
- done()
- })
- })
- })
- tryReconnect = false
- } else {
- reconnectEvent.should.equal(true)
- }
- })
- })
- it('should not resubscribe when reconnecting if resubscribe is disabled', function (done) {
- var client = connect({ reconnectPeriod: 100, resubscribe: false })
- var tryReconnect = true
- var reconnectEvent = false
- client.on('reconnect', function () {
- reconnectEvent = true
- })
- client.on('connect', function () {
- if (tryReconnect) {
- client.subscribe('hello', function () {
- client.stream.end()
- server.once('client', function (serverClient) {
- serverClient.on('subscribe', function () {
- should.fail()
- })
- })
- })
- tryReconnect = false
- } else {
- reconnectEvent.should.equal(true)
- should(Object.keys(client._resubscribeTopics).length).be.equal(0)
- done()
- }
- })
- })
- it('should not resubscribe when reconnecting if suback is error', function (done) {
- var tryReconnect = true
- var reconnectEvent = false
- var server2 = new Server(function (c) {
- c.on('connect', function (packet) {
- c.connack({returnCode: 0})
- })
- c.on('subscribe', function (packet) {
- c.suback({
- messageId: packet.messageId,
- granted: packet.subscriptions.map(function (e) {
- return e.qos | 0x80
- })
- })
- c.pubrel({ messageId: Math.floor(Math.random() * 9000) + 1000 })
- })
- })
- server2.listen(port + 49, function () {
- var client = mqtt.connect({
- port: port + 49,
- host: 'localhost',
- reconnectPeriod: 100
- })
- client.on('reconnect', function () {
- reconnectEvent = true
- })
- client.on('connect', function () {
- if (tryReconnect) {
- client.subscribe('hello', function () {
- client.stream.end()
- server.once('client', function (serverClient) {
- serverClient.on('subscribe', function () {
- should.fail()
- })
- })
- })
- tryReconnect = false
- } else {
- reconnectEvent.should.equal(true)
- should(Object.keys(client._resubscribeTopics).length).be.equal(0)
- server2.close()
- done()
- }
- })
- })
- })
- it('should preserved incomingStore after disconnecting if clean is false', function (done) {
- var reconnect = false
- var client = {}
- var incomingStore = new mqtt.Store({ clean: false })
- var outgoingStore = new mqtt.Store({ clean: false })
- var server2 = new Server(function (c) {
- c.on('connect', function (packet) {
- c.connack({returnCode: 0})
- if (reconnect) {
- c.pubrel({ messageId: 1 })
- }
- })
- c.on('subscribe', function (packet) {
- c.suback({
- messageId: packet.messageId,
- granted: packet.subscriptions.map(function (e) {
- return e.qos
- })
- })
- c.publish({ topic: 'topic', payload: 'payload', qos: 2, messageId: 1, retain: false })
- })
- c.on('pubrec', function (packet) {
- client.end(false, function () {
- client.reconnect({
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- })
- })
- c.on('pubcomp', function (packet) {
- client.end()
- server2.close()
- done()
- })
- })
- server2.listen(port + 50, function () {
- client = mqtt.connect({
- port: port + 50,
- host: 'localhost',
- clean: false,
- clientId: 'cid1',
- reconnectPeriod: 0,
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- client.on('connect', function () {
- if (!reconnect) {
- client.subscribe('test', {qos: 2}, function () {
- })
- reconnect = true
- }
- })
- client.on('message', function (topic, message) {
- topic.should.equal('topic')
- message.toString().should.equal('payload')
- })
- })
- })
- it('should clear outgoing if close from server', function (done) {
- var reconnect = false
- var client = {}
- var server2 = new Server(function (c) {
- c.on('connect', function (packet) {
- c.connack({returnCode: 0})
- })
- c.on('subscribe', function (packet) {
- if (reconnect) {
- c.suback({
- messageId: packet.messageId,
- granted: packet.subscriptions.map(function (e) {
- return e.qos
- })
- })
- } else {
- c.destroy()
- }
- })
- })
- server2.listen(port + 50, function () {
- client = mqtt.connect({
- port: port + 50,
- host: 'localhost',
- clean: true,
- clientId: 'cid1',
- reconnectPeriod: 0
- })
- client.on('connect', function () {
- client.subscribe('test', {qos: 2}, function (e) {
- if (!e) {
- client.end()
- }
- })
- })
- client.on('close', function () {
- if (reconnect) {
- server2.close()
- done()
- } else {
- Object.keys(client.outgoing).length.should.equal(0)
- reconnect = true
- client.reconnect()
- }
- })
- })
- })
- it('should resend in-flight QoS 1 publish messages from the client if clean is false', function (done) {
- var reconnect = false
- var client = {}
- var incomingStore = new mqtt.Store({ clean: false })
- var outgoingStore = new mqtt.Store({ clean: false })
- var server2 = new Server(function (c) {
- c.on('connect', function (packet) {
- c.connack({returnCode: 0})
- })
- c.on('publish', function (packet) {
- if (reconnect) {
- server2.close()
- done()
- } else {
- client.end(true, function () {
- client.reconnect({
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- reconnect = true
- })
- }
- })
- })
- server2.listen(port + 50, function () {
- client = mqtt.connect({
- port: port + 50,
- host: 'localhost',
- clean: false,
- clientId: 'cid1',
- reconnectPeriod: 0,
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- client.on('connect', function () {
- if (!reconnect) {
- client.publish('topic', 'payload', {qos: 1})
- }
- })
- client.on('error', function () {})
- })
- })
- it('should resend in-flight QoS 2 publish messages from the client if clean is false', function (done) {
- var reconnect = false
- var client = {}
- var incomingStore = new mqtt.Store({ clean: false })
- var outgoingStore = new mqtt.Store({ clean: false })
- var server2 = new Server(function (c) {
- c.on('connect', function (packet) {
- c.connack({returnCode: 0})
- })
- c.on('publish', function (packet) {
- if (reconnect) {
- server2.close()
- done()
- } else {
- client.end(true, function () {
- client.reconnect({
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- reconnect = true
- })
- }
- })
- })
- server2.listen(port + 50, function () {
- client = mqtt.connect({
- port: port + 50,
- host: 'localhost',
- clean: false,
- clientId: 'cid1',
- reconnectPeriod: 0,
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- client.on('connect', function () {
- if (!reconnect) {
- client.publish('topic', 'payload', {qos: 2})
- }
- })
- client.on('error', function () {})
- })
- })
- it('should resend in-flight QoS 2 pubrel messages from the client if clean is false', function (done) {
- var reconnect = false
- var client = {}
- var incomingStore = new mqtt.Store({ clean: false })
- var outgoingStore = new mqtt.Store({ clean: false })
- var server2 = new Server(function (c) {
- c.on('connect', function (packet) {
- c.connack({returnCode: 0})
- })
- c.on('publish', function (packet) {
- if (!reconnect) {
- c.pubrec({messageId: packet.messageId})
- }
- })
- c.on('pubrel', function () {
- if (reconnect) {
- server2.close()
- done()
- } else {
- client.end(true, function () {
- client.reconnect({
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- reconnect = true
- })
- }
- })
- })
- server2.listen(port + 50, function () {
- client = mqtt.connect({
- port: port + 50,
- host: 'localhost',
- clean: false,
- clientId: 'cid1',
- reconnectPeriod: 0,
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- client.on('connect', function () {
- if (!reconnect) {
- client.publish('topic', 'payload', {qos: 2})
- }
- })
- client.on('error', function () {})
- })
- })
- it('should resend in-flight publish messages by published order', function (done) {
- var publishCount = 0
- var reconnect = false
- var disconnectOnce = true
- var client = {}
- var incomingStore = new mqtt.Store({ clean: false })
- var outgoingStore = new mqtt.Store({ clean: false })
- var server2 = new Server(function (c) {
- // errors are not interesting for this test
- // but they might happen on some platforms
- c.on('error', function () {})
- c.on('connect', function (packet) {
- c.connack({returnCode: 0})
- })
- c.on('publish', function (packet) {
- c.puback({messageId: packet.messageId})
- if (reconnect) {
- switch (publishCount++) {
- case 0:
- packet.payload.toString().should.equal('payload1')
- break
- case 1:
- packet.payload.toString().should.equal('payload2')
- break
- case 2:
- packet.payload.toString().should.equal('payload3')
- server2.close()
- done()
- break
- }
- } else {
- if (disconnectOnce) {
- client.end(true, function () {
- reconnect = true
- client.reconnect({
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- })
- disconnectOnce = false
- }
- }
- })
- })
- server2.listen(port + 50, function () {
- client = mqtt.connect({
- port: port + 50,
- host: 'localhost',
- clean: false,
- clientId: 'cid1',
- reconnectPeriod: 0,
- incomingStore: incomingStore,
- outgoingStore: outgoingStore
- })
- client.nextId = 65535
- client.on('connect', function () {
- if (!reconnect) {
- client.publish('topic', 'payload1', {qos: 1})
- client.publish('topic', 'payload2', {qos: 1})
- client.publish('topic', 'payload3', {qos: 1})
- }
- })
- client.on('error', function () {})
- })
- })
- it('should be able to pub/sub if reconnect() is called at close handler', function (done) {
- var client = connect({ reconnectPeriod: 0 })
- var tryReconnect = true
- var reconnectEvent = false
- client.on('close', function () {
- if (tryReconnect) {
- tryReconnect = false
- client.reconnect()
- } else {
- reconnectEvent.should.equal(true)
- done()
- }
- })
- client.on('reconnect', function () {
- reconnectEvent = true
- })
- client.on('connect', function () {
- if (tryReconnect) {
- client.end()
- } else {
- client.subscribe('hello', function () {
- client.end()
- })
- }
- })
- })
- it('should be able to pub/sub if reconnect() is called at out of close handler', function (done) {
- var client = connect({ reconnectPeriod: 0 })
- var tryReconnect = true
- var reconnectEvent = false
- client.on('close', function () {
- if (tryReconnect) {
- tryReconnect = false
- setTimeout(function () {
- client.reconnect()
- }, 100)
- } else {
- reconnectEvent.should.equal(true)
- done()
- }
- })
- client.on('reconnect', function () {
- reconnectEvent = true
- })
- client.on('connect', function () {
- if (tryReconnect) {
- client.end()
- } else {
- client.subscribe('hello', function () {
- client.end()
- })
- }
- })
- })
- context('with alternate server client', function () {
- var cachedClientListeners
- var connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
- beforeEach(function () {
- cachedClientListeners = server.listeners('client')
- server.removeAllListeners('client')
- })
- afterEach(function () {
- server.removeAllListeners('client')
- cachedClientListeners.forEach(function (listener) {
- server.on('client', listener)
- })
- })
- it('should resubscribe even if disconnect is before suback', function (done) {
- var client = mqtt.connect(Object.assign({ reconnectPeriod: 100 }, config))
- var subscribeCount = 0
- var connectCount = 0
- server.on('client', function (serverClient) {
- serverClient.on('connect', function () {
- connectCount++
- serverClient.connack(connack)
- })
- serverClient.on('subscribe', function () {
- subscribeCount++
- // disconnect before sending the suback on the first subscribe
- if (subscribeCount === 1) {
- client.stream.end()
- }
- // after the second connection, confirm that the only two
- // subscribes have taken place, then cleanup and exit
- if (connectCount >= 2) {
- subscribeCount.should.equal(2)
- client.end(true, done)
- }
- })
- })
- client.subscribe('hello')
- })
- it('should resubscribe exactly once', function (done) {
- var client = mqtt.connect(Object.assign({ reconnectPeriod: 100 }, config))
- var subscribeCount = 0
- server.on('client', function (serverClient) {
- serverClient.on('connect', function () {
- serverClient.connack(connack)
- })
- serverClient.on('subscribe', function () {
- subscribeCount++
- // disconnect before sending the suback on the first subscribe
- if (subscribeCount === 1) {
- client.stream.end()
- }
- // after the second connection, only two subs
- // subscribes have taken place, then cleanup and exit
- if (subscribeCount === 2) {
- client.end(true, done)
- }
- })
- })
- client.subscribe('hello')
- })
- })
- })
- }
|