writeToStream.js 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117
  1. const protocol = require('./constants')
  2. const empty = Buffer.allocUnsafe(0)
  3. const zeroBuf = Buffer.from([0])
  4. const numbers = require('./numbers')
  5. const nextTick = require('process-nextick-args').nextTick
  6. const debug = require('debug')('mqtt-packet:writeToStream')
  7. const numCache = numbers.cache
  8. const generateNumber = numbers.generateNumber
  9. const generateCache = numbers.generateCache
  10. const genBufVariableByteInt = numbers.genBufVariableByteInt
  11. const generate4ByteBuffer = numbers.generate4ByteBuffer
  12. let writeNumber = writeNumberCached
  13. let toGenerate = true
  /**
   * Serialise one MQTT packet onto a stream.
   *
   * When the stream exposes `cork`, it is corked and `uncork` is scheduled
   * through nextTick. While `toGenerate` is set, the number cache is generated
   * once before the packet is written.
   *
   * @param {Object} packet - packet description; `packet.cmd` picks the writer
   * @param {Object} stream - destination stream (`write`, `emit`, optional `cork`)
   * @param {Object} [opts] - passed through unchanged to the packet writer
   * @returns {*} whatever the chosen writer returns; false for an unknown command
   */
  function generate (packet, stream, opts) {
    debug('generate called')

    if (stream.cork) {
      stream.cork()
      nextTick(uncork, stream)
    }

    if (toGenerate) {
      toGenerate = false
      generateCache()
    }

    const command = packet.cmd
    debug('generate: packet.cmd: %s', command)

    // Several commands share a writer, so resolve it first and call it once.
    let writer = null
    switch (command) {
      case 'connect':
        writer = connect
        break
      case 'connack':
        writer = connack
        break
      case 'publish':
        writer = publish
        break
      case 'puback':
      case 'pubrec':
      case 'pubrel':
      case 'pubcomp':
        writer = confirmation
        break
      case 'subscribe':
        writer = subscribe
        break
      case 'suback':
        writer = suback
        break
      case 'unsubscribe':
        writer = unsubscribe
        break
      case 'unsuback':
        writer = unsuback
        break
      case 'pingreq':
      case 'pingresp':
        writer = emptyPacket
        break
      case 'disconnect':
        writer = disconnect
        break
      case 'auth':
        writer = auth
        break
    }

    if (writer === null) {
      stream.emit('error', new Error('Unknown command'))
      return false
    }
    return writer(packet, stream, opts)
  }
  /**
   * Toggle for the pre-generated number cache.
   * Reading it tells whether the cached number writer is active; assigning a
   * falsy value switches to allocating number buffers on the fly instead.
   */
  Object.defineProperty(generate, 'cacheNumbers', {
    get: function () {
      return writeNumber === writeNumberCached
    },
    set: function (value) {
      if (!value) {
        toGenerate = false
        writeNumber = writeNumberGenerated
        return
      }
      // Only request a cache build when no populated cache exists yet
      const cacheIsEmpty = !numCache || Object.keys(numCache).length === 0
      if (cacheIsEmpty) toGenerate = true
      writeNumber = writeNumberCached
    }
  })
  /**
   * Uncork a stream; generate schedules this through nextTick after corking.
   *
   * @param {Object} stream - stream exposing `uncork`
   */
  function uncork (stream) {
    stream.uncork()
  }
  /**
   * Write a CONNECT packet to the stream.
   *
   * Every field is validated and the remaining length is computed before the
   * first byte is written, so a rejected packet leaves the stream untouched.
   * Validation failures are reported with an 'error' event on the stream.
   *
   * @param {Object} packet - connect settings (protocolId, protocolVersion,
   *   clean, keepalive, clientId, username, password, will, properties, bridgeMode)
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - not used by this writer
   * @returns {boolean} true once the packet has been written, false on invalid input
   */
  function connect (packet, stream, opts) {
    const settings = packet || {}
    const protocolId = settings.protocolId || 'MQTT'
    const protocolVersion = settings.protocolVersion || 4
    const will = settings.will
    const clean = settings.clean === undefined ? true : settings.clean
    const keepalive = settings.keepalive || 0
    const clientId = settings.clientId || ''
    const username = settings.username
    const password = settings.password
    /* mqtt5 new options */
    const properties = settings.properties

    let length = 0
    let propertiesData = null
    let willProperties = null

    // Must be a string or Buffer and non-falsy
    if (!protocolId ||
      (typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) {
      stream.emit('error', new Error('Invalid protocolId'))
      return false
    }
    // Byte length, not character count: the id is written as UTF-8
    length += Buffer.byteLength(protocolId) + 2

    // Must be 3 or 4 or 5
    if (protocolVersion !== 3 && protocolVersion !== 4 && protocolVersion !== 5) {
      stream.emit('error', new Error('Invalid protocol version'))
      return false
    }
    length += 1

    // ClientId might be omitted in 3.1.1 and 5, but only if cleanSession is set to 1
    if ((typeof clientId === 'string' || Buffer.isBuffer(clientId)) &&
      (clientId || protocolVersion >= 4) && (clientId || clean)) {
      length += Buffer.byteLength(clientId) + 2
    } else {
      if (protocolVersion < 4) {
        stream.emit('error', new Error('clientId must be supplied before 3.1.1'))
        return false
      }
      if ((clean * 1) === 0) {
        stream.emit('error', new Error('clientId must be given if cleanSession set to 0'))
        return false
      }
      // Anything left is a clientId that cannot be serialised (e.g. a number);
      // reject it here instead of failing half-way through the write below.
      stream.emit('error', new Error('Invalid clientId'))
      return false
    }

    // Must be a two byte number
    if (typeof keepalive !== 'number' ||
      keepalive < 0 ||
      keepalive > 65535 ||
      keepalive % 1 !== 0) {
      stream.emit('error', new Error('Invalid keepalive'))
      return false
    }
    length += 2

    // Connect flags
    length += 1

    // Properties
    if (protocolVersion === 5) {
      propertiesData = getProperties(stream, properties)
      if (!propertiesData) { return false }
      length += propertiesData.length
    }

    // If will exists...
    if (will) {
      // It must be an object
      if (typeof will !== 'object') {
        stream.emit('error', new Error('Invalid will'))
        return false
      }
      // It must have topic typeof string
      if (!will.topic || typeof will.topic !== 'string') {
        stream.emit('error', new Error('Invalid will topic'))
        return false
      }
      length += Buffer.byteLength(will.topic) + 2

      // Payload
      length += 2 // payload length
      if (will.payload) {
        if (!(will.payload.length >= 0)) {
          stream.emit('error', new Error('Invalid will payload'))
          return false
        }
        length += typeof will.payload === 'string'
          ? Buffer.byteLength(will.payload)
          : will.payload.length
      }

      // will properties
      if (protocolVersion === 5) {
        willProperties = getProperties(stream, will.properties)
        if (!willProperties) { return false }
        length += willProperties.length
      }
    }

    // Username
    let providedUsername = false
    if (username != null) {
      if (!isStringOrBuffer(username)) {
        stream.emit('error', new Error('Invalid username'))
        return false
      }
      providedUsername = true
      length += Buffer.byteLength(username) + 2
    }

    // Password
    if (password != null) {
      if (!providedUsername) {
        stream.emit('error', new Error('Username is required to use password'))
        return false
      }
      if (!isStringOrBuffer(password)) {
        stream.emit('error', new Error('Invalid password'))
        return false
      }
      length += byteLength(password) + 2
    }

    // Generate header
    stream.write(protocol.CONNECT_HEADER)

    // Generate length
    writeVarByteInt(stream, length)

    // Generate protocol ID
    writeStringOrBuffer(stream, protocolId)

    // bridgeMode adds 128 to the version that goes on the wire (3 -> 131, 4 -> 132)
    const wireVersion = settings.bridgeMode ? protocolVersion + 128 : protocolVersion
    // NOTE(review): bridgeMode with version 5 yields 133, which falls through to
    // VERSION3 and skips the property writes although their length was counted
    // above. Behaviour kept as-is; confirm whether that combination should be rejected.
    stream.write(
      wireVersion === 131
        ? protocol.VERSION131
        : wireVersion === 132
          ? protocol.VERSION132
          : wireVersion === 4
            ? protocol.VERSION4
            : wireVersion === 5
              ? protocol.VERSION5
              : protocol.VERSION3
    )

    // Connect flags
    let flags = 0
    flags |= (username != null) ? protocol.USERNAME_MASK : 0
    flags |= (password != null) ? protocol.PASSWORD_MASK : 0
    flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0
    flags |= (will && will.qos) ? will.qos << protocol.WILL_QOS_SHIFT : 0
    flags |= will ? protocol.WILL_FLAG_MASK : 0
    flags |= clean ? protocol.CLEAN_SESSION_MASK : 0
    stream.write(Buffer.from([flags]))

    // Keepalive
    writeNumber(stream, keepalive)

    // Properties
    if (wireVersion === 5) {
      propertiesData.write()
    }

    // Client ID
    writeStringOrBuffer(stream, clientId)

    // Will
    if (will) {
      if (wireVersion === 5) {
        willProperties.write()
      }
      writeString(stream, will.topic)
      writeStringOrBuffer(stream, will.payload)
    }

    // Username and password
    if (username != null) {
      writeStringOrBuffer(stream, username)
    }
    if (password != null) {
      writeStringOrBuffer(stream, password)
    }

    // This is a small packet that happens only once on a stream
    // We assume the stream is always free to receive more data after this
    return true
  }
  /**
   * Write a CONNACK packet to the stream.
   *
   * For protocol version 5 the code is read from `reasonCode` and a properties
   * section is appended; otherwise the code is read from `returnCode`.
   *
   * @param {Object} packet - connack settings (returnCode/reasonCode, sessionPresent, properties)
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - `opts.protocolVersion` selects the layout (4 when opts is absent)
   * @returns {boolean} true when written, false when validation failed
   */
  function connack (packet, stream, opts) {
    const settings = packet || {}
    const isV5 = (opts ? opts.protocolVersion : 4) === 5
    const code = isV5 ? settings.reasonCode : settings.returnCode

    if (typeof code !== 'number') {
      stream.emit('error', new Error('Invalid return code'))
      return false
    }

    // Two fixed bytes: the session-present byte and the code
    let remaining = 2
    let props = null
    if (isV5) {
      props = getProperties(stream, settings.properties)
      if (!props) return false
      remaining += props.length
    }

    stream.write(protocol.CONNACK_HEADER)
    writeVarByteInt(stream, remaining)

    const sessionByte = settings.sessionPresent ? protocol.SESSIONPRESENT_HEADER : zeroBuf
    stream.write(sessionByte)
    stream.write(Buffer.from([code]))

    if (props != null) props.write()
    return true
  }
  /**
   * Write a PUBLISH packet to the stream.
   *
   * @param {Object} packet - publish settings (topic, payload, qos, retain, dup,
   *   messageId, properties)
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - `opts.protocolVersion` of 5 adds a properties section
   * @returns {boolean} false on invalid input, otherwise the result of writing the payload
   */
  function publish (packet, stream, opts) {
    debug('publish: packet: %o', packet)

    const settings = packet || {}
    const isV5 = (opts ? opts.protocolVersion : 4) === 5
    const qos = settings.qos || 0
    const retain = settings.retain ? protocol.RETAIN_MASK : 0
    const payload = settings.payload || empty
    const { topic, messageId: id, properties } = settings

    // Topic must be a string or Buffer
    let remaining
    if (typeof topic === 'string') {
      remaining = Buffer.byteLength(topic) + 2
    } else if (Buffer.isBuffer(topic)) {
      remaining = topic.length + 2
    } else {
      stream.emit('error', new Error('Invalid topic'))
      return false
    }

    // Payload bytes
    remaining += Buffer.isBuffer(payload) ? payload.length : Buffer.byteLength(payload)

    // A message ID is only present (and required) when qos > 0
    if (qos) {
      if (typeof id !== 'number') {
        stream.emit('error', new Error('Invalid messageId'))
        return false
      }
      remaining += 2
    }

    // mqtt5 properties
    let props = null
    if (isV5) {
      props = getProperties(stream, properties)
      if (!props) return false
      remaining += props.length
    }

    // Fixed header, picked from the pre-built [qos][dup][retain] table
    const dupIndex = settings.dup ? 1 : 0
    const retainIndex = retain ? 1 : 0
    stream.write(protocol.PUBLISH_HEADER[qos][dupIndex][retainIndex])
    writeVarByteInt(stream, remaining)

    // Topic
    writeNumber(stream, byteLength(topic))
    stream.write(topic)

    // Message ID
    if (qos > 0) writeNumber(stream, id)

    // Properties
    if (props != null) props.write()

    // Payload
    debug('publish: payload: %o', payload)
    return stream.write(payload)
  }
  /**
   * Write a PUBACK, PUBREC, PUBREL or PUBCOMP packet to the stream.
   *
   * @param {Object} packet - settings (cmd, messageId, dup, reasonCode, properties);
   *   `cmd` defaults to 'puback'
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - `opts.protocolVersion` of 5 adds the reason code and,
   *   when `properties` is an object, a properties section
   * @returns {boolean} true when written, false when validation failed
   */
  function confirmation (packet, stream, opts) {
    const version = opts ? opts.protocolVersion : 4
    const settings = packet || {}
    const type = settings.cmd || 'puback'
    const id = settings.messageId
    // Index into the header table, not the flag bit itself: the sibling writers
    // (publish, subscribe, unsubscribe) all address the dup dimension with 0/1.
    // Assumes protocol.ACKS shares that [qos][dup][retain] layout — TODO confirm.
    const dupIndex = (settings.dup && type === 'pubrel') ? 1 : 0
    const qos = type === 'pubrel' ? 1 : 0
    const reasonCode = settings.reasonCode
    const properties = settings.properties
    let length = version === 5 ? 3 : 2

    // Check message ID
    if (typeof id !== 'number') {
      stream.emit('error', new Error('Invalid messageId'))
      return false
    }

    // properties mqtt 5
    let propertiesData = null
    if (version === 5) {
      // Confirm should not add empty property length with no properties (rfc 3.4.2.2.1)
      if (typeof properties === 'object') {
        propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
        if (!propertiesData) { return false }
        length += propertiesData.length
      }
    }

    // Header
    stream.write(protocol.ACKS[type][qos][dupIndex][0])

    // Length
    writeVarByteInt(stream, length)

    // Message ID
    writeNumber(stream, id)

    // reason code in header
    if (version === 5) {
      stream.write(Buffer.from([reasonCode]))
    }

    // properties mqtt 5
    if (propertiesData !== null) {
      propertiesData.write()
    }
    return true
  }
  /**
   * Write a SUBSCRIBE packet to the stream.
   *
   * All subscriptions are validated before anything is written. For protocol
   * version 5 the per-subscription options nl, rap and rh are validated too
   * and encoded into the options byte.
   *
   * @param {Object} packet - settings (messageId, subscriptions, dup, properties);
   *   each subscription is `{ topic, qos[, nl, rap, rh] }`
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - `opts.protocolVersion` of 5 enables properties and options
   * @returns {boolean} false on invalid input, otherwise the result of the last write
   */
  function subscribe (packet, stream, opts) {
    debug('subscribe: packet: %o', packet)
    const version = opts ? opts.protocolVersion : 4
    const settings = packet || {}
    const dup = settings.dup ? protocol.DUP_MASK : 0
    const id = settings.messageId
    const subs = settings.subscriptions
    const properties = settings.properties
    let length = 0

    // Check message ID
    if (typeof id !== 'number') {
      stream.emit('error', new Error('Invalid messageId'))
      return false
    }
    length += 2

    // properties mqtt 5
    let propertiesData = null
    if (version === 5) {
      propertiesData = getProperties(stream, properties)
      if (!propertiesData) { return false }
      length += propertiesData.length
    }

    // Check subscriptions (the explicit null test keeps `typeof null === 'object'`
    // from reaching the `.length` access)
    if (subs == null || typeof subs !== 'object' || !subs.length) {
      stream.emit('error', new Error('Invalid subscriptions'))
      return false
    }
    for (let i = 0; i < subs.length; i += 1) {
      const entry = subs[i]
      if (entry == null || typeof entry.topic !== 'string') {
        stream.emit('error', new Error('Invalid subscriptions - invalid topic'))
        return false
      }
      if (typeof entry.qos !== 'number') {
        stream.emit('error', new Error('Invalid subscriptions - invalid qos'))
        return false
      }

      if (version === 5) {
        const nl = entry.nl || false
        if (typeof nl !== 'boolean') {
          stream.emit('error', new Error('Invalid subscriptions - invalid No Local'))
          return false
        }
        const rap = entry.rap || false
        if (typeof rap !== 'boolean') {
          stream.emit('error', new Error('Invalid subscriptions - invalid Retain as Published'))
          return false
        }
        // Retain Handling indexes SUBSCRIBE_OPTIONS_RH below, so it has to be
        // one of the integers 0, 1 or 2
        const rh = entry.rh || 0
        if (typeof rh !== 'number' || !Number.isInteger(rh) || rh < 0 || rh > 2) {
          stream.emit('error', new Error('Invalid subscriptions - invalid Retain Handling'))
          return false
        }
      }

      // topic length prefix + topic bytes + options byte
      length += Buffer.byteLength(entry.topic) + 2 + 1
    }

    // Generate header
    const header = protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0]
    debug('subscribe: writing to stream: %o', header)
    stream.write(header)

    // Generate length
    writeVarByteInt(stream, length)

    // Generate message ID
    writeNumber(stream, id)

    // properties mqtt 5
    if (propertiesData !== null) {
      propertiesData.write()
    }

    let result = true
    // Generate subs
    for (const sub of subs) {
      // Write topic string
      writeString(stream, sub.topic)

      // options process
      let options = protocol.SUBSCRIBE_OPTIONS_QOS[sub.qos]
      if (version === 5) {
        options |= sub.nl ? protocol.SUBSCRIBE_OPTIONS_NL : 0
        options |= sub.rap ? protocol.SUBSCRIBE_OPTIONS_RAP : 0
        options |= sub.rh ? protocol.SUBSCRIBE_OPTIONS_RH[sub.rh] : 0
      }
      // Write options
      result = stream.write(Buffer.from([options]))
    }
    return result
  }
  /**
   * Write a SUBACK packet to the stream.
   *
   * @param {Object} packet - settings (messageId, granted, properties);
   *   `granted` is a non-empty list of numeric codes, one per subscription
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - `opts.protocolVersion` of 5 adds a properties section
   * @returns {boolean} false on invalid input, otherwise the result of writing the codes
   */
  function suback (packet, stream, opts) {
    const version = opts ? opts.protocolVersion : 4
    const settings = packet || {}
    const id = settings.messageId
    const granted = settings.granted
    const properties = settings.properties
    let length = 0

    // Check message ID
    if (typeof id !== 'number') {
      stream.emit('error', new Error('Invalid messageId'))
      return false
    }
    length += 2

    // Check granted qos vector (the explicit null test keeps
    // `typeof null === 'object'` from reaching the `.length` access)
    if (granted == null || typeof granted !== 'object' || !granted.length) {
      stream.emit('error', new Error('Invalid qos vector'))
      return false
    }
    for (let i = 0; i < granted.length; i += 1) {
      if (typeof granted[i] !== 'number') {
        stream.emit('error', new Error('Invalid qos vector'))
        return false
      }
      length += 1
    }

    // properties mqtt 5
    let propertiesData = null
    if (version === 5) {
      propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
      if (!propertiesData) { return false }
      length += propertiesData.length
    }

    // header
    stream.write(protocol.SUBACK_HEADER)

    // Length
    writeVarByteInt(stream, length)

    // Message ID
    writeNumber(stream, id)

    // properties mqtt 5
    if (propertiesData !== null) {
      propertiesData.write()
    }

    return stream.write(Buffer.from(granted))
  }
  /**
   * Write an UNSUBSCRIBE packet to the stream.
   *
   * @param {Object} packet - settings (messageId, unsubscriptions, dup, properties);
   *   `unsubscriptions` is a non-empty list of topic strings
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - `opts.protocolVersion` of 5 adds a properties section
   * @returns {boolean} false on invalid input, otherwise the result of the last topic write
   */
  function unsubscribe (packet, stream, opts) {
    const version = opts ? opts.protocolVersion : 4
    const settings = packet || {}
    const id = settings.messageId
    const dup = settings.dup ? protocol.DUP_MASK : 0
    const unsubs = settings.unsubscriptions
    const properties = settings.properties
    let length = 0

    // Check message ID
    if (typeof id !== 'number') {
      stream.emit('error', new Error('Invalid messageId'))
      return false
    }
    length += 2

    // Check unsubs (the explicit null test keeps `typeof null === 'object'`
    // from reaching the `.length` access)
    if (unsubs == null || typeof unsubs !== 'object' || !unsubs.length) {
      stream.emit('error', new Error('Invalid unsubscriptions'))
      return false
    }
    for (let i = 0; i < unsubs.length; i += 1) {
      if (typeof unsubs[i] !== 'string') {
        stream.emit('error', new Error('Invalid unsubscriptions'))
        return false
      }
      length += Buffer.byteLength(unsubs[i]) + 2
    }

    // properties mqtt 5
    let propertiesData = null
    if (version === 5) {
      propertiesData = getProperties(stream, properties)
      if (!propertiesData) { return false }
      length += propertiesData.length
    }

    // Header
    stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0])

    // Length
    writeVarByteInt(stream, length)

    // Message ID
    writeNumber(stream, id)

    // properties mqtt 5
    if (propertiesData !== null) {
      propertiesData.write()
    }

    // Unsubs
    let result = true
    for (let j = 0; j < unsubs.length; j++) {
      result = writeString(stream, unsubs[j])
    }
    return result
  }
  /**
   * Write an UNSUBACK packet to the stream.
   *
   * For protocol version 5 the packet additionally carries a properties section
   * and one numeric code per unsubscription taken from `granted`.
   *
   * @param {Object} packet - settings (cmd, messageId, dup, granted, properties)
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - `opts.protocolVersion` selects the layout (4 when opts is absent)
   * @returns {boolean} true when written, false when validation failed
   */
  function unsuback (packet, stream, opts) {
    const version = opts ? opts.protocolVersion : 4
    const settings = packet || {}
    const id = settings.messageId
    // Index into the header table, not the flag bit itself: the sibling writers
    // (publish, subscribe, unsubscribe) all address the dup dimension with 0/1.
    // Assumes protocol.ACKS shares that [qos][dup][retain] layout — TODO confirm.
    const dupIndex = settings.dup ? 1 : 0
    const granted = settings.granted
    const properties = settings.properties
    const type = settings.cmd
    const qos = 0
    let length = 2

    // Check message ID
    if (typeof id !== 'number') {
      stream.emit('error', new Error('Invalid messageId'))
      return false
    }

    // Check granted (the explicit null test keeps `typeof null === 'object'`
    // from reaching the `.length` access)
    if (version === 5) {
      if (granted == null || typeof granted !== 'object' || !granted.length) {
        stream.emit('error', new Error('Invalid qos vector'))
        return false
      }
      for (let i = 0; i < granted.length; i += 1) {
        if (typeof granted[i] !== 'number') {
          stream.emit('error', new Error('Invalid qos vector'))
          return false
        }
        length += 1
      }
    }

    // properties mqtt 5
    let propertiesData = null
    if (version === 5) {
      propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
      if (!propertiesData) { return false }
      length += propertiesData.length
    }

    // Header
    stream.write(protocol.ACKS[type][qos][dupIndex][0])

    // Length
    writeVarByteInt(stream, length)

    // Message ID
    writeNumber(stream, id)

    // properties mqtt 5
    if (propertiesData !== null) {
      propertiesData.write()
    }

    // payload
    if (version === 5) {
      stream.write(Buffer.from(granted))
    }
    return true
  }
  /**
   * Write a body-less packet by looking up its pre-built bytes in
   * `protocol.EMPTY` under the packet's command name.
   *
   * @param {Object} packet - only `packet.cmd` is read
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - not used
   * @returns {*} the result of `stream.write`
   */
  function emptyPacket (packet, stream, opts) {
    const frame = protocol.EMPTY[packet.cmd]
    return stream.write(frame)
  }
  /**
   * Write a DISCONNECT packet to the stream.
   *
   * For protocol version 5 the packet carries a reason code byte followed by
   * a properties section; otherwise it has no body.
   *
   * @param {Object} packet - settings (reasonCode, properties)
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - `opts.protocolVersion` selects the layout (4 when opts is absent)
   * @returns {boolean} true when written, false when the properties were rejected
   */
  function disconnect (packet, stream, opts) {
    const settings = packet || {}
    const isV5 = (opts ? opts.protocolVersion : 4) === 5
    const reason = settings.reasonCode

    let remaining = isV5 ? 1 : 0
    let props = null
    if (isV5) {
      props = getPropertiesByMaximumPacketSize(stream, settings.properties, opts, remaining)
      if (!props) return false
      remaining += props.length
    }

    // Fixed header: packet type in the upper nibble, flags zero
    const header = Buffer.from([protocol.codes.disconnect << 4])
    stream.write(header)
    writeVarByteInt(stream, remaining)

    if (isV5) {
      stream.write(Buffer.from([reason]))
    }
    if (props !== null) {
      props.write()
    }
    return true
  }
  /**
   * Write an AUTH packet to the stream.
   *
   * AUTH is only written for protocol version 5; for any other version an
   * 'error' event is emitted and nothing is written.
   *
   * @param {Object} packet - settings (reasonCode, properties)
   * @param {Object} stream - destination stream
   * @param {Object} [opts] - `opts.protocolVersion` must be 5
   * @returns {boolean} true when written, false on a wrong version or rejected properties
   */
  function auth (packet, stream, opts) {
    const version = opts ? opts.protocolVersion : 4
    const settings = packet || {}
    const reasonCode = settings.reasonCode
    const properties = settings.properties

    // Stop here: reporting the error and then writing the packet anyway would
    // put an AUTH packet on a connection that was just declared unable to carry it
    if (version !== 5) {
      stream.emit('error', new Error('Invalid mqtt version for auth packet'))
      return false
    }

    let length = 1 // reason code byte

    // properties mqtt 5
    const propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
    if (!propertiesData) { return false }
    length += propertiesData.length

    // Header
    stream.write(Buffer.from([protocol.codes.auth << 4]))

    // Length
    writeVarByteInt(stream, length)

    // reason code in header
    stream.write(Buffer.from([reasonCode]))

    // properties mqtt 5
    propertiesData.write()
    return true
  }
  /**
   * writeVarByteInt - write an MQTT style variable byte integer to the stream
   *
   * Encoded buffers for values below 16384 are kept in a module-level cache
   * and reused on later calls.
   *
   * @param <Stream> stream - destination
   * @param <Number> num - value to encode
   * @returns false when num exceeds protocol.VARBYTEINT_MAX (an 'error' event is
   *   emitted), otherwise the result of stream.write
   *
   * @api private
   */
  const varByteIntCache = {}
  function writeVarByteInt (stream, num) {
    if (num > protocol.VARBYTEINT_MAX) {
      stream.emit('error', new Error(`Invalid variable byte integer: ${num}`))
      return false
    }

    const cached = varByteIntCache[num]
    const encoded = cached || genBufVariableByteInt(num)
    if (!cached && num < 16384) {
      varByteIntCache[num] = encoded
    }

    debug('writeVarByteInt: writing to stream: %o', encoded)
    return stream.write(encoded)
  }
  /**
   * writeString - write a length-prefixed utf8 string to the stream
   *
   * The prefix is the string's byte length, written through writeNumber.
   *
   * @param <Stream> stream - destination
   * @param <String> string - string to write
   * @return the result of the stream.write call for the string itself
   *
   * @api private
   */
  function writeString (stream, string) {
    writeNumber(stream, Buffer.byteLength(string))
    debug('writeString: %s', string)
    return stream.write(string, 'utf8')
  }
  /**
   * writeStringPair - write a name/value pair as two length-prefixed utf8 strings
   *
   * @param <Stream> stream - destination
   * @param <String> name - string name to write
   * @param <String> value - string value to write
   * @return nothing
   *
   * @api private
   */
  function writeStringPair (stream, name, value) {
    for (const part of [name, value]) {
      writeString(stream, part)
    }
  }
  /**
   * writeNumber (cached variant) - write a number to the stream using the
   * buffer stored for it in the pre-generated number cache
   *
   * @param <Stream> stream - destination
   * @param <Number> number - number to write; used as the cache key
   * @return the result of stream.write
   *
   * @api private
   */
  function writeNumberCached (stream, number) {
    const cachedBuffer = numCache[number]
    debug('writeNumberCached: number: %d', number)
    debug('writeNumberCached: %o', cachedBuffer)
    return stream.write(cachedBuffer)
  }
  /**
   * writeNumber (uncached variant) - build the buffer for a number with
   * generateNumber on every call and write it to the stream
   *
   * @param <Stream> stream - destination
   * @param <Number> number - number to write
   * @return the result of stream.write
   *
   * @api private
   */
  function writeNumberGenerated (stream, number) {
    const fresh = generateNumber(number)
    debug('writeNumberGenerated: %o', fresh)
    return stream.write(fresh)
  }
  /**
   * write4ByteNumber - build the buffer for a number with generate4ByteBuffer
   * and write it to the stream
   *
   * @param <Stream> stream - destination
   * @param <Number> number - number to write
   * @return the result of stream.write
   *
   * @api private
   */
  function write4ByteNumber (stream, number) {
    const fourBytes = generate4ByteBuffer(number)
    debug('write4ByteNumber: %o', fourBytes)
    return stream.write(fourBytes)
  }
  /**
   * writeStringOrBuffer - write a String or Buffer preceded by its length
   *
   * Strings go through writeString; any other truthy value is written as-is
   * after its `length`; a falsy value is written as a zero length only.
   *
   * @param <Stream> stream - destination
   * @param <String|Buffer> toWrite - value to write
   * @return nothing
   */
  function writeStringOrBuffer (stream, toWrite) {
    if (typeof toWrite === 'string') {
      writeString(stream, toWrite)
      return
    }
    if (!toWrite) {
      writeNumber(stream, 0)
      return
    }
    writeNumber(stream, toWrite.length)
    stream.write(toWrite)
  }
  /**
   * Validate an MQTT 5 properties object and measure its encoded size.
   *
   * Nothing is written by this call; the returned descriptor writes the
   * section later through writeProperties. Invalid values are reported with
   * an 'error' event on the stream.
   *
   * @param {Object} stream - stream used for error reporting and by `write`
   * @param {Object} [properties] - property name -> value (or array of values);
   *   anything that is not a plain object (null, undefined, primitives, or a
   *   value with a `length`) is treated as "no properties"
   * @returns {{length: number, write: Function}|false} `length` is the size of
   *   the properties section including its own length prefix; false when a
   *   property failed validation
   */
  function getProperties (stream, properties) {
    /* connect properties */
    // `typeof null` is 'object', so null is tested explicitly before `.length`
    if (properties == null || typeof properties !== 'object' || properties.length != null) {
      return {
        length: 1,
        write () {
          writeProperties(stream, {}, 0)
        }
      }
    }

    // Report an invalid value and signal failure to the caller
    function reject (message) {
      stream.emit('error', new Error(message))
      return false
    }

    // Same bounds test for every fixed-width integer type
    function isOutOfRange (value, max) {
      return typeof value !== 'number' || value < 0 || value > max
    }

    // Size of one property value on the wire (identifier byte included),
    // or false when the value is invalid for the property's type
    function getLengthProperty (name, value) {
      const type = protocol.propertiesTypes[name]
      switch (type) {
        case 'byte':
          if (typeof value !== 'boolean') return reject(`Invalid ${name}: ${value}`)
          return 1 + 1
        case 'int8':
          if (isOutOfRange(value, 0xff)) return reject(`Invalid ${name}: ${value}`)
          return 1 + 1
        case 'binary':
          // Only strings and Buffers/typed arrays can be measured and written
          if (typeof value !== 'string' && !ArrayBuffer.isView(value)) {
            return reject(`Invalid ${name}: ${value}`)
          }
          return 1 + Buffer.byteLength(value) + 2
        case 'int16':
          if (isOutOfRange(value, 0xffff)) return reject(`Invalid ${name}: ${value}`)
          return 1 + 2
        case 'int32':
          if (isOutOfRange(value, 0xffffffff)) return reject(`Invalid ${name}: ${value}`)
          return 1 + 4
        case 'var':
          // a variable byte integer carries at most 28 bits
          if (isOutOfRange(value, 0x0fffffff)) return reject(`Invalid ${name}: ${value}`)
          return 1 + Buffer.byteLength(genBufVariableByteInt(value))
        case 'string':
          if (typeof value !== 'string') return reject(`Invalid ${name}: ${value}`)
          return 1 + 2 + Buffer.byteLength(value.toString())
        case 'pair': {
          if (typeof value !== 'object' || value === null) {
            return reject(`Invalid ${name}: ${value}`)
          }
          // One encoded pair per (key, item); a key mapped to an array
          // contributes one pair for each array item
          return Object.getOwnPropertyNames(value).reduce((total, key) => {
            const keyBytes = Buffer.byteLength(key.toString())
            const entry = value[key]
            if (Array.isArray(entry)) {
              return entry.reduce((sum, item) => {
                return sum + 1 + 2 + keyBytes + 2 + Buffer.byteLength(item.toString())
              }, total)
            }
            return total + 1 + 2 + keyBytes + 2 + Buffer.byteLength(entry.toString())
          }, 0)
        }
        default:
          return reject(`Invalid property ${name}: ${value}`)
      }
    }

    let propertiesLength = 0
    for (const propName in properties) {
      const propValue = properties[propName]
      const values = Array.isArray(propValue) ? propValue : [propValue]
      for (let valueIndex = 0; valueIndex < values.length; valueIndex++) {
        const valueLength = getLengthProperty(propName, values[valueIndex])
        // Only an explicit false is a failure. A zero length (empty pair
        // object, empty array) just contributes nothing; writeProperties is
        // presumed to emit nothing for those as well — TODO confirm.
        if (valueLength === false) { return false }
        propertiesLength += valueLength
      }
    }

    const propertiesLengthLength = Buffer.byteLength(genBufVariableByteInt(propertiesLength))
    return {
      length: propertiesLengthLength + propertiesLength,
      write () {
        writeProperties(stream, properties, propertiesLength)
      }
    }
  }
  /**
   * Computes the properties section for a packet while honouring the
   * receiver's maximum packet size (read from `opts.properties.maximumPacketSize`).
   *
   * When the packet would be too large, the optional properties
   * `reasonString` and then `userProperties` are removed, in that order,
   * until the packet fits. Each candidate is tried independently: a missing
   * `reasonString` no longer prevents `userProperties` from being dropped.
   *
   * NOTE: dropped properties are deleted from the caller's `properties`
   * object, because the data returned by `getProperties` is built from
   * that same object.
   *
   * @param {Object} stream - stream handed through to `getProperties`
   * @param {Object} [properties] - properties to encode; may be mutated
   * @param {Object} [opts] - options; only `opts.properties.maximumPacketSize` is read
   * @param {number} length - size of the rest of the packet, added to the
   *   properties length before comparing against the limit
   * @returns {Object|false} the value returned by `getProperties`, or `false`
   *   when `getProperties` fails or the packet cannot be made to fit
   */
  function getPropertiesByMaximumPacketSize (stream, properties, opts, length) {
    const droppableProps = ['reasonString', 'userProperties']
    const maximumPacketSize = opts && opts.properties && opts.properties.maximumPacketSize ? opts.properties.maximumPacketSize : 0
    let propertiesData = getProperties(stream, properties)

    // No limit configured (or a limit of 0): nothing to enforce.
    if (!maximumPacketSize) return propertiesData

    for (const propName of droppableProps) {
      // getProperties reports invalid properties by returning false.
      if (!propertiesData) return false
      if (length + propertiesData.length <= maximumPacketSize) return propertiesData

      // Guard against missing `properties`; skip candidates that are not set
      // instead of giving up, so later candidates still get a chance.
      if (properties && properties[propName] !== undefined) {
        delete properties[propName]
        propertiesData = getProperties(stream, properties)
      }
    }

    // Every droppable property has been tried; final verdict.
    if (!propertiesData || length + propertiesData.length > maximumPacketSize) return false
    return propertiesData
  }
  /**
   * Writes one property to the stream: its identifier byte (looked up in
   * `protocol.properties`) followed by the value, encoded according to the
   * type registered for `propName` in `protocol.propertiesTypes`.
   *
   * For the 'pair' type, `value` is an object; every own property name is
   * written as a separate identifier + string pair, and an array-valued
   * entry produces one pair per array element.
   *
   * @param {Object} stream - target stream (`write` and `emit` are used)
   * @param {string} propName - property name used for both lookups
   * @param {*} value - property value to encode
   * @returns {false|undefined} `false` after emitting an 'error' event when
   *   the property type is not recognised; otherwise nothing
   */
  function writeProperty (stream, propName, value) {
    const kind = protocol.propertiesTypes[propName]
    // Every recognised type starts with the one-byte property identifier.
    const putIdentifier = () => stream.write(Buffer.from([protocol.properties[propName]]))

    if (kind === 'byte') {
      putIdentifier()
      // Unary plus turns a boolean into 0 / 1.
      stream.write(Buffer.from([+value]))
    } else if (kind === 'int8') {
      putIdentifier()
      stream.write(Buffer.from([value]))
    } else if (kind === 'binary') {
      putIdentifier()
      writeStringOrBuffer(stream, value)
    } else if (kind === 'int16') {
      putIdentifier()
      writeNumber(stream, value)
    } else if (kind === 'int32') {
      putIdentifier()
      write4ByteNumber(stream, value)
    } else if (kind === 'var') {
      putIdentifier()
      writeVarByteInt(stream, value)
    } else if (kind === 'string') {
      putIdentifier()
      writeString(stream, value)
    } else if (kind === 'pair') {
      for (const key of Object.getOwnPropertyNames(value)) {
        const entry = value[key]
        // Each pair is preceded by its own identifier byte.
        const putPair = item => {
          putIdentifier()
          writeStringPair(stream, key.toString(), item.toString())
        }
        if (Array.isArray(entry)) {
          entry.forEach(item => putPair(item))
        } else {
          putPair(entry)
        }
      }
    } else {
      stream.emit('error', new Error(`Invalid property ${propName} value: ${value}`))
      return false
    }
  }
  /**
   * Writes the properties section to the stream: first the section length
   * (via `writeVarByteInt`), then every own, non-null property through
   * `writeProperty`. An array-valued property is written once per element.
   *
   * @param {Object} stream - target stream
   * @param {Object} [properties] - map of property name to value (or array of values)
   * @param {number} propertiesLength - length written ahead of the properties
   */
  function writeProperties (stream, properties, propertiesLength) {
    writeVarByteInt(stream, propertiesLength)

    for (const propName in properties) {
      const isOwn = Object.prototype.hasOwnProperty.call(properties, propName)
      // Inherited keys and explicit nulls are not written.
      if (!isOwn || properties[propName] === null) continue

      const entry = properties[propName]
      const items = Array.isArray(entry) ? entry : [entry]
      for (const item of items) {
        writeProperty(stream, propName, item)
      }
    }
  }
  /**
   * Returns the size in bytes of a Buffer or string.
   *
   * @param {Buffer|string} [bufOrString] - value to measure
   * @returns {number} the Buffer's `length`, `Buffer.byteLength` of any other
   *   truthy value, or 0 for a falsy value (empty string, null, undefined)
   */
  function byteLength (bufOrString) {
    // Falsy values are never Buffer instances, so this check can come first.
    if (bufOrString instanceof Buffer) return bufOrString.length
    return bufOrString ? Buffer.byteLength(bufOrString) : 0
  }
  /**
   * Tells whether a value is a primitive string or a Buffer instance.
   *
   * @param {*} field - value to inspect
   * @returns {boolean} true for primitive strings and Buffer instances
   */
  function isStringOrBuffer (field) {
    if (typeof field === 'string') return true
    return field instanceof Buffer
  }
  984. module.exports = generate