// parser.js (20 KB) — streaming MQTT packet parser
  1. const bl = require('bl')
  2. const EventEmitter = require('events')
  3. const Packet = require('./packet')
  4. const constants = require('./constants')
  5. const debug = require('debug')('mqtt-packet:parser')
  6. class Parser extends EventEmitter {
  7. constructor () {
  8. super()
  9. this.parser = this.constructor.parser
  10. }
  11. static parser (opt) {
  12. if (!(this instanceof Parser)) return (new Parser()).parser(opt)
  13. this.settings = opt || {}
  14. this._states = [
  15. '_parseHeader',
  16. '_parseLength',
  17. '_parsePayload',
  18. '_newPacket'
  19. ]
  20. this._resetState()
  21. return this
  22. }
  23. _resetState () {
  24. debug('_resetState: resetting packet, error, _list, and _stateCounter')
  25. this.packet = new Packet()
  26. this.error = null
  27. this._list = bl()
  28. this._stateCounter = 0
  29. }
  30. parse (buf) {
  31. if (this.error) this._resetState()
  32. this._list.append(buf)
  33. debug('parse: current state: %s', this._states[this._stateCounter])
  34. while ((this.packet.length !== -1 || this._list.length > 0) &&
  35. this[this._states[this._stateCounter]]() &&
  36. !this.error) {
  37. this._stateCounter++
  38. debug('parse: state complete. _stateCounter is now: %d', this._stateCounter)
  39. debug('parse: packet.length: %d, buffer list length: %d', this.packet.length, this._list.length)
  40. if (this._stateCounter >= this._states.length) this._stateCounter = 0
  41. }
  42. debug('parse: exited while loop. packet: %d, buffer list length: %d', this.packet.length, this._list.length)
  43. return this._list.length
  44. }
  45. _parseHeader () {
  46. // There is at least one byte in the buffer
  47. const zero = this._list.readUInt8(0)
  48. this.packet.cmd = constants.types[zero >> constants.CMD_SHIFT]
  49. this.packet.retain = (zero & constants.RETAIN_MASK) !== 0
  50. this.packet.qos = (zero >> constants.QOS_SHIFT) & constants.QOS_MASK
  51. this.packet.dup = (zero & constants.DUP_MASK) !== 0
  52. debug('_parseHeader: packet: %o', this.packet)
  53. this._list.consume(1)
  54. return true
  55. }
  56. _parseLength () {
  57. // There is at least one byte in the list
  58. const result = this._parseVarByteNum(true)
  59. if (result) {
  60. this.packet.length = result.value
  61. this._list.consume(result.bytes)
  62. }
  63. debug('_parseLength %d', result.value)
  64. return !!result
  65. }
  66. _parsePayload () {
  67. debug('_parsePayload: payload %O', this._list)
  68. let result = false
  69. // Do we have a payload? Do we have enough data to complete the payload?
  70. // PINGs have no payload
  71. if (this.packet.length === 0 || this._list.length >= this.packet.length) {
  72. this._pos = 0
  73. switch (this.packet.cmd) {
  74. case 'connect':
  75. this._parseConnect()
  76. break
  77. case 'connack':
  78. this._parseConnack()
  79. break
  80. case 'publish':
  81. this._parsePublish()
  82. break
  83. case 'puback':
  84. case 'pubrec':
  85. case 'pubrel':
  86. case 'pubcomp':
  87. this._parseConfirmation()
  88. break
  89. case 'subscribe':
  90. this._parseSubscribe()
  91. break
  92. case 'suback':
  93. this._parseSuback()
  94. break
  95. case 'unsubscribe':
  96. this._parseUnsubscribe()
  97. break
  98. case 'unsuback':
  99. this._parseUnsuback()
  100. break
  101. case 'pingreq':
  102. case 'pingresp':
  103. // These are empty, nothing to do
  104. break
  105. case 'disconnect':
  106. this._parseDisconnect()
  107. break
  108. case 'auth':
  109. this._parseAuth()
  110. break
  111. default:
  112. this._emitError(new Error('Not supported'))
  113. }
  114. result = true
  115. }
  116. debug('_parsePayload complete result: %s', result)
  117. return result
  118. }
  119. _parseConnect () {
  120. debug('_parseConnect')
  121. let topic // Will topic
  122. let payload // Will payload
  123. let password // Password
  124. let username // Username
  125. const flags = {}
  126. const packet = this.packet
  127. // Parse protocolId
  128. const protocolId = this._parseString()
  129. if (protocolId === null) return this._emitError(new Error('Cannot parse protocolId'))
  130. if (protocolId !== 'MQTT' && protocolId !== 'MQIsdp') {
  131. return this._emitError(new Error('Invalid protocolId'))
  132. }
  133. packet.protocolId = protocolId
  134. // Parse constants version number
  135. if (this._pos >= this._list.length) return this._emitError(new Error('Packet too short'))
  136. packet.protocolVersion = this._list.readUInt8(this._pos)
  137. if (packet.protocolVersion >= 128) {
  138. packet.bridgeMode = true
  139. packet.protocolVersion = packet.protocolVersion - 128
  140. }
  141. if (packet.protocolVersion !== 3 && packet.protocolVersion !== 4 && packet.protocolVersion !== 5) {
  142. return this._emitError(new Error('Invalid protocol version'))
  143. }
  144. this._pos++
  145. if (this._pos >= this._list.length) {
  146. return this._emitError(new Error('Packet too short'))
  147. }
  148. // Parse connect flags
  149. flags.username = (this._list.readUInt8(this._pos) & constants.USERNAME_MASK)
  150. flags.password = (this._list.readUInt8(this._pos) & constants.PASSWORD_MASK)
  151. flags.will = (this._list.readUInt8(this._pos) & constants.WILL_FLAG_MASK)
  152. if (flags.will) {
  153. packet.will = {}
  154. packet.will.retain = (this._list.readUInt8(this._pos) & constants.WILL_RETAIN_MASK) !== 0
  155. packet.will.qos = (this._list.readUInt8(this._pos) &
  156. constants.WILL_QOS_MASK) >> constants.WILL_QOS_SHIFT
  157. }
  158. packet.clean = (this._list.readUInt8(this._pos) & constants.CLEAN_SESSION_MASK) !== 0
  159. this._pos++
  160. // Parse keepalive
  161. packet.keepalive = this._parseNum()
  162. if (packet.keepalive === -1) return this._emitError(new Error('Packet too short'))
  163. // parse properties
  164. if (packet.protocolVersion === 5) {
  165. const properties = this._parseProperties()
  166. if (Object.getOwnPropertyNames(properties).length) {
  167. packet.properties = properties
  168. }
  169. }
  170. // Parse clientId
  171. const clientId = this._parseString()
  172. if (clientId === null) return this._emitError(new Error('Packet too short'))
  173. packet.clientId = clientId
  174. debug('_parseConnect: packet.clientId: %s', packet.clientId)
  175. if (flags.will) {
  176. if (packet.protocolVersion === 5) {
  177. const willProperties = this._parseProperties()
  178. if (Object.getOwnPropertyNames(willProperties).length) {
  179. packet.will.properties = willProperties
  180. }
  181. }
  182. // Parse will topic
  183. topic = this._parseString()
  184. if (topic === null) return this._emitError(new Error('Cannot parse will topic'))
  185. packet.will.topic = topic
  186. debug('_parseConnect: packet.will.topic: %s', packet.will.topic)
  187. // Parse will payload
  188. payload = this._parseBuffer()
  189. if (payload === null) return this._emitError(new Error('Cannot parse will payload'))
  190. packet.will.payload = payload
  191. debug('_parseConnect: packet.will.paylaod: %s', packet.will.payload)
  192. }
  193. // Parse username
  194. if (flags.username) {
  195. username = this._parseString()
  196. if (username === null) return this._emitError(new Error('Cannot parse username'))
  197. packet.username = username
  198. debug('_parseConnect: packet.username: %s', packet.username)
  199. }
  200. // Parse password
  201. if (flags.password) {
  202. password = this._parseBuffer()
  203. if (password === null) return this._emitError(new Error('Cannot parse password'))
  204. packet.password = password
  205. }
  206. // need for right parse auth packet and self set up
  207. this.settings = packet
  208. debug('_parseConnect: complete')
  209. return packet
  210. }
  211. _parseConnack () {
  212. debug('_parseConnack')
  213. const packet = this.packet
  214. if (this._list.length < 1) return null
  215. packet.sessionPresent = !!(this._list.readUInt8(this._pos++) & constants.SESSIONPRESENT_MASK)
  216. if (this.settings.protocolVersion === 5) {
  217. if (this._list.length >= 2) {
  218. packet.reasonCode = this._list.readUInt8(this._pos++)
  219. } else {
  220. packet.reasonCode = 0
  221. }
  222. } else {
  223. if (this._list.length < 2) return null
  224. packet.returnCode = this._list.readUInt8(this._pos++)
  225. }
  226. if (packet.returnCode === -1 || packet.reasonCode === -1) return this._emitError(new Error('Cannot parse return code'))
  227. // mqtt 5 properties
  228. if (this.settings.protocolVersion === 5) {
  229. const properties = this._parseProperties()
  230. if (Object.getOwnPropertyNames(properties).length) {
  231. packet.properties = properties
  232. }
  233. }
  234. debug('_parseConnack: complete')
  235. }
  236. _parsePublish () {
  237. debug('_parsePublish')
  238. const packet = this.packet
  239. packet.topic = this._parseString()
  240. if (packet.topic === null) return this._emitError(new Error('Cannot parse topic'))
  241. // Parse messageId
  242. if (packet.qos > 0) if (!this._parseMessageId()) { return }
  243. // Properties mqtt 5
  244. if (this.settings.protocolVersion === 5) {
  245. const properties = this._parseProperties()
  246. if (Object.getOwnPropertyNames(properties).length) {
  247. packet.properties = properties
  248. }
  249. }
  250. packet.payload = this._list.slice(this._pos, packet.length)
  251. debug('_parsePublish: payload from buffer list: %o', packet.payload)
  252. }
  253. _parseSubscribe () {
  254. debug('_parseSubscribe')
  255. const packet = this.packet
  256. let topic
  257. let options
  258. let qos
  259. let rh
  260. let rap
  261. let nl
  262. let subscription
  263. if (packet.qos !== 1) {
  264. return this._emitError(new Error('Wrong subscribe header'))
  265. }
  266. packet.subscriptions = []
  267. if (!this._parseMessageId()) { return }
  268. // Properties mqtt 5
  269. if (this.settings.protocolVersion === 5) {
  270. const properties = this._parseProperties()
  271. if (Object.getOwnPropertyNames(properties).length) {
  272. packet.properties = properties
  273. }
  274. }
  275. while (this._pos < packet.length) {
  276. // Parse topic
  277. topic = this._parseString()
  278. if (topic === null) return this._emitError(new Error('Cannot parse topic'))
  279. if (this._pos >= packet.length) return this._emitError(new Error('Malformed Subscribe Payload'))
  280. options = this._parseByte()
  281. qos = options & constants.SUBSCRIBE_OPTIONS_QOS_MASK
  282. nl = ((options >> constants.SUBSCRIBE_OPTIONS_NL_SHIFT) & constants.SUBSCRIBE_OPTIONS_NL_MASK) !== 0
  283. rap = ((options >> constants.SUBSCRIBE_OPTIONS_RAP_SHIFT) & constants.SUBSCRIBE_OPTIONS_RAP_MASK) !== 0
  284. rh = (options >> constants.SUBSCRIBE_OPTIONS_RH_SHIFT) & constants.SUBSCRIBE_OPTIONS_RH_MASK
  285. subscription = { topic, qos }
  286. // mqtt 5 options
  287. if (this.settings.protocolVersion === 5) {
  288. subscription.nl = nl
  289. subscription.rap = rap
  290. subscription.rh = rh
  291. } else if (this.settings.bridgeMode) {
  292. subscription.rh = 0
  293. subscription.rap = true
  294. subscription.nl = true
  295. }
  296. // Push pair to subscriptions
  297. debug('_parseSubscribe: push subscription `%s` to subscription', subscription)
  298. packet.subscriptions.push(subscription)
  299. }
  300. }
  301. _parseSuback () {
  302. debug('_parseSuback')
  303. const packet = this.packet
  304. this.packet.granted = []
  305. if (!this._parseMessageId()) { return }
  306. // Properties mqtt 5
  307. if (this.settings.protocolVersion === 5) {
  308. const properties = this._parseProperties()
  309. if (Object.getOwnPropertyNames(properties).length) {
  310. packet.properties = properties
  311. }
  312. }
  313. // Parse granted QoSes
  314. while (this._pos < this.packet.length) {
  315. this.packet.granted.push(this._list.readUInt8(this._pos++))
  316. }
  317. }
  318. _parseUnsubscribe () {
  319. debug('_parseUnsubscribe')
  320. const packet = this.packet
  321. packet.unsubscriptions = []
  322. // Parse messageId
  323. if (!this._parseMessageId()) { return }
  324. // Properties mqtt 5
  325. if (this.settings.protocolVersion === 5) {
  326. const properties = this._parseProperties()
  327. if (Object.getOwnPropertyNames(properties).length) {
  328. packet.properties = properties
  329. }
  330. }
  331. while (this._pos < packet.length) {
  332. // Parse topic
  333. const topic = this._parseString()
  334. if (topic === null) return this._emitError(new Error('Cannot parse topic'))
  335. // Push topic to unsubscriptions
  336. debug('_parseUnsubscribe: push topic `%s` to unsubscriptions', topic)
  337. packet.unsubscriptions.push(topic)
  338. }
  339. }
  340. _parseUnsuback () {
  341. debug('_parseUnsuback')
  342. const packet = this.packet
  343. if (!this._parseMessageId()) return this._emitError(new Error('Cannot parse messageId'))
  344. // Properties mqtt 5
  345. if (this.settings.protocolVersion === 5) {
  346. const properties = this._parseProperties()
  347. if (Object.getOwnPropertyNames(properties).length) {
  348. packet.properties = properties
  349. }
  350. // Parse granted QoSes
  351. packet.granted = []
  352. while (this._pos < this.packet.length) {
  353. this.packet.granted.push(this._list.readUInt8(this._pos++))
  354. }
  355. }
  356. }
  357. // parse packets like puback, pubrec, pubrel, pubcomp
  358. _parseConfirmation () {
  359. debug('_parseConfirmation: packet.cmd: `%s`', this.packet.cmd)
  360. const packet = this.packet
  361. this._parseMessageId()
  362. if (this.settings.protocolVersion === 5) {
  363. if (packet.length > 2) {
  364. // response code
  365. packet.reasonCode = this._parseByte()
  366. debug('_parseConfirmation: packet.reasonCode `%d`', packet.reasonCode)
  367. } else {
  368. packet.reasonCode = 0
  369. }
  370. if (packet.length > 3) {
  371. // properies mqtt 5
  372. const properties = this._parseProperties()
  373. if (Object.getOwnPropertyNames(properties).length) {
  374. packet.properties = properties
  375. }
  376. }
  377. }
  378. return true
  379. }
  380. // parse disconnect packet
  381. _parseDisconnect () {
  382. const packet = this.packet
  383. debug('_parseDisconnect')
  384. if (this.settings.protocolVersion === 5) {
  385. // response code
  386. if (this._list.length > 0) {
  387. packet.reasonCode = this._parseByte()
  388. } else {
  389. packet.reasonCode = 0
  390. }
  391. // properies mqtt 5
  392. const properties = this._parseProperties()
  393. if (Object.getOwnPropertyNames(properties).length) {
  394. packet.properties = properties
  395. }
  396. }
  397. debug('_parseDisconnect result: true')
  398. return true
  399. }
  400. // parse auth packet
  401. _parseAuth () {
  402. debug('_parseAuth')
  403. const packet = this.packet
  404. if (this.settings.protocolVersion !== 5) {
  405. return this._emitError(new Error('Not supported auth packet for this version MQTT'))
  406. }
  407. // response code
  408. packet.reasonCode = this._parseByte()
  409. // properies mqtt 5
  410. const properties = this._parseProperties()
  411. if (Object.getOwnPropertyNames(properties).length) {
  412. packet.properties = properties
  413. }
  414. debug('_parseAuth: result: true')
  415. return true
  416. }
  417. _parseMessageId () {
  418. const packet = this.packet
  419. packet.messageId = this._parseNum()
  420. if (packet.messageId === null) {
  421. this._emitError(new Error('Cannot parse messageId'))
  422. return false
  423. }
  424. debug('_parseMessageId: packet.messageId %d', packet.messageId)
  425. return true
  426. }
  427. _parseString (maybeBuffer) {
  428. const length = this._parseNum()
  429. const end = length + this._pos
  430. if (length === -1 || end > this._list.length || end > this.packet.length) return null
  431. const result = this._list.toString('utf8', this._pos, end)
  432. this._pos += length
  433. debug('_parseString: result: %s', result)
  434. return result
  435. }
  436. _parseStringPair () {
  437. debug('_parseStringPair')
  438. return {
  439. name: this._parseString(),
  440. value: this._parseString()
  441. }
  442. }
  443. _parseBuffer () {
  444. const length = this._parseNum()
  445. const end = length + this._pos
  446. if (length === -1 || end > this._list.length || end > this.packet.length) return null
  447. const result = this._list.slice(this._pos, end)
  448. this._pos += length
  449. debug('_parseBuffer: result: %o', result)
  450. return result
  451. }
  452. _parseNum () {
  453. if (this._list.length - this._pos < 2) return -1
  454. const result = this._list.readUInt16BE(this._pos)
  455. this._pos += 2
  456. debug('_parseNum: result: %s', result)
  457. return result
  458. }
  459. _parse4ByteNum () {
  460. if (this._list.length - this._pos < 4) return -1
  461. const result = this._list.readUInt32BE(this._pos)
  462. this._pos += 4
  463. debug('_parse4ByteNum: result: %s', result)
  464. return result
  465. }
  466. _parseVarByteNum (fullInfoFlag) {
  467. debug('_parseVarByteNum')
  468. const maxBytes = 4
  469. let bytes = 0
  470. let mul = 1
  471. let value = 0
  472. let result = false
  473. let current
  474. const padding = this._pos ? this._pos : 0
  475. while (bytes < maxBytes && (padding + bytes) < this._list.length) {
  476. current = this._list.readUInt8(padding + bytes++)
  477. value += mul * (current & constants.VARBYTEINT_MASK)
  478. mul *= 0x80
  479. if ((current & constants.VARBYTEINT_FIN_MASK) === 0) {
  480. result = true
  481. break
  482. }
  483. if (this._list.length <= bytes) {
  484. break
  485. }
  486. }
  487. if (!result && bytes === maxBytes && this._list.length >= bytes) {
  488. this._emitError(new Error('Invalid variable byte integer'))
  489. }
  490. if (padding) {
  491. this._pos += bytes
  492. }
  493. result = result
  494. ? fullInfoFlag ? {
  495. bytes,
  496. value
  497. } : value
  498. : false
  499. debug('_parseVarByteNum: result: %o', result)
  500. return result
  501. }
  502. _parseByte () {
  503. const result = this._list.readUInt8(this._pos)
  504. this._pos++
  505. debug('_parseByte: result: %o', result)
  506. return result
  507. }
  508. _parseByType (type) {
  509. debug('_parseByType: type: %s', type)
  510. switch (type) {
  511. case 'byte': {
  512. return this._parseByte() !== 0
  513. }
  514. case 'int8': {
  515. return this._parseByte()
  516. }
  517. case 'int16': {
  518. return this._parseNum()
  519. }
  520. case 'int32': {
  521. return this._parse4ByteNum()
  522. }
  523. case 'var': {
  524. return this._parseVarByteNum()
  525. }
  526. case 'string': {
  527. return this._parseString()
  528. }
  529. case 'pair': {
  530. return this._parseStringPair()
  531. }
  532. case 'binary': {
  533. return this._parseBuffer()
  534. }
  535. }
  536. }
  537. _parseProperties () {
  538. debug('_parseProperties')
  539. const length = this._parseVarByteNum()
  540. const start = this._pos
  541. const end = start + length
  542. const result = {}
  543. while (this._pos < end) {
  544. const type = this._parseByte()
  545. const name = constants.propertiesCodes[type]
  546. if (!name) {
  547. this._emitError(new Error('Unknown property'))
  548. return false
  549. }
  550. // user properties process
  551. if (name === 'userProperties') {
  552. if (!result[name]) {
  553. result[name] = Object.create(null)
  554. }
  555. const currentUserProperty = this._parseByType(constants.propertiesTypes[name])
  556. if (result[name][currentUserProperty.name]) {
  557. if (Array.isArray(result[name][currentUserProperty.name])) {
  558. result[name][currentUserProperty.name].push(currentUserProperty.value)
  559. } else {
  560. const currentValue = result[name][currentUserProperty.name]
  561. result[name][currentUserProperty.name] = [currentValue]
  562. result[name][currentUserProperty.name].push(currentUserProperty.value)
  563. }
  564. } else {
  565. result[name][currentUserProperty.name] = currentUserProperty.value
  566. }
  567. continue
  568. }
  569. if (result[name]) {
  570. if (Array.isArray(result[name])) {
  571. result[name].push(this._parseByType(constants.propertiesTypes[name]))
  572. } else {
  573. result[name] = [result[name]]
  574. result[name].push(this._parseByType(constants.propertiesTypes[name]))
  575. }
  576. } else {
  577. result[name] = this._parseByType(constants.propertiesTypes[name])
  578. }
  579. }
  580. return result
  581. }
  582. _newPacket () {
  583. debug('_newPacket')
  584. if (this.packet) {
  585. this._list.consume(this.packet.length)
  586. debug('_newPacket: parser emit packet: packet.cmd: %s, packet.payload: %s, packet.length: %d', this.packet.cmd, this.packet.payload, this.packet.length)
  587. this.emit('packet', this.packet)
  588. }
  589. debug('_newPacket: new packet')
  590. this.packet = new Packet()
  591. this._pos = 0
  592. return true
  593. }
  594. _emitError (err) {
  595. debug('_emitError')
  596. this.error = err
  597. this.emit('error', err)
  598. }
  599. }
  600. module.exports = Parser