abstract_client.js 86 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084
  1. 'use strict'
  2. /**
  3. * Testing dependencies
  4. */
  5. var should = require('should')
  6. var sinon = require('sinon')
  7. var mqtt = require('../')
  8. var xtend = require('xtend')
  9. var Server = require('./server')
  10. var Store = require('./../lib/store')
  11. var port = 9876
  12. module.exports = function (server, config) {
  13. var version = config.protocolVersion || 4
  /**
   * Open an MQTT client for a test case.
   *
   * The per-test options are combined with the suite-wide `config` through
   * `xtend(config, opts)` and the result is handed to `mqtt.connect`.
   *
   * @param {Object} [opts] - options specific to the calling test.
   * @returns {*} whatever `mqtt.connect` returns for the combined options.
   */
  function connect (opts) {
    var combined = xtend(config, opts)
    return mqtt.connect(combined)
  }
  // Lifecycle tests: what the client emits and which flags it sets when the
  // underlying stream closes or when `end()` is called explicitly.
  describe('closing', function () {
    it('should emit close if stream closes', function (done) {
      var client = connect()
      client.once('connect', function () {
        // Close the transport out from under the client.
        client.stream.end()
      })
      client.once('close', function () {
        client.end()
        done()
      })
    })

    it('should mark the client as disconnected', function (done) {
      var client = connect()
      client.once('close', function () {
        client.end()
        if (!client.connected) {
          done()
        } else {
          done(new Error('Not marked as disconnected'))
        }
      })
      client.once('connect', function () {
        client.stream.end()
      })
    })

    it('should stop ping timer if stream closes', function (done) {
      var client = connect()
      client.once('close', function () {
        should.not.exist(client.pingTimer)
        client.end()
        done()
      })
      client.once('connect', function () {
        // The timer must exist while connected, otherwise the check in the
        // 'close' handler would pass trivially.
        should.exist(client.pingTimer)
        client.stream.end()
      })
    })

    it('should emit close after end called', function (done) {
      var client = connect()
      client.once('close', function () {
        done()
      })
      client.once('connect', function () {
        client.end()
      })
    })

    it('should emit end after end called and client must be disconnected', function (done) {
      var client = connect()
      client.once('end', function () {
        if (client.disconnected) {
          return done()
        }
        done(new Error('client must be disconnected'))
      })
      client.once('connect', function () {
        client.end()
      })
    })

    it('should pass store close error to end callback but not to end listeners', function (done) {
      var store = new Store()
      var client = connect({outgoingStore: store})
      // Force the outgoing store to fail when it is closed.
      store.close = function (cb) {
        cb(new Error('test'))
      }
      client.once('end', function () {
        // The 'end' event must be emitted without any arguments.
        if (arguments.length === 0) {
          return done()
        }
        throw new Error('no argument should be passed to event')
      })
      client.once('connect', function () {
        client.end(function (test) {
          // The callback, unlike the event, must receive the store error.
          if (test && test.message === 'test') {
            return
          }
          throw new Error('bad argument passed to callback')
        })
      })
    })

    it('should return `this` if end called twice', function (done) {
      var client = connect()
      client.once('connect', function () {
        client.end()
        var value = client.end()
        if (value === client) {
          done()
        } else {
          done(new Error('Not returning client.'))
        }
      })
    })

    it('should emit end only on first client end', function (done) {
      var client = connect()
      client.once('end', function () {
        // Pass unless a second 'end' shows up within 200 ms.
        var timeout = setTimeout(done.bind(null), 200)
        client.once('end', function () {
          clearTimeout(timeout)
          done(new Error('end was emitted twice'))
        })
        client.end()
      })
      client.once('connect', client.end.bind(client))
    })

    it('should stop ping timer after end called', function (done) {
      var client = connect()
      client.once('connect', function () {
        should.exist(client.pingTimer)
        client.end()
        should.not.exist(client.pingTimer)
        done()
      })
    })

    it('should be able to end even on a failed connection', function (done) {
      var client = connect({host: 'this_hostname_should_not_exist'})
      // Fail the test if the end callback has not run 500 ms after start.
      var timeout = setTimeout(function () {
        done(new Error('Failed to end a disconnected client'))
      }, 500)
      setTimeout(function () {
        client.end(function () {
          clearTimeout(timeout)
          done()
        })
      }, 200)
    })

    it('should emit end even on a failed connection', function (done) {
      var client = connect({host: 'this_hostname_should_not_exist'})
      var timeout = setTimeout(function () {
        done(new Error('Disconnected client has failed to emit end'))
      }, 500)
      client.once('end', function () {
        clearTimeout(timeout)
        done()
      })
      setTimeout(client.end.bind(client), 200)
    })

    it('should emit end only once for a reconnecting client', function (done) {
      var client = connect({host: 'this_hostname_should_not_exist', connectTimeout: 10, reconnectPeriod: 10})
      client.once('end', function () {
        // Watch for a duplicate 'end' for the same 200 ms window used by
        // 'should emit end only on first client end'; a delay-less timer
        // would leave the duplicate listener armed for barely a tick.
        var timeout = setTimeout(done.bind(null), 200)
        client.once('end', function () {
          clearTimeout(timeout)
          done(new Error('end emitted twice'))
        })
      })
      setTimeout(client.end.bind(client), 300)
    })
  })
  // Connection tests: the CONNECT packet the broker receives, the events the
  // client emits while connecting, and the state flags it exposes.
  describe('connecting', function () {
    it('should connect to the broker', function (done) {
      var client = connect()
      client.on('error', done)
      server.once('client', function () {
        client.end()
        done()
      })
    })

    it('should send a default client id', function (done) {
      var client = connect()
      client.on('error', done)
      server.once('client', function (serverClient) {
        serverClient.once('connect', function (packet) {
          packet.clientId.should.match(/mqttjs.*/)
          serverClient.disconnect()
          done()
        })
      })
    })

    it('should be clean by default', function (done) {
      var client = connect()
      client.on('error', done)
      server.once('client', function (serverClient) {
        serverClient.once('connect', function (packet) {
          packet.clean.should.be.true()
          serverClient.disconnect()
          done()
        })
      })
    })

    it('should connect with the given client id', function (done) {
      var client = connect({clientId: 'testclient'})
      client.on('error', function (err) {
        throw err
      })
      server.once('client', function (serverClient) {
        serverClient.once('connect', function (packet) {
          packet.clientId.should.match(/testclient/)
          serverClient.disconnect()
          done()
        })
      })
    })

    it('should connect with the client id and unclean state', function (done) {
      var client = connect({clientId: 'testclient', clean: false})
      client.on('error', function (err) {
        throw err
      })
      server.once('client', function (serverClient) {
        serverClient.once('connect', function (packet) {
          packet.clientId.should.match(/testclient/)
          packet.clean.should.be.false()
          serverClient.disconnect()
          done()
        })
      })
    })

    it('should require a clientId with clean=false', function (done) {
      // Passes when `connect` throws synchronously. If it returns a client
      // instead, any emitted error is forwarded to `done` and fails the test.
      try {
        var client = connect({ clean: false })
        client.on('error', function (err) {
          done(err)
        })
      } catch (err) {
        done()
      }
    })

    // NOTE(review): this case is identical to 'should connect with the given
    // client id' and sets no host explicitly; presumably it relies on `config`
    // not providing one -- confirm against the callers of this suite.
    it('should default to localhost', function (done) {
      var client = connect({clientId: 'testclient'})
      client.on('error', function (err) {
        throw err
      })
      server.once('client', function (serverClient) {
        serverClient.once('connect', function (packet) {
          packet.clientId.should.match(/testclient/)
          serverClient.disconnect()
          done()
        })
      })
    })

    it('should emit connect', function (done) {
      var client = connect()
      client.once('connect', function () {
        client.end()
        done()
      })
      client.once('error', done)
    })

    it('should provide connack packet with connect event', function (done) {
      // MQTT 5 names the CONNACK status field `reasonCode`; earlier versions
      // use `returnCode`.
      var connack = version === 5 ? {reasonCode: 0} : {returnCode: 0}
      // First connection is acknowledged with sessionPresent=true, the
      // following one with sessionPresent=false.
      server.once('client', function (serverClient) {
        connack.sessionPresent = true
        serverClient.connack(connack)
        server.once('client', function (serverClient) {
          connack.sessionPresent = false
          serverClient.connack(connack)
        })
      })
      var client = connect()
      client.once('connect', function (packet) {
        should(packet.sessionPresent).be.equal(true)
        client.once('connect', function (packet) {
          should(packet.sessionPresent).be.equal(false)
          client.end()
          done()
        })
      })
    })

    it('should mark the client as connected', function (done) {
      var client = connect()
      client.once('connect', function () {
        // Read the flag before `end()` so the assertion does not depend on
        // how quickly `end()` resets it.
        var wasConnected = client.connected
        client.end()
        if (wasConnected) {
          done()
        } else {
          done(new Error('Not marked as connected'))
        }
      })
    })

    it('should emit error', function (done) {
      var client = connect({clientId: 'invalid'})
      client.once('connect', function () {
        done(new Error('Should not emit connect'))
      })
      client.once('error', function (error) {
        var value = version === 5 ? 128 : 2
        should(error.code).be.equal(value) // code for client identifier rejected
        client.end()
        done()
      })
    })

    it('should have different client ids', function (done) {
      var client1 = connect()
      var client2 = connect()
      client1.options.clientId.should.not.equal(client2.options.clientId)
      client1.end(true)
      client2.end(true)
      setImmediate(done)
    })
  })
  // Offline-event tests: the client must report 'offline' both after losing an
  // established connection and when the first connection attempt fails.
  describe('handling offline states', function () {
    it('should emit offline events once when the client transitions from connected states to disconnected ones', function (done) {
      var client = connect({reconnectPeriod: 20})

      // Drop the transport as soon as the connection is up.
      function dropStream () {
        client.stream.end()
      }

      // Force-close the client; `done` runs as the end callback.
      function shutDown () {
        client.end(true, done)
      }

      client.on('connect', dropStream)
      client.on('offline', shutDown)
    })

    it('should emit offline events once when the client (at first) can NOT connect to servers', function (done) {
      // Port 4557 is a fake port, so the connection attempt is expected to fail.
      var unreachable = { reconnectPeriod: 20, port: 4557 }
      var client = connect(unreachable)

      client.on('offline', function () {
        client.end(true, done)
      })
    })
  })
  // Subscription topic-filter validation: well-formed filters are accepted,
  // malformed ones are reported through the subscribe callback.
  describe('topic validations when subscribing', function () {
    it('should be ok for well-formated topics', function (done) {
      var client = connect()
      client.subscribe(
        [
          '+', '+/event', 'event/+', '#', 'event/#', 'system/event/+',
          'system/+/event', 'system/registry/event/#', 'system/+/event/#',
          'system/registry/event/new_device', 'system/+/+/new_device'
        ],
        function (err) {
          client.end(function () {
            if (err) {
              return done(new Error(err))
            }
            done()
          })
        }
      )
    })

    // Retitled: this case passes a list of invalid filters and used to share
    // its title with the single-topic '#/event' case below.
    it('should return an error (via callbacks) for topics #/event, event#, event+', function (done) {
      var client = connect()
      client.subscribe(['#/event', 'event#', 'event+'], function (err) {
        client.end(false, function () {
          if (err) {
            return done()
          }
          done(new Error('Validations do NOT work'))
        })
      })
    })

    it('should return an empty array for duplicate subs', function (done) {
      var client = connect()
      client.subscribe('event', function (err, granted1) {
        if (err) {
          return done(err)
        }
        client.subscribe('event', function (err, granted2) {
          if (err) {
            return done(err)
          }
          granted2.should.Array()
          granted2.should.be.empty()
          // End the client so it does not stay connected for the rest of
          // the run.
          client.end(function () {
            done()
          })
        })
      })
    })

    it('should return an error (via callbacks) for topic #/event', function (done) {
      var client = connect()
      client.subscribe('#/event', function (err) {
        client.end(function () {
          if (err) {
            return done()
          }
          done(new Error('Validations do NOT work'))
        })
      })
    })

    it('should return an error (via callbacks) for topic event#', function (done) {
      var client = connect()
      client.subscribe('event#', function (err) {
        client.end(function () {
          if (err) {
            return done()
          }
          done(new Error('Validations do NOT work'))
        })
      })
    })

    it('should return an error (via callbacks) for topic system/#/event', function (done) {
      var client = connect()
      client.subscribe('system/#/event', function (err) {
        client.end(function () {
          if (err) {
            return done()
          }
          done(new Error('Validations do NOT work'))
        })
      })
    })

    it('should return an error (via callbacks) for empty topic list', function (done) {
      var client = connect()
      client.subscribe([], function (err) {
        client.end()
        if (err) {
          return done()
        }
        done(new Error('Validations do NOT work'))
      })
    })

    it('should return an error (via callbacks) for topic system/+/#/event', function (done) {
      var client = connect()
      client.subscribe('system/+/#/event', function (err) {
        client.end(true, function () {
          if (err) {
            return done()
          }
          done(new Error('Validations do NOT work'))
        })
      })
    })
  })
  // Offline/queueing tests: what happens to packets issued before the client
  // is connected, and how `end()` interacts with messages still in flight.
  describe('offline messages', function () {
    it('should queue message until connected', function (done) {
      var client = connect()
      // All three calls happen before the 'connect' event.
      client.publish('test', 'test')
      client.subscribe('test')
      client.unsubscribe('test')
      client.queue.length.should.equal(3)
      client.once('connect', function () {
        client.queue.length.should.equal(0)
        setTimeout(function () {
          client.end(true, done)
        }, 10)
      })
    })

    it('should not queue qos 0 messages if queueQoSZero is false', function (done) {
      var client = connect({queueQoSZero: false})
      client.publish('test', 'test', {qos: 0})
      client.queue.length.should.equal(0)
      client.on('connect', function () {
        setTimeout(function () {
          client.end(true, done)
        }, 10)
      })
    })

    it('should queue qos != 0 messages', function (done) {
      var client = connect({queueQoSZero: false})
      client.publish('test', 'test', {qos: 1})
      client.publish('test', 'test', {qos: 2})
      client.subscribe('test')
      client.unsubscribe('test')
      // The test expects exactly two queue entries for the four calls above.
      client.queue.length.should.equal(2)
      client.on('connect', function () {
        setTimeout(function () {
          client.end(true, done)
        }, 10)
      })
    })

    it('should not interrupt messages', function (done) {
      var client = null
      var incomingStore = new mqtt.Store({ clean: false })
      var outgoingStore = new mqtt.Store({ clean: false })
      var publishCount = 0
      // Dedicated broker that checks the publishes arrive in the order
      // payload1..payload4.
      var server2 = new Server(function (c) {
        c.on('connect', function () {
          c.connack({returnCode: 0})
        })
        c.on('publish', function (packet) {
          if (packet.qos !== 0) {
            c.puback({messageId: packet.messageId})
          }
          switch (publishCount++) {
            case 0:
              packet.payload.toString().should.equal('payload1')
              break
            case 1:
              packet.payload.toString().should.equal('payload2')
              break
            case 2:
              packet.payload.toString().should.equal('payload3')
              break
            case 3:
              packet.payload.toString().should.equal('payload4')
              // Release the client as well as the broker so neither outlives
              // the test.
              client.end(true)
              server2.close()
              done()
              break
          }
        })
      })
      server2.listen(port + 50, function () {
        client = mqtt.connect({
          port: port + 50,
          host: 'localhost',
          clean: false,
          clientId: 'cid1',
          reconnectPeriod: 0,
          incomingStore: incomingStore,
          outgoingStore: outgoingStore,
          queueQoSZero: true
        })
        client.on('packetreceive', function (packet) {
          if (packet.cmd === 'connack') {
            // Issue two more publishes right after the CONNACK; they must
            // arrive after the two issued before connecting.
            setImmediate(
              function () {
                client.publish('test', 'payload3', {qos: 1})
                client.publish('test', 'payload4', {qos: 0})
              }
            )
          }
        })
        client.publish('test', 'payload1', {qos: 2})
        client.publish('test', 'payload2', {qos: 2})
      })
    })

    it('should call cb if an outgoing QoS 0 message is not sent', function (done) {
      var client = connect({queueQoSZero: false})
      var called = false
      client.publish('test', 'test', {qos: 0}, function () {
        called = true
      })
      client.on('connect', function () {
        called.should.equal(true)
        setTimeout(function () {
          client.end(true, done)
        }, 10)
      })
    })

    it('should delay ending up until all inflight messages are delivered', function (done) {
      var client = connect()
      var subscribeCalled = false
      client.on('connect', function () {
        client.subscribe('test', function () {
          subscribeCalled = true
        })
        client.publish('test', 'test', function () {
          // A non-forced end must not complete before the subscribe
          // callback has run.
          client.end(false, function () {
            subscribeCalled.should.be.equal(true)
            done()
          })
        })
      })
    })

    it('wait QoS 1 publish messages', function (done) {
      var client = connect()
      var messageReceived = false
      client.on('connect', function () {
        client.subscribe('test')
        client.publish('test', 'test', { qos: 1 }, function () {
          client.end(false, function () {
            messageReceived.should.equal(true)
            done()
          })
        })
        client.on('message', function () {
          messageReceived = true
        })
      })
      // Broker side: once subscribed, send every publish back to the client.
      server.once('client', function (serverClient) {
        serverClient.on('subscribe', function () {
          serverClient.on('publish', function (packet) {
            serverClient.publish(packet)
          })
        })
      })
    })

    it('does not wait acks when force-closing', function (done) {
      // non-running broker
      // `connect` merges its argument into `config` as an options object, so
      // the target has to be given as host/port; a URL string would not
      // override the configured port.
      var client = connect({ host: 'localhost', port: 8993 })
      client.publish('test', 'test', { qos: 1 })
      client.end(true, done)
    })

    it('should call cb if store.put fails', function (done) {
      const store = new Store()
      // Make every store write fail asynchronously.
      store.put = function (packet, cb) {
        process.nextTick(cb, new Error('oops there is an error'))
      }
      var client = connect({ incomingStore: store, outgoingStore: store })
      client.publish('test', 'test', { qos: 2 }, function (err) {
        if (err) {
          client.end(true, done)
        }
      })
    })
  })
  589. describe('publishing', function () {
  // Publishes before the connection is up and checks what the broker receives.
  it('should publish a message (offline)', function (done) {
    var expected = { topic: 'test', payload: 'test' }
    var client = connect()

    var handleClient = function (serverClient) {
      // Stop watching for new broker-side clients once this one has connected.
      serverClient.once('connect', function () {
        server.removeListener('client', handleClient)
      })
      serverClient.once('publish', function (received) {
        received.topic.should.equal(expected.topic)
        received.payload.toString().should.equal(expected.payload)
        received.qos.should.equal(0)
        received.retain.should.equal(false)
        client.end(true, done)
      })
    }

    client.publish(expected.topic, expected.payload)
    server.on('client', handleClient)
  })
  // Publishes from the 'connect' handler and checks what the broker receives.
  it('should publish a message (online)', function (done) {
    var expected = { topic: 'test', payload: 'test' }
    var client = connect()

    function publishOnceConnected () {
      client.publish(expected.topic, expected.payload)
    }

    function verifyPublish (received) {
      received.topic.should.equal(expected.topic)
      received.payload.toString().should.equal(expected.payload)
      received.qos.should.equal(0)
      received.retain.should.equal(false)
      client.end()
      done()
    }

    client.on('connect', publishOnceConnected)
    server.once('client', function (serverClient) {
      serverClient.once('publish', verifyPublish)
    })
  })
  // Publishes a retained message before connecting; by the time the broker
  // sees it, the publish callback must already have run.
  it('should publish a message (retain, offline)', function (done) {
    var expected = { topic: 'test', payload: 'test' }
    var client = connect({ queueQoSZero: true })
    var callbackRan = false

    client.publish(expected.topic, expected.payload, { retain: true }, function () {
      callbackRan = true
    })

    server.once('client', function (serverClient) {
      serverClient.once('publish', function (received) {
        received.topic.should.equal(expected.topic)
        received.payload.toString().should.equal(expected.payload)
        received.qos.should.equal(0)
        received.retain.should.equal(true)
        callbackRan.should.equal(true)
        client.end()
        done()
      })
    })
  })
  // The client must emit 'packetsend' for the outgoing publish packet.
  it('should emit a packetsend event', function (done) {
    var client = connect()
    var topicName = 'testTopic'
    var body = 'test_payload'

    client.on('packetsend', function (sent) {
      // only the publish packet is inspected; anything else is ignored
      if (sent.cmd !== 'publish') {
        return
      }
      sent.qos.should.equal(0)
      sent.topic.should.equal(topicName)
      sent.payload.should.equal(body)
      sent.retain.should.equal(false)
      client.end()
      done()
    })

    client.publish(topicName, body)
  })
  // Explicit qos/retain options are expected on the packet the server receives.
  it('should accept options', function (done) {
    var client = connect()
    var destination = 'test'
    var message = 'test'
    var publishOptions = { retain: true, qos: 1 }

    function assertPacket (received) {
      received.topic.should.equal(destination)
      received.payload.toString().should.equal(message)
      received.qos.should.equal(publishOptions.qos, 'incorrect qos')
      received.retain.should.equal(publishOptions.retain, 'incorrect ret')
      received.dup.should.equal(false, 'incorrect dup')
      client.end()
      done()
    }

    server.once('client', function (serverClient) {
      serverClient.once('publish', assertPacket)
    })

    client.once('connect', function () {
      client.publish(destination, message, publishOptions)
    })
  })
  // Passing an empty options object must yield qos 0, retain false, dup false.
  it('should publish with the default options for an empty parameter', function (done) {
    var client = connect()
    var destination = 'test'
    var message = 'test'
    var expectedDefaults = {qos: 0, retain: false, dup: false}

    function assertDefaults (received) {
      received.topic.should.equal(destination)
      received.payload.toString().should.equal(message)
      received.qos.should.equal(expectedDefaults.qos, 'incorrect qos')
      received.retain.should.equal(expectedDefaults.retain, 'incorrect ret')
      received.dup.should.equal(expectedDefaults.dup, 'incorrect dup')
      client.end()
      done()
    }

    server.once('client', function (serverClient) {
      serverClient.once('publish', assertDefaults)
    })

    client.once('connect', function () {
      client.publish(destination, message, {})
    })
  })
  // The dup flag given in the publish options must be carried on the packet.
  it('should mark a message as duplicate when "dup" option is set', function (done) {
    var client = connect()
    var destination = 'test'
    var message = 'duplicated-test'
    var publishOptions = {
      retain: true,
      qos: 1,
      dup: true
    }

    function assertDuplicate (received) {
      received.topic.should.equal(destination)
      received.payload.toString().should.equal(message)
      received.dup.should.equal(publishOptions.dup, 'incorrect dup')
      client.end()
      done()
    }

    server.once('client', function (serverClient) {
      serverClient.once('publish', assertDuplicate)
    })

    client.once('connect', function () {
      client.publish(destination, message, publishOptions)
    })
  })
  // The publish callback must be invoked for every QoS level.
  it('should fire a callback (qos 0)', function (done) {
    var client = connect()

    function finish () {
      client.end()
      done()
    }

    client.once('connect', function () {
      client.publish('a', 'b', finish)
    })
  })

  it('should fire a callback (qos 1)', function (done) {
    var client = connect()
    var publishOptions = { qos: 1 }

    function finish () {
      client.end()
      done()
    }

    client.once('connect', function () {
      client.publish('a', 'b', publishOptions, finish)
    })
  })

  it('should fire a callback (qos 2)', function (done) {
    var client = connect()
    var publishOptions = { qos: 2 }

    function finish () {
      client.end()
      done()
    }

    client.once('connect', function () {
      client.publish('a', 'b', publishOptions, finish)
    })
  })
  // Non-ASCII text in the topic must not prevent the publish callback.
  it('should support UTF-8 characters in topic', function (done) {
    var client = connect()

    function finish () {
      client.end()
      done()
    }

    client.once('connect', function () {
      client.publish('中国', 'hello', finish)
    })
  })

  // Non-ASCII text in the payload must not prevent the publish callback.
  it('should support UTF-8 characters in payload', function (done) {
    var client = connect()

    function finish () {
      client.end()
      done()
    }

    client.once('connect', function () {
      client.publish('hello', '中国', finish)
    })
  })
  // The server echoes every publish back; the client keeps publishing on each
  // received message until the server has counted at least 10 pubrel packets.
  it('should publish 10 QoS 2 and receive them', function (done) {
    var client = connect()
    var pubrelCount = 0

    function publishTest () {
      client.publish('test', 'test', { qos: 2 })
    }

    client.on('connect', function () {
      client.subscribe('test')
      publishTest()
    })

    client.on('message', function () {
      if (pubrelCount < 10) {
        publishTest()
        return
      }
      client.end()
      done()
    })

    server.once('client', function (serverClient) {
      serverClient.on('offline', function () {
        client.end()
        done('error went offline... didnt see this happen')
      })

      // start echoing publishes only after the subscription arrived
      serverClient.on('subscribe', function () {
        serverClient.on('publish', function (incoming) {
          serverClient.publish(incoming)
        })
      })

      serverClient.on('pubrel', function () {
        pubrelCount++
      })
    })
  })
  // Shared driver: the server publishes 10 messages with the given QoS while
  // the client's handleMessage is replaced by a slow (100 ms) handler. Each
  // time the handler completes, the number of handled messages must equal the
  // number of 'message' events emitted so far.
  function testQosHandleMessage (qos, done) {
    var total = 10
    var client = connect()
    var emitted = 0
    var handled = 0

    function finish () {
      client.end()
      done()
    }

    client.handleMessage = function (packet, callback) {
      setTimeout(function () {
        handled++
        // next message event should not emit until handleMessage completes
        handled.should.equal(emitted)
        if (handled === total) {
          setTimeout(finish)
        }
        callback()
      }, 100)
    }

    client.on('message', function (topic, message, packet) {
      emitted++
    })

    client.on('connect', function () {
      client.subscribe('test')
    })

    server.once('client', function (serverClient) {
      serverClient.on('offline', function () {
        client.end()
        done('error went offline... didnt see this happen')
      })

      serverClient.on('subscribe', function () {
        var index
        for (index = 0; index < total; index++) {
          serverClient.publish({
            messageId: index,
            topic: 'test',
            payload: 'test' + index,
            qos: qos
          })
        }
      })
    })
  }
  // One run of the shared handleMessage driver per QoS level.
  it('should publish 10 QoS 0 and receive them only when `handleMessage` finishes', function (done) {
    var qos = 0
    testQosHandleMessage(qos, done)
  })

  it('should publish 10 QoS 1 and receive them only when `handleMessage` finishes', function (done) {
    var qos = 1
    testQosHandleMessage(qos, done)
  })

  it('should publish 10 QoS 2 and receive them only when `handleMessage` finishes', function (done) {
    var qos = 2
    testQosHandleMessage(qos, done)
  })
  // When handleMessage reports an error for a QoS 1 publish, _handlePublish
  // must pass that error on and must not call _sendPacket.
  it('should not send a `puback` if the execution of `handleMessage` fails for messages with QoS `1`', function (done) {
    var client = connect()
    var sendSpy = sinon.spy()

    // application-level handler that always fails
    client.handleMessage = function (packet, callback) {
      callback(new Error('Error thrown by the application'))
    }
    client._sendPacket = sendSpy

    var incoming = {
      messageId: Math.floor(65535 * Math.random()),
      topic: 'test',
      payload: 'test',
      qos: 1
    }
    client._handlePublish(incoming, function (err) {
      should.exist(err)
    })

    sendSpy.callCount.should.equal(0)
    client.end()
    client.on('connect', function () {
      done()
    })
  })
  // Calling _handlePublish without a callback must not throw even though
  // handleMessage reports an error.
  it('should silently ignore errors thrown by `handleMessage` and return when no callback is passed ' +
    'into `handlePublish` method', function (done) {
    var client = connect()
    var incoming = {
      messageId: Math.floor(65535 * Math.random()),
      topic: 'test',
      payload: 'test',
      qos: 1
    }

    client.handleMessage = function (packet, callback) {
      callback(new Error('Error thrown by the application'))
    }

    try {
      client._handlePublish(incoming)
      done()
    } catch (thrown) {
      done(thrown)
    } finally {
      client.end()
    }
  })
  // Incoming store whose put() fails asynchronously: the _handlePublish
  // callback must still be invoked for a QoS 2 packet.
  it('should handle error with async incoming store in QoS 2 `handlePublish` method', function (done) {
    function FailingStore () {
      if (!(this instanceof FailingStore)) {
        return new FailingStore()
      }
    }

    FailingStore.prototype.put = function (packet, cb) {
      process.nextTick(function reportFailure () {
        cb(new Error('Error'))
      })
    }

    var client = connect({incomingStore: new FailingStore()})
    var incoming = {
      messageId: 1,
      topic: 'test',
      payload: 'test',
      qos: 2
    }

    client._handlePublish(incoming, function () {
      done()
      client.end()
    })
  })
  // Incoming store whose get() succeeds and whose del() fails, both
  // asynchronously: the _handlePubrel callback must still be invoked.
  it('should handle error with async incoming store in QoS 2 `handlePubrel` method', function (done) {
    function FailingDelStore () {
      if (!(this instanceof FailingDelStore)) {
        return new FailingDelStore()
      }
    }

    FailingDelStore.prototype.del = function (packet, cb) {
      process.nextTick(function reportFailure () {
        cb(new Error('Error'))
      })
    }

    FailingDelStore.prototype.get = function (packet, cb) {
      process.nextTick(function reportPacket () {
        cb(null, {cmd: 'publish'})
      })
    }

    var client = connect({incomingStore: new FailingDelStore()})
    var pubrel = {
      messageId: 1,
      qos: 2
    }

    client._handlePubrel(pubrel, function () {
      done()
      client.end()
    })
  })
  // Incoming store with asynchronous, successful get()/del(): del() must have
  // completed by the time the _handlePubrel callback runs.
  it('should handle success with async incoming store in QoS 2 `handlePubrel` method', function (done) {
    var deleted = false

    function RecordingStore () {
      if (!(this instanceof RecordingStore)) {
        return new RecordingStore()
      }
    }

    RecordingStore.prototype.del = function (packet, cb) {
      process.nextTick(function completeDelete () {
        deleted = true
        cb(null)
      })
    }

    RecordingStore.prototype.get = function (packet, cb) {
      process.nextTick(function reportPacket () {
        cb(null, {cmd: 'publish'})
      })
    }

    var client = connect({incomingStore: new RecordingStore()})
    var pubrel = {
      messageId: 1,
      qos: 2
    }

    client._handlePubrel(pubrel, function () {
      deleted.should.equal(true)
      done()
      client.end()
    })
  })
  // Async incoming store with a QoS 1 publish: the _handlePublish callback
  // must be invoked.
  // NOTE(review): put() calls back with (null, 'Error'), i.e. no error in the
  // first argument, although the title says "error" - confirm this is intended.
  it('should handle error with async incoming store in QoS 1 `handlePublish` method', function (done) {
    function AsyncPutStore () {
      if (!(this instanceof AsyncPutStore)) {
        return new AsyncPutStore()
      }
    }

    AsyncPutStore.prototype.put = function (packet, cb) {
      process.nextTick(function callBackLater () {
        cb(null, 'Error')
      })
    }

    var client = connect({incomingStore: new AsyncPutStore()})
    var incoming = {
      messageId: 1,
      topic: 'test',
      payload: 'test',
      qos: 1
    }

    client._handlePublish(incoming, function () {
      done()
      client.end()
    })
  })
  // A stored QoS 2 publish whose handleMessage fails: _handlePubrel must
  // report the error and must not call _sendPacket.
  it('should not send a `pubcomp` if the execution of `handleMessage` fails for messages with QoS `2`', function (done) {
    var store = new Store()
    var client = connect({incomingStore: store})
    var storedPublish = {
      messageId: Math.floor(65535 * Math.random()),
      topic: 'test',
      payload: 'test',
      qos: 2,
      cmd: 'publish'
    }

    client.handleMessage = function (packet, callback) {
      callback(new Error('Error thrown by the application'))
    }

    function afterStored () {
      // cleans up the client
      client.end()
      client._sendPacket = sinon.spy()
      client._handlePubrel({cmd: 'pubrel', messageId: storedPublish.messageId}, function (err) {
        should.exist(err)
      })
      client._sendPacket.callCount.should.equal(0)
      done()
    }

    client.once('connect', function () {
      client.subscribe(storedPublish.topic, {qos: 2})
      store.put(storedPublish, afterStored)
    })
  })
  // Calling _handlePubrel without a callback must not throw even though
  // handleMessage reports an error for the stored publish.
  it('should silently ignore errors thrown by `handleMessage` and return when no callback is passed ' +
    'into `handlePubrel` method', function (done) {
    var store = new Store()
    var client = connect({incomingStore: store})
    var storedPublish = {
      messageId: Math.floor(65535 * Math.random()),
      topic: 'test',
      payload: 'test',
      qos: 2,
      cmd: 'publish'
    }

    client.handleMessage = function (packet, callback) {
      callback(new Error('Error thrown by the application'))
    }

    function afterStored () {
      try {
        client._handlePubrel({cmd: 'pubrel', messageId: storedPublish.messageId})
        done()
      } catch (thrown) {
        done(thrown)
      } finally {
        client.end()
      }
    }

    client.once('connect', function () {
      client.subscribe(storedPublish.topic, {qos: 2})
      store.put(storedPublish, afterStored)
    })
  })
  // Publishes two QoS 1 messages, force-closes the client, reconnects with the
  // same stores and publishes a third message. After the reconnect the
  // dedicated server must see payload1, payload2, payload3 in that order.
  it('should keep message order', function (done) {
    var publishCount = 0
    var reconnect = false
    var client = {}
    var incomingStore = new mqtt.Store({ clean: false })
    var outgoingStore = new mqtt.Store({ clean: false })
    var server2 = new Server(function (c) {
      // errors are not interesting for this test
      // but they might happen on some platforms
      c.on('error', function () {})

      c.on('connect', function (packet) {
        c.connack({returnCode: 0})
      })

      c.on('publish', function (packet) {
        c.puback({messageId: packet.messageId})
        // only publishes seen after the reconnect are checked
        if (reconnect) {
          switch (publishCount++) {
            case 0:
              packet.payload.toString().should.equal('payload1')
              break
            case 1:
              packet.payload.toString().should.equal('payload2')
              break
            case 2:
              packet.payload.toString().should.equal('payload3')
              // end the reconnected client as well, otherwise it stays
              // attached to server2 after the test has finished
              client.end(true)
              server2.close()
              done()
              break
          }
        }
      })
    })

    server2.listen(port + 50, function () {
      client = mqtt.connect({
        port: port + 50,
        host: 'localhost',
        clean: false,
        clientId: 'cid1',
        reconnectPeriod: 0,
        incomingStore: incomingStore,
        outgoingStore: outgoingStore
      })

      client.on('connect', function () {
        if (!reconnect) {
          client.publish('topic', 'payload1', {qos: 1})
          client.publish('topic', 'payload2', {qos: 1})
          client.end(true)
        } else {
          client.publish('topic', 'payload3', {qos: 1})
        }
      })

      client.on('close', function () {
        // reconnect exactly once; later 'close' events are ignored
        if (!reconnect) {
          client.reconnect({
            clean: false,
            incomingStore: incomingStore,
            outgoingStore: outgoingStore
          })
          reconnect = true
        }
      })
    })
  })
  // Shared driver for the cbStorePut tests.
  //
  // qos      - QoS level used for the publish
  // clean    - value of the `clean` connect option
  // expected - expected order of callback names ('storeput', 'publish')
  // done     - mocha completion callback
  function testCallbackStorePutByQoS (qos, clean, expected, done) {
    var client = connect({
      clean: clean,
      clientId: 'testId'
    })

    var callbacks = []

    function cbStorePut () {
      callbacks.push('storeput')
    }

    client.on('connect', function () {
      client.publish('test', 'test', {qos: qos, cbStorePut: cbStorePut}, function (err) {
        // stop here on failure; falling through would call done() twice
        if (err) {
          return done(err)
        }
        callbacks.push('publish')
        should.deepEqual(callbacks, expected)
        done()
      })
      client.end()
    })
  }
  // QoS 0: only the publish callback is expected, whatever `clean` is.
  it('should not call cbStorePut when publishing message with QoS `0` and clean `true`', function (done) {
    var expectedOrder = ['publish']
    testCallbackStorePutByQoS(0, true, expectedOrder, done)
  })

  it('should not call cbStorePut when publishing message with QoS `0` and clean `false`', function (done) {
    var expectedOrder = ['publish']
    testCallbackStorePutByQoS(0, false, expectedOrder, done)
  })

  // QoS 1 and 2: cbStorePut is expected before the publish callback.
  it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `true`', function (done) {
    var expectedOrder = ['storeput', 'publish']
    testCallbackStorePutByQoS(1, true, expectedOrder, done)
  })

  it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `false`', function (done) {
    var expectedOrder = ['storeput', 'publish']
    testCallbackStorePutByQoS(1, false, expectedOrder, done)
  })

  it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `true`', function (done) {
    var expectedOrder = ['storeput', 'publish']
    testCallbackStorePutByQoS(2, true, expectedOrder, done)
  })

  it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `false`', function (done) {
    var expectedOrder = ['storeput', 'publish']
    testCallbackStorePutByQoS(2, false, expectedOrder, done)
  })
  1155. })
  1156. describe('unsubscribing', function () {
  // Unsubscribing before the connection is up must still deliver the
  // unsubscribe packet to the server.
  it('should send an unsubscribe packet (offline)', function (done) {
    var client = connect()

    function assertUnsubscribe (received) {
      received.unsubscriptions.should.containEql('test')
      client.end()
      done()
    }

    client.unsubscribe('test')

    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', assertUnsubscribe)
    })
  })
  // Unsubscribing after 'connect' must deliver the topic to the server.
  it('should send an unsubscribe packet', function (done) {
    var client = connect()
    var topicName = 'topic'

    function assertUnsubscribe (received) {
      received.unsubscriptions.should.containEql(topicName)
      client.end()
      done()
    }

    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', assertUnsubscribe)
    })

    client.once('connect', function () {
      client.unsubscribe(topicName)
    })
  })
  // The client must emit 'packetsend' for the outgoing unsubscribe packet.
  // The previous version of this test subscribed and waited for a 'subscribe'
  // packet, so it never exercised unsubscribing.
  it('should emit a packetsend event', function (done) {
    var client = connect()
    var testTopic = 'testTopic'

    client.once('connect', function () {
      client.unsubscribe(testTopic)
    })

    client.on('packetsend', function (packet) {
      if (packet.cmd === 'unsubscribe') {
        client.end()
        done()
      }
    })
  })
  // The client must emit 'packetreceive' for the unsuback answering an
  // unsubscribe. The previous version of this test subscribed and waited for
  // a 'suback', so it never exercised unsubscribing.
  it('should emit a packetreceive event', function (done) {
    var client = connect()
    var testTopic = 'testTopic'

    function onPacketReceive (packet) {
      if (packet.cmd !== 'unsuback') {
        return
      }
      // detach first so that a second unsuback cannot call done() again
      client.removeListener('packetreceive', onPacketReceive)
      client.end()
      done()
    }

    client.once('connect', function () {
      client.unsubscribe(testTopic)
    })

    client.on('packetreceive', onPacketReceive)

    // answer the unsubscribe explicitly, as the unsuback callback test does
    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', function (packet) {
        serverClient.unsuback(packet)
      })
    })
  })
  // An array of topics must arrive unchanged in a single unsubscribe packet.
  it('should accept an array of unsubs', function (done) {
    var client = connect()
    var topics = ['topic1', 'topic2']

    client.once('connect', function () {
      client.unsubscribe(topics)
    })

    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', function (packet) {
        packet.unsubscriptions.should.eql(topics)
        // close the client like the sibling tests do
        client.end()
        done()
      })
    })
  })
  // The unsubscribe callback (mocha's done) runs once the server has answered
  // with an unsuback.
  it('should fire a callback on unsuback', function (done) {
    var client = connect()
    var topicName = 'topic'

    function acknowledge (serverClient, received) {
      serverClient.unsuback(received)
      client.end()
    }

    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', function (received) {
        acknowledge(serverClient, received)
      })
    })

    client.once('connect', function () {
      client.unsubscribe(topicName, done)
    })
  })
  // A non-ASCII topic must arrive intact in the unsubscribe packet.
  it('should unsubscribe from a chinese topic', function (done) {
    var client = connect()
    var topicName = '中国'

    function assertUnsubscribe (received) {
      received.unsubscriptions.should.containEql(topicName)
      client.end()
      done()
    }

    server.once('client', function (serverClient) {
      serverClient.once('unsubscribe', assertUnsubscribe)
    })

    client.once('connect', function () {
      client.unsubscribe(topicName)
    })
  })
  1248. })
  // Keepalive scheduling, driven by sinon fake timers that are installed
  // before and restored after every test.
  describe('keepalive', function () {
    var clock

    beforeEach(function () {
      clock = sinon.useFakeTimers()
    })

    afterEach(function () {
      clock.restore()
    })

    it('should checkPing at keepalive interval', function (done) {
      var keepaliveSeconds = 3
      var client = connect({ keepalive: keepaliveSeconds })

      client._checkPing = sinon.spy()

      client.once('connect', function () {
        var expectedCalls
        // one additional _checkPing call per elapsed keepalive interval
        for (expectedCalls = 1; expectedCalls <= 3; expectedCalls++) {
          clock.tick(keepaliveSeconds * 1000)
          client._checkPing.callCount.should.equal(expectedCalls)
        }
        client.end()
        done()
      })
    })

    it('should not checkPing if publishing at a higher rate than keepalive', function (done) {
      var keepaliveMs = 3000
      var client = connect({keepalive: keepaliveMs / 1000})

      client._checkPing = sinon.spy()

      client.once('connect', function () {
        client.publish('foo', 'bar')
        // publish again just before the interval elapses, then step past it
        clock.tick(keepaliveMs - 1)
        client.publish('foo', 'bar')
        clock.tick(2)
        client._checkPing.callCount.should.equal(0)
        client.end()
        done()
      })
    })

    it('should checkPing if publishing at a higher rate than keepalive and reschedulePings===false', function (done) {
      var keepaliveMs = 3000
      var client = connect({
        keepalive: keepaliveMs / 1000,
        reschedulePings: false
      })

      client._checkPing = sinon.spy()

      client.once('connect', function () {
        client.publish('foo', 'bar')
        clock.tick(keepaliveMs - 1)
        client.publish('foo', 'bar')
        clock.tick(2)
        client._checkPing.callCount.should.equal(1)
        client.end()
        done()
      })
    })
  })
  1304. describe('pinging', function () {
  // A positive keepalive must leave a pingTimer on the connected client.
  it('should set a ping timer', function (done) {
    var client = connect({keepalive: 3})

    function checkTimer () {
      should.exist(client.pingTimer)
      client.end()
      done()
    }

    client.once('connect', checkTimer)
  })

  // keepalive 0 must leave no pingTimer on the connected client.
  it('should not set a ping timer keepalive=0', function (done) {
    var client = connect({keepalive: 0})

    function checkNoTimer () {
      should.not.exist(client.pingTimer)
      client.end()
      done()
    }

    client.on('connect', checkNoTimer)
  })
  // With pingresp handling stubbed out, a second 'connect' event is expected.
  it('should reconnect if pingresp is not sent', function (done) {
    var client = connect({keepalive: 1, reconnectPeriod: 100})

    // Fake no pingresp being send by stubbing the _handlePingresp function
    client._handlePingresp = function () {}

    function onReconnect () {
      client.end()
      done()
    }

    function onFirstConnect () {
      client.once('connect', onReconnect)
    }

    client.once('connect', onFirstConnect)
  })
  // The connection must stay open for one second. A 'close' event within that
  // window fails the test.
  it('should not reconnect if pingresp is successful', function (done) {
    var client = connect({keepalive: 100})
    var timer

    function onClose () {
      // cancel the success path so done() is called only once
      clearTimeout(timer)
      client.end(true)
      done(new Error('Client closed connection'))
    }

    client.once('close', onClose)

    timer = setTimeout(function () {
      // detach first: ending the client below would otherwise trigger onClose
      client.removeListener('close', onClose)
      client.end()
      done()
    }, 1000)
  })
  // Publishes three times, 75 ms apart, and checks after each wait that
  // _checkPing has not been called.
  it('should defer the next ping when sending a control packet', function (done) {
    var client = connect({keepalive: 1})

    client.once('connect', function () {
      var remainingRounds = 3

      client._checkPing = sinon.spy()

      function publishAndCheck () {
        client.publish('foo', 'bar')
        setTimeout(function () {
          client._checkPing.callCount.should.equal(0)
          remainingRounds--
          if (remainingRounds > 0) {
            publishAndCheck()
          } else {
            // close the client so it does not stay connected after the test
            client.end()
            done()
          }
        }, 75)
      }

      publishAndCheck()
    })
  })
  1358. })
  1359. describe('subscribing', function () {
  // Subscribing before the connection is up must still deliver a subscribe
  // packet to the server.
  it('should send a subscribe message (offline)', function (done) {
    var client = connect()

    client.subscribe('test')

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function () {
        // close the client like the publish/unsubscribe tests do
        client.end()
        done()
      })
    })
  })
  // Subscribing after 'connect' must deliver the topic with qos 0 (plus the
  // nl/rap/rh fields when `version` is 5).
  it('should send a subscribe message', function (done) {
    var client = connect()
    var topic = 'test'

    client.once('connect', function () {
      client.subscribe(topic)
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        var result = {
          topic: topic,
          qos: 0
        }
        if (version === 5) {
          result.nl = false
          result.rap = false
          result.rh = 0
        }
        packet.subscriptions.should.containEql(result)
        // close the client like the publish/unsubscribe tests do
        client.end()
        done()
      })
    })
  })
  // The client must emit 'packetsend' for the outgoing subscribe packet.
  it('should emit a packetsend event', function (done) {
    var client = connect()
    var testTopic = 'testTopic'

    client.once('connect', function () {
      client.subscribe(testTopic)
    })

    client.on('packetsend', function (packet) {
      if (packet.cmd === 'subscribe') {
        // close the client so it does not stay connected after the test
        client.end()
        done()
      }
    })
  })
  // The client must emit 'packetreceive' for the incoming suback.
  it('should emit a packetreceive event', function (done) {
    var client = connect()
    var testTopic = 'testTopic'

    client.once('connect', function () {
      client.subscribe(testTopic)
    })

    client.on('packetreceive', function (packet) {
      if (packet.cmd === 'suback') {
        // close the client so it does not stay connected after the test
        client.end()
        done()
      }
    })
  })
  // An array of topics must produce one subscription entry per topic, each
  // with qos 0 (plus the nl/rap/rh fields when `version` is 5).
  it('should accept an array of subscriptions', function (done) {
    var client = connect()
    var subs = ['test1', 'test2']

    client.once('connect', function () {
      client.subscribe(subs)
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        // i.e. [{topic: 'a', qos: 0}, {topic: 'b', qos: 0}]
        var expected = subs.map(function (i) {
          var result = {topic: i, qos: 0}
          if (version === 5) {
            result.nl = false
            result.rap = false
            result.rh = 0
          }
          return result
        })

        packet.subscriptions.should.eql(expected)
        // close the client so it does not stay connected after the test
        client.end()
        done()
      })
    })
  })
  // A {topic: {qos}} map must produce one subscription entry per own key,
  // carrying that key's qos (plus the nl/rap/rh fields when `version` is 5).
  it('should accept an hash of subscriptions', function (done) {
    var client = connect()
    var topics = {
      test1: {qos: 0},
      test2: {qos: 1}
    }

    client.once('connect', function () {
      client.subscribe(topics)
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        // Object.keys lists own enumerable keys only
        var expected = Object.keys(topics).map(function (name) {
          var result = {
            topic: name,
            qos: topics[name].qos
          }
          if (version === 5) {
            result.nl = false
            result.rap = false
            result.rh = 0
          }
          return result
        })

        packet.subscriptions.should.eql(expected)
        // close the client so it does not stay connected after the test
        client.end()
        done()
      })
    })
  })
  // The qos given in the options must be used for the subscription entry.
  it('should accept an options parameter', function (done) {
    var client = connect()
    var topic = 'test'
    var opts = {qos: 1}

    client.once('connect', function () {
      client.subscribe(topic, opts)
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        var expected = [{
          topic: topic,
          qos: 1
        }]

        if (version === 5) {
          expected[0].nl = false
          expected[0].rap = false
          expected[0].rh = 0
        }

        packet.subscriptions.should.eql(expected)
        // close the client so it does not stay connected after the test
        client.end()
        done()
      })
    })
  })
  // An empty options object must result in a qos 0 subscription.
  it('should subscribe with the default options for an empty options parameter', function (done) {
    var client = connect()
    var topic = 'test'
    var defaultOpts = {qos: 0}

    client.once('connect', function () {
      client.subscribe(topic, {})
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        var result = {
          topic: topic,
          qos: defaultOpts.qos
        }
        if (version === 5) {
          result.nl = false
          result.rap = false
          result.rh = 0
        }

        packet.subscriptions.should.containEql(result)
        // close the client so it does not stay connected after the test
        client.end()
        done()
      })
    })
  })
  // The subscribe callback must receive the granted subscriptions.
  it('should fire a callback on suback', function (done) {
    var client = connect()
    var topic = 'test'

    client.once('connect', function () {
      client.subscribe(topic, { qos: 2 }, function (err, granted) {
        // close the client on both outcomes so it does not outlive the test
        client.end()

        if (err) {
          return done(err)
        }

        should.exist(granted, 'granted not given')
        var result = {topic: 'test', qos: 2}
        if (version === 5) {
          result.nl = false
          result.rap = false
          result.rh = 0
          result.properties = undefined
        }
        granted.should.containEql(result)
        done()
      })
    })
  })
  // Subscribing after the client has been force-ended must call back with an
  // error and without a granted list (options given).
  it('should fire a callback with error if disconnected (options provided)', function (done) {
    var client = connect()
    var topicName = 'test'

    function expectFailure (err, granted) {
      should.not.exist(granted, 'granted given')
      should.exist(err, 'no error given')
      done()
    }

    function subscribeAfterEnd () {
      client.subscribe(topicName, {qos: 2}, expectFailure)
    }

    client.once('connect', function () {
      client.end(true, subscribeAfterEnd)
    })
  })

  // Same as above, with the options argument omitted.
  it('should fire a callback with error if disconnected (options not provided)', function (done) {
    var client = connect()
    var topicName = 'test'

    function expectFailure (err, granted) {
      should.not.exist(granted, 'granted given')
      should.exist(err, 'no error given')
      done()
    }

    function subscribeAfterEnd () {
      client.subscribe(topicName, expectFailure)
    }

    client.once('connect', function () {
      client.end(true, subscribeAfterEnd)
    })
  })
  // A non-ASCII topic must arrive intact in the subscribe packet.
  it('should subscribe with a chinese topic', function (done) {
    var client = connect()
    var topic = '中国'

    client.once('connect', function () {
      client.subscribe(topic)
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function (packet) {
        var result = {
          topic: topic,
          qos: 0
        }
        if (version === 5) {
          result.nl = false
          result.rap = false
          result.rh = 0
        }
        packet.subscriptions.should.containEql(result)
        // close the client so it does not stay connected after the test
        client.end()
        done()
      })
    })
  })
  1586. })
  1587. describe('receiving messages', function () {
  // A publish sent by the server after the subscription must surface as a
  // 'message' event carrying topic, payload and the packet itself.
  it('should fire the message event', function (done) {
    var client = connect()
    var testPacket = {
      topic: 'test',
      payload: 'message',
      retain: true,
      qos: 1,
      messageId: 5
    }

    client.subscribe(testPacket.topic)

    client.once('message', function (topic, message, packet) {
      topic.should.equal(testPacket.topic)
      message.toString().should.equal(testPacket.payload)
      // compare against the packet that was sent; the former
      // `packet.should.equal(packet)` could never fail
      packet.qos.should.equal(testPacket.qos)
      packet.retain.should.equal(testPacket.retain)
      packet.messageId.should.equal(testPacket.messageId)
      client.end()
      done()
    })

    server.once('client', function (serverClient) {
      serverClient.on('subscribe', function () {
        serverClient.publish(testPacket)
      })
    })
  })
  // The incoming publish must also be reported through 'packetreceive'.
  it('should emit a packetreceive event', function (done) {
    var client = connect()
    var outgoing = {
      topic: 'test',
      payload: 'message',
      retain: true,
      qos: 1,
      messageId: 5
    }

    function onPacketReceive (received) {
      // only the publish packet is inspected; anything else is ignored
      if (received.cmd !== 'publish') {
        return
      }
      received.qos.should.equal(1)
      received.topic.should.equal(outgoing.topic)
      received.payload.toString().should.equal(outgoing.payload)
      received.retain.should.equal(true)
      client.end()
      done()
    }

    client.subscribe(outgoing.topic)
    client.on('packetreceive', onPacketReceive)

    server.once('client', function (serverClient) {
      serverClient.on('subscribe', function () {
        serverClient.publish(outgoing)
      })
    })
  })
  // With encoding 'binary' the message payload must be delivered as a Buffer.
  it('should support binary data', function (done) {
    var client = connect({ encoding: 'binary' })
    var testPacket = {
      topic: 'test',
      payload: 'message',
      retain: true,
      qos: 1,
      messageId: 5
    }

    client.subscribe(testPacket.topic)

    client.once('message', function (topic, message, packet) {
      topic.should.equal(testPacket.topic)
      message.should.be.an.instanceOf(Buffer)
      message.toString().should.equal(testPacket.payload)
      // compare against the packet that was sent; the former
      // `packet.should.equal(packet)` could never fail
      packet.qos.should.equal(testPacket.qos)
      packet.retain.should.equal(testPacket.retain)
      packet.messageId.should.equal(testPacket.messageId)
      // close the client so it does not stay connected after the test
      client.end()
      done()
    })

    server.once('client', function (serverClient) {
      serverClient.on('subscribe', function () {
        serverClient.publish(testPacket)
      })
    })
  })
  // A QoS 2 publish from the server must reach 'message' listeners with the
  // published topic, payload and packet.
  it('should emit a message event (qos=2)', function (done) {
    const client = connect()
    const testPacket = {
      topic: 'test',
      payload: 'message',
      retain: true,
      qos: 2,
      messageId: 5
    }

    // NOTE(review): presumably consumed by the shared test server; its use is
    // not visible in this section.
    server.testPublish = testPacket

    client.subscribe(testPacket.topic)
    client.once('message', function (topic, message, packet) {
      topic.should.equal(testPacket.topic)
      message.toString().should.equal(testPacket.payload)
      // Check the delivered packet against what the server published.
      packet.cmd.should.equal('publish')
      packet.qos.should.equal(testPacket.qos)
      // Close the connection so the client does not outlive the test.
      client.end()
      done()
    })

    // Publish only once the server has seen the subscribe packet.
    server.once('client', function (serverClient) {
      serverClient.on('subscribe', function () {
        serverClient.publish(testPacket)
      })
    })
  })
  // The server sends the same QoS 2 publish twice. The listener is attached
  // with `on` (not `once`), so a second delivery would invoke done() again and
  // fail the test; the client is left open so that a duplicate could arrive.
  it('should emit a message event (qos=2) - repeated publish', function (done) {
    const client = connect()
    const testPacket = {
      topic: 'test',
      payload: 'message',
      retain: true,
      qos: 2,
      messageId: 5
    }

    // NOTE(review): presumably consumed by the shared test server; its use is
    // not visible in this section.
    server.testPublish = testPacket

    client.subscribe(testPacket.topic)
    client.on('message', function (topic, message, packet) {
      topic.should.equal(testPacket.topic)
      message.toString().should.equal(testPacket.payload)
      // Check the delivered packet against what the server published.
      packet.cmd.should.equal('publish')
      packet.qos.should.equal(testPacket.qos)
      done()
    })

    server.once('client', function (serverClient) {
      serverClient.on('subscribe', function () {
        serverClient.publish(testPacket)
        // twice, should be ignored
        serverClient.publish(testPacket)
      })
    })
  })
  // A topic made of a non-ASCII (Chinese) character must round-trip unchanged
  // through subscribe and publish.
  it('should support chinese topic', function (done) {
    const client = connect({ encoding: 'binary' })
    const testPacket = {
      topic: '国',
      payload: 'message',
      retain: true,
      qos: 1,
      messageId: 5
    }

    client.subscribe(testPacket.topic)
    client.once('message', function (topic, message, packet) {
      topic.should.equal(testPacket.topic)
      message.should.be.an.instanceOf(Buffer)
      message.toString().should.equal(testPacket.payload)
      // Check the delivered packet against what the server published.
      packet.cmd.should.equal('publish')
      packet.qos.should.equal(testPacket.qos)
      // Close the connection so the client does not outlive the test.
      client.end()
      done()
    })

    // Publish only once the server has seen the subscribe packet.
    server.once('client', function (serverClient) {
      serverClient.on('subscribe', function () {
        serverClient.publish(testPacket)
      })
    })
  })
  1731. })
  1732. describe('qos handling', function () {
  // QoS 0 needs no acknowledgement: the test finishes as soon as the server
  // has published in response to the subscription.
  it('should follow qos 0 semantics (trivial)', function (done) {
    const client = connect()
    const topic = 'test'
    const payload = 'message'

    client.once('connect', () => {
      client.subscribe(topic, {qos: 0})
    })

    server.once('client', (serverClient) => {
      serverClient.once('subscribe', () => {
        const outbound = {
          topic: topic,
          payload: payload,
          qos: 0,
          retain: false
        }
        serverClient.publish(outbound)
        done()
      })
    })
  })
  // For QoS 1 the client must answer the server's publish with a puback that
  // carries the same message id.
  it('should follow qos 1 semantics', function (done) {
    const client = connect()
    const topic = 'test'
    const payload = 'message'
    const messageId = 50

    client.once('connect', () => {
      client.subscribe(topic, {qos: 1})
    })

    server.once('client', (serverClient) => {
      serverClient.once('subscribe', () => {
        const outbound = {
          topic: topic,
          payload: payload,
          messageId: messageId,
          qos: 1
        }
        serverClient.publish(outbound)
      })

      serverClient.once('puback', (ack) => {
        ack.messageId.should.equal(messageId)
        done()
      })
    })
  })
  // Tracks the order of the QoS 2 handshake: publish (seen by client), pubrec
  // (seen by server), pubrel (seen by client), pubcomp (seen by server). Each
  // step asserts that the earlier steps happened and the later ones did not.
  it('should follow qos 2 semantics', function (done) {
    const client = connect()
    const topic = 'test'
    const payload = 'message'
    const messageId = 253
    let sawPublish = false
    let sawPubrec = false
    let sawPubrel = false

    client.once('connect', () => {
      client.subscribe(topic, {qos: 2})
    })

    client.on('packetreceive', (packet) => {
      const cmd = packet.cmd

      // expected, but not specifically part of QOS 2 semantics
      if (cmd === 'connack' || cmd === 'suback') {
        return
      }

      if (cmd === 'publish') {
        sawPubrec.should.be.false()
        sawPubrel.should.be.false()
        sawPublish = true
        return
      }

      if (cmd === 'pubrel') {
        sawPublish.should.be.true()
        sawPubrec.should.be.true()
        sawPubrel = true
        return
      }

      // Any other packet type is unexpected during this exchange.
      should.fail()
    })

    server.once('client', (serverClient) => {
      serverClient.once('subscribe', () => {
        serverClient.publish({
          topic: topic,
          payload: payload,
          qos: 2,
          messageId: messageId
        })
      })

      serverClient.on('pubrec', () => {
        sawPublish.should.be.true()
        sawPubrel.should.be.false()
        sawPubrec = true
      })

      serverClient.once('pubcomp', () => {
        // Detach all listeners before the final checks.
        client.removeAllListeners()
        serverClient.removeAllListeners()
        sawPublish.should.be.true()
        sawPubrec.should.be.true()
        sawPubrel.should.be.true()
        done()
      })
    })
  })
  // While the QoS 2 handshake is in progress (pubrel received) the incoming
  // store holds exactly one in-flight entry; once the server has received the
  // pubcomp the store must be empty again.
  it('should empty the incoming store after a qos 2 handshake is completed', function (done) {
    const client = connect()
    const testTopic = 'test'
    const testMessage = 'message'
    const mid = 253

    client.once('connect', function () {
      client.subscribe(testTopic, {qos: 2})
    })

    client.on('packetreceive', (packet) => {
      if (packet.cmd === 'pubrel') {
        should(client.incomingStore._inflights.size).be.equal(1)
      }
    })

    server.once('client', function (serverClient) {
      serverClient.once('subscribe', function () {
        serverClient.publish({
          topic: testTopic,
          payload: testMessage,
          qos: 2,
          messageId: mid
        })
      })

      serverClient.once('pubcomp', function () {
        should(client.incomingStore._inflights.size).be.equal(0)
        client.removeAllListeners()
        done()
      })
    })
  })
  // Shared driver for the "multiple pubrel" tests. It replaces
  // client._sendPacket with a mock that never writes to the wire and instead
  // feeds the matching reply straight back into client._handlePacket:
  //   subscribe -> suback
  //   pubrec    -> pubrel
  //   pubcomp   -> another pubrel (as if the pubcomp had been lost)
  // After the second pubcomp it asserts that two pubrels were injected while
  // the message was handled and emitted exactly once, restores the real
  // _sendPacket and calls done().
  //
  // shouldSendPubcompFail: when truthy, the mock reports an error to the send
  //   callback of every pubcomp.
  // done: mocha completion callback.
  function testMultiplePubrel (shouldSendPubcompFail, done) {
    const client = connect()
    const topic = 'test'
    const payload = 'message'
    const mid = 253
    let pubcompsSent = 0
    let pubrelsInjected = 0
    let handledMessages = 0
    let emittedMessages = 0
    const realSendPacket = client._sendPacket
    // Shared across (possibly nested) _sendPacket invocations on purpose: the
    // pubrel callbacks below read whatever the most recent send decided.
    let shouldSendFail

    client.handleMessage = function (packet, callback) {
      handledMessages++
      callback()
    }

    client.on('message', () => {
      emittedMessages++
    })

    client._sendPacket = function (packet, sendDone) {
      shouldSendFail = packet.cmd === 'pubcomp' && shouldSendPubcompFail
      if (sendDone) {
        sendDone(shouldSendFail ? new Error('testing pubcomp failure') : undefined)
      }

      // send the mocked response
      switch (packet.cmd) {
        case 'subscribe': {
          const suback = {cmd: 'suback', messageId: packet.messageId, granted: [2]}
          client._handlePacket(suback, (err) => {
            should(err).not.be.ok()
          })
          break
        }
        case 'pubrec':
        case 'pubcomp': {
          // for both pubrec and pubcomp, reply with pubrel, simulating the server not receiving the pubcomp
          if (packet.cmd === 'pubcomp') {
            pubcompsSent++
            if (pubcompsSent === 2) {
              // end the test once the client has gone through two rounds of replying to pubrel messages
              pubrelsInjected.should.be.exactly(2)
              handledMessages.should.be.exactly(1)
              emittedMessages.should.be.exactly(1)
              client._sendPacket = realSendPacket
              done()
              break
            }
          }

          // simulate the pubrel message, either in response to pubrec or to mock pubcomp failing to be received
          const pubrel = {cmd: 'pubrel', messageId: mid}
          pubrelsInjected++
          client._handlePacket(pubrel, (err) => {
            if (shouldSendFail) {
              should(err).be.ok()
            } else {
              should(err).not.be.ok()
            }
          })
          break
        }
      }
    }

    client.once('connect', () => {
      client.subscribe(topic, {qos: 2})
      // Inject the initial publish as if it had arrived from the server.
      const publish = {cmd: 'publish', topic: topic, payload: payload, qos: 2, messageId: mid}
      client._handlePacket(publish, (err) => {
        should(err).not.be.ok()
      })
    })
  }
  // Variant in which every mocked pubcomp send succeeds.
  it('handle qos 2 messages exactly once when multiple pubrel received', function (done) {
    const pubcompSendFails = false
    testMultiplePubrel(pubcompSendFails, done)
  })
  // Variant in which the mocked pubcomp send reports an error to its callback.
  it('handle qos 2 messages exactly once when multiple pubrel received and sending pubcomp fails on client', function (done) {
    const pubcompSendFails = true
    testMultiplePubrel(pubcompSendFails, done)
  })
  1932. })
  1933. describe('auto reconnect', function () {
  // Calling end() must set the `disconnecting` flag synchronously.
  it('should mark the client disconnecting if #end called', function () {
    const client = connect()

    client.end()

    client.disconnecting.should.eql(true)
  })
  // Ends the underlying stream on the first connect and expects a second
  // 'connect' event to follow.
  it('should reconnect after stream disconnect', function (done) {
    const client = connect()
    let firstConnect = true

    client.on('connect', () => {
      if (!firstConnect) {
        // Second connect: the client came back on its own.
        client.end()
        done()
        return
      }
      client.stream.end()
      firstConnect = false
    })
  })
  // After the stream is dropped, a 'reconnect' event must have been emitted
  // by the time the client connects for the second time.
  it('should emit \'reconnect\' when reconnecting', function (done) {
    const client = connect()
    let firstConnect = true
    let sawReconnect = false

    client.on('reconnect', () => {
      sawReconnect = true
    })

    client.on('connect', () => {
      if (!firstConnect) {
        sawReconnect.should.equal(true)
        client.end()
        done()
        return
      }
      client.stream.end()
      firstConnect = false
    })
  })
  // After the stream is dropped, an 'offline' event must have been emitted by
  // the time the client connects for the second time.
  it('should emit \'offline\' after going offline', function (done) {
    const client = connect()
    let firstConnect = true
    let sawOffline = false

    client.on('offline', () => {
      sawOffline = true
    })

    client.on('connect', () => {
      if (!firstConnect) {
        sawOffline.should.equal(true)
        client.end()
        done()
        return
      }
      client.stream.end()
      firstConnect = false
    })
  })
  // The 'connect' listener stays attached, so a reconnect after end() would
  // reach done() a second time.
  it('should not reconnect if it was ended by the user', function (done) {
    const client = connect()

    client.on('connect', () => {
      client.end()
      done() // it will raise an exception if called two times
    })
  })
  // No reconnect timer exists while connected; one must exist by the time
  // 'close' fires after the stream has been ended.
  it('should setup a reconnect timer on disconnect', function (done) {
    const client = connect()

    client.once('connect', () => {
      should.not.exist(client.reconnectTimer)
      client.stream.end()
    })

    client.once('close', () => {
      should.exist(client.reconnectTimer)
      client.end()
      done()
    })
  })
  // Measures the wall-clock time from test start to the second 'connect' and
  // requires it to be at least the configured reconnectPeriod.
  it('should allow specification of a reconnect period', function (done) {
    const period = 200
    const client = connect({reconnectPeriod: period})
    let droppedOnce = false
    const startedAt = Date.now()

    client.on('connect', () => {
      if (!droppedOnce) {
        client.stream.end()
        droppedOnce = true
        return
      }

      client.end()
      const elapsed = Date.now() - startedAt
      if (elapsed >= period) {
        // The reconnect took at least the configured period (in ms)
        done()
      } else {
        done(new Error('Strange reconnect period'))
      }
    })
  })
  // Connects to a host name that is expected not to resolve, with a 1 ms
  // reconnect period, then ends the client after 50 ms; end() must still
  // invoke its callback.
  it('should always cleanup successfully on reconnection', function (done) {
    const client = connect({host: 'this_hostname_should_not_exist', connectTimeout: 0, reconnectPeriod: 1})

    setTimeout(() => {
      client.end(done)
    }, 50)
  })
  // The first server-side connection is destroyed right after connect; the
  // QoS 1 publish must then show up on the second connection and the client's
  // publish callback must fire. The test ends once both have happened.
  it('should resend in-flight QoS 1 publish messages from the client', function (done) {
    const client = connect({reconnectPeriod: 200})
    let publishSeenByServer = false
    let publishCallbackFired = false

    const finishWhenBothSeen = () => {
      if (publishSeenByServer && publishCallbackFired) {
        client.end()
        done()
      }
    }

    server.once('client', (firstConnection) => {
      firstConnection.on('connect', () => {
        setImmediate(() => {
          firstConnection.stream.destroy()
        })
      })

      server.once('client', (secondConnection) => {
        secondConnection.on('publish', () => {
          publishSeenByServer = true
          finishWhenBothSeen()
        })
      })
    })

    client.publish('hello', 'world', { qos: 1 }, () => {
      publishCallbackFired = true
      finishWhenBothSeen()
    })
  })
  // The server connection is destroyed and the client ended in the same tick;
  // at that point neither the server nor the publish callback may have seen
  // the message.
  it('should not resend in-flight publish messages if disconnecting', function (done) {
    const client = connect({reconnectPeriod: 200})
    let publishSeenByServer = false
    let publishCallbackFired = false

    server.once('client', (firstConnection) => {
      firstConnection.on('connect', () => {
        setImmediate(() => {
          firstConnection.stream.destroy()
          client.end()
          publishSeenByServer.should.be.false()
          publishCallbackFired.should.be.false()
          done()
        })
      })

      server.once('client', (secondConnection) => {
        secondConnection.on('publish', () => {
          publishSeenByServer = true
        })
      })
    })

    client.publish('hello', 'world', { qos: 1 }, () => {
      publishCallbackFired = true
    })
  })
  // The first connection is destroyed right after it receives the QoS 2
  // publish. The test ends once the second connection has seen a pubrel and
  // the client's publish callback has fired.
  it('should resend in-flight QoS 2 publish messages from the client', function (done) {
    const client = connect({reconnectPeriod: 200})
    let pubrelSeenByServer = false
    let publishCallbackFired = false

    const finishWhenBothSeen = () => {
      if (pubrelSeenByServer && publishCallbackFired) {
        client.end()
        done()
      }
    }

    server.once('client', (firstConnection) => {
      // ignore errors
      firstConnection.on('error', () => {})

      firstConnection.on('publish', () => {
        setImmediate(() => {
          firstConnection.stream.destroy()
        })
      })

      server.once('client', (secondConnection) => {
        secondConnection.on('pubrel', () => {
          pubrelSeenByServer = true
          finishWhenBothSeen()
        })
      })
    })

    client.publish('hello', 'world', { qos: 2 }, () => {
      publishCallbackFired = true
      finishWhenBothSeen()
    })
  })
  // Publishes at QoS 1, then immediately removes the message with
  // removeOutgoingMessage(). The publish callback must be invoked
  // synchronously with a 'Message removed' error and both outgoing
  // bookkeeping structures must go from one entry to none.
  it('should not resend in-flight QoS 1 removed publish messages from the client', function (done) {
    const client = connect({reconnectPeriod: 200})
    let publishCallbackFired = false

    // Asserts the number of entries tracked for outgoing messages.
    const expectOutgoingCount = (count) => {
      should(Object.keys(client.outgoing).length).be.equal(count)
      should(client.outgoingStore._inflights.size).be.equal(count)
    }

    server.once('client', (firstConnection) => {
      firstConnection.on('connect', () => {
        setImmediate(() => {
          firstConnection.stream.destroy()
        })
      })

      server.once('client', (secondConnection) => {
        secondConnection.on('publish', () => {
          should.fail()
          done()
        })
      })
    })

    client.publish('hello', 'world', { qos: 1 }, (err) => {
      publishCallbackFired = true
      should(err.message).be.equal('Message removed')
    })

    expectOutgoingCount(1)
    client.removeOutgoingMessage(client.getLastMessageId())
    expectOutgoingCount(0)

    publishCallbackFired.should.be.true()
    client.end()
    done()
  })
  // Publishes at QoS 2, then immediately removes the message with
  // removeOutgoingMessage(). The publish callback must be invoked
  // synchronously with a 'Message removed' error and both outgoing
  // bookkeeping structures must go from one entry to none.
  it('should not resend in-flight QoS 2 removed publish messages from the client', function (done) {
    const client = connect({reconnectPeriod: 200})
    let publishCallbackFired = false

    // Asserts the number of entries tracked for outgoing messages.
    const expectOutgoingCount = (count) => {
      should(Object.keys(client.outgoing).length).be.equal(count)
      should(client.outgoingStore._inflights.size).be.equal(count)
    }

    server.once('client', (firstConnection) => {
      firstConnection.on('connect', () => {
        setImmediate(() => {
          firstConnection.stream.destroy()
        })
      })

      server.once('client', (secondConnection) => {
        secondConnection.on('publish', () => {
          should.fail()
          done()
        })
      })
    })

    client.publish('hello', 'world', { qos: 2 }, (err) => {
      publishCallbackFired = true
      should(err.message).be.equal('Message removed')
    })

    expectOutgoingCount(1)
    client.removeOutgoingMessage(client.getLastMessageId())
    expectOutgoingCount(0)

    publishCallbackFired.should.be.true()
    client.end()
    done()
  })
  // Subscribes, drops the stream once the subscription is acknowledged, and
  // expects the next server-side connection to receive a subscribe packet.
  it('should resubscribe when reconnecting', function (done) {
    const client = connect({ reconnectPeriod: 100 })
    let firstConnect = true
    let sawReconnect = false

    client.on('reconnect', () => {
      sawReconnect = true
    })

    client.on('connect', () => {
      if (!firstConnect) {
        sawReconnect.should.equal(true)
        return
      }

      client.subscribe('hello', () => {
        client.stream.end()

        // The next connection the server accepts is the reconnect.
        server.once('client', (serverClient) => {
          serverClient.on('subscribe', () => {
            client.end()
            done()
          })
        })
      })
      firstConnect = false
    })
  })
  // With `resubscribe: false` the reconnected session must not send a
  // subscribe packet and no topics may be tracked for resubscription.
  it('should not resubscribe when reconnecting if resubscribe is disabled', function (done) {
    const client = connect({ reconnectPeriod: 100, resubscribe: false })
    let firstConnect = true
    let sawReconnect = false

    client.on('reconnect', () => {
      sawReconnect = true
    })

    client.on('connect', () => {
      if (!firstConnect) {
        sawReconnect.should.equal(true)
        should(Object.keys(client._resubscribeTopics).length).be.equal(0)
        done()
        return
      }

      client.subscribe('hello', () => {
        client.stream.end()

        // Any subscribe on the next connection is a failure.
        server.once('client', (serverClient) => {
          serverClient.on('subscribe', () => {
            should.fail()
          })
        })
      })
      firstConnect = false
    })
  })
  // Uses a dedicated server (server2) that answers every subscribe with the
  // failure bit (0x80) set in each granted QoS. After a reconnect the client
  // must not track the topic for resubscription and must not send another
  // subscribe packet.
  it('should not resubscribe when reconnecting if suback is error', function (done) {
    let tryReconnect = true
    let reconnectEvent = false

    const server2 = new Server(function (c) {
      c.on('connect', function (packet) {
        c.connack({returnCode: 0})
      })
      c.on('subscribe', function (packet) {
        c.suback({
          messageId: packet.messageId,
          granted: packet.subscriptions.map(function (e) {
            return e.qos | 0x80
          })
        })
        // NOTE(review): an unsolicited pubrel with a random id is sent after
        // the suback; its purpose is not evident from this test.
        c.pubrel({ messageId: Math.floor(Math.random() * 9000) + 1000 })
      })
    })

    server2.listen(port + 49, function () {
      const client = mqtt.connect({
        port: port + 49,
        host: 'localhost',
        reconnectPeriod: 100
      })

      client.on('reconnect', function () {
        reconnectEvent = true
      })

      client.on('connect', function () {
        if (tryReconnect) {
          client.subscribe('hello', function () {
            client.stream.end()

            // The client is connected to server2, so the reconnected session
            // has to be observed there.
            server2.once('client', function (serverClient) {
              serverClient.on('subscribe', function () {
                should.fail()
              })
            })
          })
          tryReconnect = false
        } else {
          reconnectEvent.should.equal(true)
          should(Object.keys(client._resubscribeTopics).length).be.equal(0)
          // End the client first so it does not keep reconnecting to a
          // server that is being shut down.
          client.end()
          server2.close()
          done()
        }
      })
    })
  })
  // A QoS 2 publish is received and acknowledged with pubrec, then the client
  // is ended and reconnected with the same (clean: false) stores. On the
  // second connection the server sends the pubrel; the stored message must be
  // delivered to 'message' listeners before the client answers with pubcomp.
  it('should preserve incomingStore after disconnecting if clean is false', function (done) {
    let reconnect = false
    let messageReceived = false
    let client = {}
    const incomingStore = new mqtt.Store({ clean: false })
    const outgoingStore = new mqtt.Store({ clean: false })

    const server2 = new Server(function (c) {
      c.on('connect', function (packet) {
        c.connack({returnCode: 0})
        if (reconnect) {
          // Second session: release the message stored during the first one.
          c.pubrel({ messageId: 1 })
        }
      })
      c.on('subscribe', function (packet) {
        c.suback({
          messageId: packet.messageId,
          granted: packet.subscriptions.map(function (e) {
            return e.qos
          })
        })
        c.publish({ topic: 'topic', payload: 'payload', qos: 2, messageId: 1, retain: false })
      })
      c.on('pubrec', function (packet) {
        // Interrupt the handshake after pubrec and come back with the same stores.
        client.end(false, function () {
          client.reconnect({
            incomingStore: incomingStore,
            outgoingStore: outgoingStore
          })
        })
      })
      c.on('pubcomp', function (packet) {
        // Without this check the test would pass even if the stored publish
        // had been lost across the reconnect.
        messageReceived.should.be.true()
        client.end()
        server2.close()
        done()
      })
    })

    server2.listen(port + 50, function () {
      client = mqtt.connect({
        port: port + 50,
        host: 'localhost',
        clean: false,
        clientId: 'cid1',
        reconnectPeriod: 0,
        incomingStore: incomingStore,
        outgoingStore: outgoingStore
      })

      client.on('connect', function () {
        if (!reconnect) {
          client.subscribe('test', {qos: 2}, function () {
          })
          reconnect = true
        }
      })

      client.on('message', function (topic, message) {
        topic.should.equal('topic')
        message.toString().should.equal('payload')
        messageReceived = true
      })
    })
  })
  // The server destroys the connection on the first subscribe instead of
  // acknowledging it. On the resulting 'close' the client's `outgoing` map
  // must be empty; the client then reconnects manually, subscribes again and
  // the test finishes on the second 'close'.
  it('should clear outgoing if close from server', function (done) {
    let reconnected = false
    let client = {}

    const server2 = new Server((conn) => {
      conn.on('connect', (packet) => {
        conn.connack({returnCode: 0})
      })

      conn.on('subscribe', (packet) => {
        if (!reconnected) {
          conn.destroy()
          return
        }
        conn.suback({
          messageId: packet.messageId,
          granted: packet.subscriptions.map((sub) => sub.qos)
        })
      })
    })

    server2.listen(port + 50, () => {
      client = mqtt.connect({
        port: port + 50,
        host: 'localhost',
        clean: true,
        clientId: 'cid1',
        reconnectPeriod: 0
      })

      client.on('connect', () => {
        client.subscribe('test', {qos: 2}, (err) => {
          if (!err) {
            client.end()
          }
        })
      })

      client.on('close', () => {
        if (reconnected) {
          server2.close()
          done()
          return
        }
        Object.keys(client.outgoing).length.should.equal(0)
        reconnected = true
        client.reconnect()
      })
    })
  })
  // The server never acknowledges the QoS 1 publish. On the first publish the
  // client is force-ended and reconnected with the same (clean: false)
  // stores; the test passes when a publish arrives after that reconnect.
  it('should resend in-flight QoS 1 publish messages from the client if clean is false', function (done) {
    let reconnected = false
    let client = {}
    const incomingStore = new mqtt.Store({ clean: false })
    const outgoingStore = new mqtt.Store({ clean: false })
    const stores = {
      incomingStore: incomingStore,
      outgoingStore: outgoingStore
    }

    const server2 = new Server((conn) => {
      conn.on('connect', (packet) => {
        conn.connack({returnCode: 0})
      })

      conn.on('publish', (packet) => {
        if (reconnected) {
          server2.close()
          done()
          return
        }
        client.end(true, () => {
          client.reconnect({
            incomingStore: stores.incomingStore,
            outgoingStore: stores.outgoingStore
          })
          reconnected = true
        })
      })
    })

    server2.listen(port + 50, () => {
      client = mqtt.connect({
        port: port + 50,
        host: 'localhost',
        clean: false,
        clientId: 'cid1',
        reconnectPeriod: 0,
        incomingStore: incomingStore,
        outgoingStore: outgoingStore
      })

      client.on('connect', () => {
        if (!reconnected) {
          client.publish('topic', 'payload', {qos: 1})
        }
      })

      // Client errors are ignored in this test.
      client.on('error', () => {})
    })
  })
  // The server never answers the QoS 2 publish. On the first publish the
  // client is force-ended and reconnected with the same (clean: false)
  // stores; the test passes when a publish arrives after that reconnect.
  it('should resend in-flight QoS 2 publish messages from the client if clean is false', function (done) {
    let reconnected = false
    let client = {}
    const incomingStore = new mqtt.Store({ clean: false })
    const outgoingStore = new mqtt.Store({ clean: false })
    const stores = {
      incomingStore: incomingStore,
      outgoingStore: outgoingStore
    }

    const server2 = new Server((conn) => {
      conn.on('connect', (packet) => {
        conn.connack({returnCode: 0})
      })

      conn.on('publish', (packet) => {
        if (reconnected) {
          server2.close()
          done()
          return
        }
        client.end(true, () => {
          client.reconnect({
            incomingStore: stores.incomingStore,
            outgoingStore: stores.outgoingStore
          })
          reconnected = true
        })
      })
    })

    server2.listen(port + 50, () => {
      client = mqtt.connect({
        port: port + 50,
        host: 'localhost',
        clean: false,
        clientId: 'cid1',
        reconnectPeriod: 0,
        incomingStore: incomingStore,
        outgoingStore: outgoingStore
      })

      client.on('connect', () => {
        if (!reconnected) {
          client.publish('topic', 'payload', {qos: 2})
        }
      })

      // Client errors are ignored in this test.
      client.on('error', () => {})
    })
  })
  // The server acknowledges the QoS 2 publish with pubrec but never sends
  // pubcomp. On the first pubrel the client is force-ended and reconnected
  // with the same (clean: false) stores; the test passes when a pubrel
  // arrives after that reconnect.
  it('should resend in-flight QoS 2 pubrel messages from the client if clean is false', function (done) {
    let reconnected = false
    let client = {}
    const incomingStore = new mqtt.Store({ clean: false })
    const outgoingStore = new mqtt.Store({ clean: false })
    const stores = {
      incomingStore: incomingStore,
      outgoingStore: outgoingStore
    }

    const server2 = new Server((conn) => {
      conn.on('connect', (packet) => {
        conn.connack({returnCode: 0})
      })

      conn.on('publish', (packet) => {
        // Only the first session acknowledges the publish.
        if (!reconnected) {
          conn.pubrec({messageId: packet.messageId})
        }
      })

      conn.on('pubrel', () => {
        if (reconnected) {
          server2.close()
          done()
          return
        }
        client.end(true, () => {
          client.reconnect({
            incomingStore: stores.incomingStore,
            outgoingStore: stores.outgoingStore
          })
          reconnected = true
        })
      })
    })

    server2.listen(port + 50, () => {
      client = mqtt.connect({
        port: port + 50,
        host: 'localhost',
        clean: false,
        clientId: 'cid1',
        reconnectPeriod: 0,
        incomingStore: incomingStore,
        outgoingStore: outgoingStore
      })

      client.on('connect', () => {
        if (!reconnected) {
          client.publish('topic', 'payload', {qos: 2})
        }
      })

      // Client errors are ignored in this test.
      client.on('error', () => {})
    })
  })
  // Three QoS 1 messages are published with nextId preset to 65535. On the
  // first publish the server sees, the client is force-ended and reconnected
  // with the same stores. Publishes arriving after the reconnect must carry
  // payload1, payload2, payload3 in that order.
  it('should resend in-flight publish messages by published order', function (done) {
    const expectedPayloads = ['payload1', 'payload2', 'payload3']
    let publishCount = 0
    let reconnected = false
    let disconnectPending = true
    let client = {}
    const incomingStore = new mqtt.Store({ clean: false })
    const outgoingStore = new mqtt.Store({ clean: false })

    const server2 = new Server((conn) => {
      // errors are not interesting for this test
      // but they might happen on some platforms
      conn.on('error', () => {})

      conn.on('connect', (packet) => {
        conn.connack({returnCode: 0})
      })

      conn.on('publish', (packet) => {
        conn.puback({messageId: packet.messageId})

        if (reconnected) {
          const position = publishCount++
          if (position < expectedPayloads.length) {
            packet.payload.toString().should.equal(expectedPayloads[position])
            if (position === expectedPayloads.length - 1) {
              server2.close()
              done()
            }
          }
          return
        }

        if (disconnectPending) {
          client.end(true, () => {
            reconnected = true
            client.reconnect({
              incomingStore: incomingStore,
              outgoingStore: outgoingStore
            })
          })
          disconnectPending = false
        }
      })
    })

    server2.listen(port + 50, () => {
      client = mqtt.connect({
        port: port + 50,
        host: 'localhost',
        clean: false,
        clientId: 'cid1',
        reconnectPeriod: 0,
        incomingStore: incomingStore,
        outgoingStore: outgoingStore
      })

      // NOTE(review): presumably forces the message id counter to wrap during
      // the three publishes; confirm against the client implementation.
      client.nextId = 65535

      client.on('connect', () => {
        if (!reconnected) {
          client.publish('topic', 'payload1', {qos: 1})
          client.publish('topic', 'payload2', {qos: 1})
          client.publish('topic', 'payload3', {qos: 1})
        }
      })

      // Client errors are ignored in this test.
      client.on('error', () => {})
    })
  })
  // The client is ended on first connect and reconnect() is called directly
  // from the 'close' handler. The second session must be able to subscribe,
  // and a 'reconnect' event must have been seen before the second close.
  it('should be able to pub/sub if reconnect() is called at close handler', function (done) {
    const client = connect({ reconnectPeriod: 0 })
    let firstRound = true
    let sawReconnect = false

    client.on('close', () => {
      if (!firstRound) {
        sawReconnect.should.equal(true)
        done()
        return
      }
      firstRound = false
      client.reconnect()
    })

    client.on('reconnect', () => {
      sawReconnect = true
    })

    client.on('connect', () => {
      if (firstRound) {
        client.end()
        return
      }
      client.subscribe('hello', () => {
        client.end()
      })
    })
  })
  // Same scenario as calling reconnect() from the 'close' handler, except the
  // reconnect() call is deferred by 100 ms so it runs outside that handler.
  it('should be able to pub/sub if reconnect() is called at out of close handler', function (done) {
    const client = connect({ reconnectPeriod: 0 })
    let firstRound = true
    let sawReconnect = false

    client.on('close', () => {
      if (!firstRound) {
        sawReconnect.should.equal(true)
        done()
        return
      }
      firstRound = false
      setTimeout(() => {
        client.reconnect()
      }, 100)
    })

    client.on('reconnect', () => {
      sawReconnect = true
    })

    client.on('connect', () => {
      if (firstRound) {
        client.end()
        return
      }
      client.subscribe('hello', () => {
        client.end()
      })
    })
  })
  // These tests install their own 'client' handlers on the shared server so
  // they control connack/suback themselves. The server's existing 'client'
  // listeners are detached before each test and re-attached afterwards.
  context('with alternate server client', function () {
    let savedClientListeners
    const connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }

    beforeEach(function () {
      savedClientListeners = server.listeners('client')
      server.removeAllListeners('client')
    })

    afterEach(function () {
      server.removeAllListeners('client')
      for (const listener of savedClientListeners) {
        server.on('client', listener)
      }
    })

    // The stream is ended on the first subscribe, before any suback is sent;
    // a second subscribe must arrive on the following connection.
    it('should resubscribe even if disconnect is before suback', function (done) {
      const client = mqtt.connect(Object.assign({ reconnectPeriod: 100 }, config))
      let subscribesSeen = 0
      let connectsSeen = 0

      server.on('client', (serverClient) => {
        serverClient.on('connect', () => {
          connectsSeen++
          serverClient.connack(connack)
        })

        serverClient.on('subscribe', () => {
          subscribesSeen++

          // disconnect before sending the suback on the first subscribe
          if (subscribesSeen === 1) {
            client.stream.end()
          }

          // after the second connection, confirm that the only two
          // subscribes have taken place, then cleanup and exit
          if (connectsSeen >= 2) {
            subscribesSeen.should.equal(2)
            client.end(true, done)
          }
        })
      })

      client.subscribe('hello')
    })

    // Finishes on the second subscribe; the handler stays attached, so a
    // further subscribe would reach client.end(true, done) again.
    it('should resubscribe exactly once', function (done) {
      const client = mqtt.connect(Object.assign({ reconnectPeriod: 100 }, config))
      let subscribesSeen = 0

      server.on('client', (serverClient) => {
        serverClient.on('connect', () => {
          serverClient.connack(connack)
        })

        serverClient.on('subscribe', () => {
          subscribesSeen++

          // disconnect before sending the suback on the first subscribe
          if (subscribesSeen === 1) {
            client.stream.end()
          }

          // after the second connection, only two subs
          // subscribes have taken place, then cleanup and exit
          if (subscribesSeen === 2) {
            client.end(true, done)
          }
        })
      })

      client.subscribe('hello')
    })
  })
  2684. })
  2685. }