123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709 |
- const bl = require('bl')
- const EventEmitter = require('events')
- const Packet = require('./packet')
- const constants = require('./constants')
- const debug = require('debug')('mqtt-packet:parser')
- class Parser extends EventEmitter {
- constructor () {
- super()
- this.parser = this.constructor.parser
- }
- static parser (opt) {
- if (!(this instanceof Parser)) return (new Parser()).parser(opt)
- this.settings = opt || {}
- this._states = [
- '_parseHeader',
- '_parseLength',
- '_parsePayload',
- '_newPacket'
- ]
- this._resetState()
- return this
- }
- _resetState () {
- debug('_resetState: resetting packet, error, _list, and _stateCounter')
- this.packet = new Packet()
- this.error = null
- this._list = bl()
- this._stateCounter = 0
- }
- parse (buf) {
- if (this.error) this._resetState()
- this._list.append(buf)
- debug('parse: current state: %s', this._states[this._stateCounter])
- while ((this.packet.length !== -1 || this._list.length > 0) &&
- this[this._states[this._stateCounter]]() &&
- !this.error) {
- this._stateCounter++
- debug('parse: state complete. _stateCounter is now: %d', this._stateCounter)
- debug('parse: packet.length: %d, buffer list length: %d', this.packet.length, this._list.length)
- if (this._stateCounter >= this._states.length) this._stateCounter = 0
- }
- debug('parse: exited while loop. packet: %d, buffer list length: %d', this.packet.length, this._list.length)
- return this._list.length
- }
- _parseHeader () {
- // There is at least one byte in the buffer
- const zero = this._list.readUInt8(0)
- this.packet.cmd = constants.types[zero >> constants.CMD_SHIFT]
- this.packet.retain = (zero & constants.RETAIN_MASK) !== 0
- this.packet.qos = (zero >> constants.QOS_SHIFT) & constants.QOS_MASK
- this.packet.dup = (zero & constants.DUP_MASK) !== 0
- debug('_parseHeader: packet: %o', this.packet)
- this._list.consume(1)
- return true
- }
- _parseLength () {
- // There is at least one byte in the list
- const result = this._parseVarByteNum(true)
- if (result) {
- this.packet.length = result.value
- this._list.consume(result.bytes)
- }
- debug('_parseLength %d', result.value)
- return !!result
- }
- _parsePayload () {
- debug('_parsePayload: payload %O', this._list)
- let result = false
- // Do we have a payload? Do we have enough data to complete the payload?
- // PINGs have no payload
- if (this.packet.length === 0 || this._list.length >= this.packet.length) {
- this._pos = 0
- switch (this.packet.cmd) {
- case 'connect':
- this._parseConnect()
- break
- case 'connack':
- this._parseConnack()
- break
- case 'publish':
- this._parsePublish()
- break
- case 'puback':
- case 'pubrec':
- case 'pubrel':
- case 'pubcomp':
- this._parseConfirmation()
- break
- case 'subscribe':
- this._parseSubscribe()
- break
- case 'suback':
- this._parseSuback()
- break
- case 'unsubscribe':
- this._parseUnsubscribe()
- break
- case 'unsuback':
- this._parseUnsuback()
- break
- case 'pingreq':
- case 'pingresp':
- // These are empty, nothing to do
- break
- case 'disconnect':
- this._parseDisconnect()
- break
- case 'auth':
- this._parseAuth()
- break
- default:
- this._emitError(new Error('Not supported'))
- }
- result = true
- }
- debug('_parsePayload complete result: %s', result)
- return result
- }
- _parseConnect () {
- debug('_parseConnect')
- let topic // Will topic
- let payload // Will payload
- let password // Password
- let username // Username
- const flags = {}
- const packet = this.packet
- // Parse protocolId
- const protocolId = this._parseString()
- if (protocolId === null) return this._emitError(new Error('Cannot parse protocolId'))
- if (protocolId !== 'MQTT' && protocolId !== 'MQIsdp') {
- return this._emitError(new Error('Invalid protocolId'))
- }
- packet.protocolId = protocolId
- // Parse constants version number
- if (this._pos >= this._list.length) return this._emitError(new Error('Packet too short'))
- packet.protocolVersion = this._list.readUInt8(this._pos)
- if (packet.protocolVersion >= 128) {
- packet.bridgeMode = true
- packet.protocolVersion = packet.protocolVersion - 128
- }
- if (packet.protocolVersion !== 3 && packet.protocolVersion !== 4 && packet.protocolVersion !== 5) {
- return this._emitError(new Error('Invalid protocol version'))
- }
- this._pos++
- if (this._pos >= this._list.length) {
- return this._emitError(new Error('Packet too short'))
- }
- // Parse connect flags
- flags.username = (this._list.readUInt8(this._pos) & constants.USERNAME_MASK)
- flags.password = (this._list.readUInt8(this._pos) & constants.PASSWORD_MASK)
- flags.will = (this._list.readUInt8(this._pos) & constants.WILL_FLAG_MASK)
- if (flags.will) {
- packet.will = {}
- packet.will.retain = (this._list.readUInt8(this._pos) & constants.WILL_RETAIN_MASK) !== 0
- packet.will.qos = (this._list.readUInt8(this._pos) &
- constants.WILL_QOS_MASK) >> constants.WILL_QOS_SHIFT
- }
- packet.clean = (this._list.readUInt8(this._pos) & constants.CLEAN_SESSION_MASK) !== 0
- this._pos++
- // Parse keepalive
- packet.keepalive = this._parseNum()
- if (packet.keepalive === -1) return this._emitError(new Error('Packet too short'))
- // parse properties
- if (packet.protocolVersion === 5) {
- const properties = this._parseProperties()
- if (Object.getOwnPropertyNames(properties).length) {
- packet.properties = properties
- }
- }
- // Parse clientId
- const clientId = this._parseString()
- if (clientId === null) return this._emitError(new Error('Packet too short'))
- packet.clientId = clientId
- debug('_parseConnect: packet.clientId: %s', packet.clientId)
- if (flags.will) {
- if (packet.protocolVersion === 5) {
- const willProperties = this._parseProperties()
- if (Object.getOwnPropertyNames(willProperties).length) {
- packet.will.properties = willProperties
- }
- }
- // Parse will topic
- topic = this._parseString()
- if (topic === null) return this._emitError(new Error('Cannot parse will topic'))
- packet.will.topic = topic
- debug('_parseConnect: packet.will.topic: %s', packet.will.topic)
- // Parse will payload
- payload = this._parseBuffer()
- if (payload === null) return this._emitError(new Error('Cannot parse will payload'))
- packet.will.payload = payload
- debug('_parseConnect: packet.will.paylaod: %s', packet.will.payload)
- }
- // Parse username
- if (flags.username) {
- username = this._parseString()
- if (username === null) return this._emitError(new Error('Cannot parse username'))
- packet.username = username
- debug('_parseConnect: packet.username: %s', packet.username)
- }
- // Parse password
- if (flags.password) {
- password = this._parseBuffer()
- if (password === null) return this._emitError(new Error('Cannot parse password'))
- packet.password = password
- }
- // need for right parse auth packet and self set up
- this.settings = packet
- debug('_parseConnect: complete')
- return packet
- }
- _parseConnack () {
- debug('_parseConnack')
- const packet = this.packet
- if (this._list.length < 1) return null
- packet.sessionPresent = !!(this._list.readUInt8(this._pos++) & constants.SESSIONPRESENT_MASK)
- if (this.settings.protocolVersion === 5) {
- if (this._list.length >= 2) {
- packet.reasonCode = this._list.readUInt8(this._pos++)
- } else {
- packet.reasonCode = 0
- }
- } else {
- if (this._list.length < 2) return null
- packet.returnCode = this._list.readUInt8(this._pos++)
- }
- if (packet.returnCode === -1 || packet.reasonCode === -1) return this._emitError(new Error('Cannot parse return code'))
- // mqtt 5 properties
- if (this.settings.protocolVersion === 5) {
- const properties = this._parseProperties()
- if (Object.getOwnPropertyNames(properties).length) {
- packet.properties = properties
- }
- }
- debug('_parseConnack: complete')
- }
- _parsePublish () {
- debug('_parsePublish')
- const packet = this.packet
- packet.topic = this._parseString()
- if (packet.topic === null) return this._emitError(new Error('Cannot parse topic'))
- // Parse messageId
- if (packet.qos > 0) if (!this._parseMessageId()) { return }
- // Properties mqtt 5
- if (this.settings.protocolVersion === 5) {
- const properties = this._parseProperties()
- if (Object.getOwnPropertyNames(properties).length) {
- packet.properties = properties
- }
- }
- packet.payload = this._list.slice(this._pos, packet.length)
- debug('_parsePublish: payload from buffer list: %o', packet.payload)
- }
- _parseSubscribe () {
- debug('_parseSubscribe')
- const packet = this.packet
- let topic
- let options
- let qos
- let rh
- let rap
- let nl
- let subscription
- if (packet.qos !== 1) {
- return this._emitError(new Error('Wrong subscribe header'))
- }
- packet.subscriptions = []
- if (!this._parseMessageId()) { return }
- // Properties mqtt 5
- if (this.settings.protocolVersion === 5) {
- const properties = this._parseProperties()
- if (Object.getOwnPropertyNames(properties).length) {
- packet.properties = properties
- }
- }
- while (this._pos < packet.length) {
- // Parse topic
- topic = this._parseString()
- if (topic === null) return this._emitError(new Error('Cannot parse topic'))
- if (this._pos >= packet.length) return this._emitError(new Error('Malformed Subscribe Payload'))
- options = this._parseByte()
- qos = options & constants.SUBSCRIBE_OPTIONS_QOS_MASK
- nl = ((options >> constants.SUBSCRIBE_OPTIONS_NL_SHIFT) & constants.SUBSCRIBE_OPTIONS_NL_MASK) !== 0
- rap = ((options >> constants.SUBSCRIBE_OPTIONS_RAP_SHIFT) & constants.SUBSCRIBE_OPTIONS_RAP_MASK) !== 0
- rh = (options >> constants.SUBSCRIBE_OPTIONS_RH_SHIFT) & constants.SUBSCRIBE_OPTIONS_RH_MASK
- subscription = { topic, qos }
- // mqtt 5 options
- if (this.settings.protocolVersion === 5) {
- subscription.nl = nl
- subscription.rap = rap
- subscription.rh = rh
- } else if (this.settings.bridgeMode) {
- subscription.rh = 0
- subscription.rap = true
- subscription.nl = true
- }
- // Push pair to subscriptions
- debug('_parseSubscribe: push subscription `%s` to subscription', subscription)
- packet.subscriptions.push(subscription)
- }
- }
- _parseSuback () {
- debug('_parseSuback')
- const packet = this.packet
- this.packet.granted = []
- if (!this._parseMessageId()) { return }
- // Properties mqtt 5
- if (this.settings.protocolVersion === 5) {
- const properties = this._parseProperties()
- if (Object.getOwnPropertyNames(properties).length) {
- packet.properties = properties
- }
- }
- // Parse granted QoSes
- while (this._pos < this.packet.length) {
- this.packet.granted.push(this._list.readUInt8(this._pos++))
- }
- }
- _parseUnsubscribe () {
- debug('_parseUnsubscribe')
- const packet = this.packet
- packet.unsubscriptions = []
- // Parse messageId
- if (!this._parseMessageId()) { return }
- // Properties mqtt 5
- if (this.settings.protocolVersion === 5) {
- const properties = this._parseProperties()
- if (Object.getOwnPropertyNames(properties).length) {
- packet.properties = properties
- }
- }
- while (this._pos < packet.length) {
- // Parse topic
- const topic = this._parseString()
- if (topic === null) return this._emitError(new Error('Cannot parse topic'))
- // Push topic to unsubscriptions
- debug('_parseUnsubscribe: push topic `%s` to unsubscriptions', topic)
- packet.unsubscriptions.push(topic)
- }
- }
- _parseUnsuback () {
- debug('_parseUnsuback')
- const packet = this.packet
- if (!this._parseMessageId()) return this._emitError(new Error('Cannot parse messageId'))
- // Properties mqtt 5
- if (this.settings.protocolVersion === 5) {
- const properties = this._parseProperties()
- if (Object.getOwnPropertyNames(properties).length) {
- packet.properties = properties
- }
- // Parse granted QoSes
- packet.granted = []
- while (this._pos < this.packet.length) {
- this.packet.granted.push(this._list.readUInt8(this._pos++))
- }
- }
- }
- // parse packets like puback, pubrec, pubrel, pubcomp
- _parseConfirmation () {
- debug('_parseConfirmation: packet.cmd: `%s`', this.packet.cmd)
- const packet = this.packet
- this._parseMessageId()
- if (this.settings.protocolVersion === 5) {
- if (packet.length > 2) {
- // response code
- packet.reasonCode = this._parseByte()
- debug('_parseConfirmation: packet.reasonCode `%d`', packet.reasonCode)
- } else {
- packet.reasonCode = 0
- }
- if (packet.length > 3) {
- // properies mqtt 5
- const properties = this._parseProperties()
- if (Object.getOwnPropertyNames(properties).length) {
- packet.properties = properties
- }
- }
- }
- return true
- }
- // parse disconnect packet
- _parseDisconnect () {
- const packet = this.packet
- debug('_parseDisconnect')
- if (this.settings.protocolVersion === 5) {
- // response code
- if (this._list.length > 0) {
- packet.reasonCode = this._parseByte()
- } else {
- packet.reasonCode = 0
- }
- // properies mqtt 5
- const properties = this._parseProperties()
- if (Object.getOwnPropertyNames(properties).length) {
- packet.properties = properties
- }
- }
- debug('_parseDisconnect result: true')
- return true
- }
- // parse auth packet
- _parseAuth () {
- debug('_parseAuth')
- const packet = this.packet
- if (this.settings.protocolVersion !== 5) {
- return this._emitError(new Error('Not supported auth packet for this version MQTT'))
- }
- // response code
- packet.reasonCode = this._parseByte()
- // properies mqtt 5
- const properties = this._parseProperties()
- if (Object.getOwnPropertyNames(properties).length) {
- packet.properties = properties
- }
- debug('_parseAuth: result: true')
- return true
- }
- _parseMessageId () {
- const packet = this.packet
- packet.messageId = this._parseNum()
- if (packet.messageId === null) {
- this._emitError(new Error('Cannot parse messageId'))
- return false
- }
- debug('_parseMessageId: packet.messageId %d', packet.messageId)
- return true
- }
- _parseString (maybeBuffer) {
- const length = this._parseNum()
- const end = length + this._pos
- if (length === -1 || end > this._list.length || end > this.packet.length) return null
- const result = this._list.toString('utf8', this._pos, end)
- this._pos += length
- debug('_parseString: result: %s', result)
- return result
- }
- _parseStringPair () {
- debug('_parseStringPair')
- return {
- name: this._parseString(),
- value: this._parseString()
- }
- }
- _parseBuffer () {
- const length = this._parseNum()
- const end = length + this._pos
- if (length === -1 || end > this._list.length || end > this.packet.length) return null
- const result = this._list.slice(this._pos, end)
- this._pos += length
- debug('_parseBuffer: result: %o', result)
- return result
- }
- _parseNum () {
- if (this._list.length - this._pos < 2) return -1
- const result = this._list.readUInt16BE(this._pos)
- this._pos += 2
- debug('_parseNum: result: %s', result)
- return result
- }
- _parse4ByteNum () {
- if (this._list.length - this._pos < 4) return -1
- const result = this._list.readUInt32BE(this._pos)
- this._pos += 4
- debug('_parse4ByteNum: result: %s', result)
- return result
- }
- _parseVarByteNum (fullInfoFlag) {
- debug('_parseVarByteNum')
- const maxBytes = 4
- let bytes = 0
- let mul = 1
- let value = 0
- let result = false
- let current
- const padding = this._pos ? this._pos : 0
- while (bytes < maxBytes && (padding + bytes) < this._list.length) {
- current = this._list.readUInt8(padding + bytes++)
- value += mul * (current & constants.VARBYTEINT_MASK)
- mul *= 0x80
- if ((current & constants.VARBYTEINT_FIN_MASK) === 0) {
- result = true
- break
- }
- if (this._list.length <= bytes) {
- break
- }
- }
- if (!result && bytes === maxBytes && this._list.length >= bytes) {
- this._emitError(new Error('Invalid variable byte integer'))
- }
- if (padding) {
- this._pos += bytes
- }
- result = result
- ? fullInfoFlag ? {
- bytes,
- value
- } : value
- : false
- debug('_parseVarByteNum: result: %o', result)
- return result
- }
- _parseByte () {
- const result = this._list.readUInt8(this._pos)
- this._pos++
- debug('_parseByte: result: %o', result)
- return result
- }
- _parseByType (type) {
- debug('_parseByType: type: %s', type)
- switch (type) {
- case 'byte': {
- return this._parseByte() !== 0
- }
- case 'int8': {
- return this._parseByte()
- }
- case 'int16': {
- return this._parseNum()
- }
- case 'int32': {
- return this._parse4ByteNum()
- }
- case 'var': {
- return this._parseVarByteNum()
- }
- case 'string': {
- return this._parseString()
- }
- case 'pair': {
- return this._parseStringPair()
- }
- case 'binary': {
- return this._parseBuffer()
- }
- }
- }
- _parseProperties () {
- debug('_parseProperties')
- const length = this._parseVarByteNum()
- const start = this._pos
- const end = start + length
- const result = {}
- while (this._pos < end) {
- const type = this._parseByte()
- const name = constants.propertiesCodes[type]
- if (!name) {
- this._emitError(new Error('Unknown property'))
- return false
- }
- // user properties process
- if (name === 'userProperties') {
- if (!result[name]) {
- result[name] = Object.create(null)
- }
- const currentUserProperty = this._parseByType(constants.propertiesTypes[name])
- if (result[name][currentUserProperty.name]) {
- if (Array.isArray(result[name][currentUserProperty.name])) {
- result[name][currentUserProperty.name].push(currentUserProperty.value)
- } else {
- const currentValue = result[name][currentUserProperty.name]
- result[name][currentUserProperty.name] = [currentValue]
- result[name][currentUserProperty.name].push(currentUserProperty.value)
- }
- } else {
- result[name][currentUserProperty.name] = currentUserProperty.value
- }
- continue
- }
- if (result[name]) {
- if (Array.isArray(result[name])) {
- result[name].push(this._parseByType(constants.propertiesTypes[name]))
- } else {
- result[name] = [result[name]]
- result[name].push(this._parseByType(constants.propertiesTypes[name]))
- }
- } else {
- result[name] = this._parseByType(constants.propertiesTypes[name])
- }
- }
- return result
- }
- _newPacket () {
- debug('_newPacket')
- if (this.packet) {
- this._list.consume(this.packet.length)
- debug('_newPacket: parser emit packet: packet.cmd: %s, packet.payload: %s, packet.length: %d', this.packet.cmd, this.packet.payload, this.packet.length)
- this.emit('packet', this.packet)
- }
- debug('_newPacket: new packet')
- this.packet = new Packet()
- this._pos = 0
- return true
- }
- _emitError (err) {
- debug('_emitError')
- this.error = err
- this.emit('error', err)
- }
- }
- module.exports = Parser
|