12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117 |
- const protocol = require('./constants')
- const empty = Buffer.allocUnsafe(0)
- const zeroBuf = Buffer.from([0])
- const numbers = require('./numbers')
- const nextTick = require('process-nextick-args').nextTick
- const debug = require('debug')('mqtt-packet:writeToStream')
- const numCache = numbers.cache
- const generateNumber = numbers.generateNumber
- const generateCache = numbers.generateCache
- const genBufVariableByteInt = numbers.genBufVariableByteInt
- const generate4ByteBuffer = numbers.generate4ByteBuffer
- let writeNumber = writeNumberCached
- let toGenerate = true
/**
 * Serializes one MQTT packet onto a stream.
 *
 * When the stream exposes `cork`, it is corked for the duration of the call
 * and `uncork` is scheduled through `nextTick`. `generateCache()` is invoked
 * once, on the first call that finds the pending-generation flag set.
 *
 * @param {Object} packet - packet description; `packet.cmd` selects the writer
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - forwarded untouched to the selected writer
 * @returns {boolean} the writer's result, or false for an unknown command
 */
function generate (packet, stream, opts) {
  debug('generate called')

  if (stream.cork) {
    stream.cork()
    nextTick(uncork, stream)
  }

  if (toGenerate) {
    toGenerate = false
    generateCache()
  }

  const command = packet.cmd
  debug('generate: packet.cmd: %s', command)

  // Pick the writer first, then invoke it from a single call site.
  let writer
  switch (command) {
    case 'connect':
      writer = connect
      break
    case 'connack':
      writer = connack
      break
    case 'publish':
      writer = publish
      break
    case 'puback':
    case 'pubrec':
    case 'pubrel':
    case 'pubcomp':
      writer = confirmation
      break
    case 'subscribe':
      writer = subscribe
      break
    case 'suback':
      writer = suback
      break
    case 'unsubscribe':
      writer = unsubscribe
      break
    case 'unsuback':
      writer = unsuback
      break
    case 'pingreq':
    case 'pingresp':
      writer = emptyPacket
      break
    case 'disconnect':
      writer = disconnect
      break
    case 'auth':
      writer = auth
      break
    default:
      stream.emit('error', new Error('Unknown command'))
      return false
  }

  return writer(packet, stream, opts)
}
/**
 * Switches between the two number writers.
 *
 * Reading `generate.cacheNumbers` reports whether the cached writer is active.
 * Assigning a falsy value selects the writer that builds each buffer on the
 * fly; assigning a truthy value selects the cached writer and, when the cache
 * is missing or empty, flags it for generation on the next `generate` call.
 */
Object.defineProperty(generate, 'cacheNumbers', {
  get () {
    return writeNumber === writeNumberCached
  },
  set (value) {
    if (!value) {
      toGenerate = false
      writeNumber = writeNumberGenerated
      return
    }

    const cacheIsEmpty = !numCache || Object.keys(numCache).length === 0
    if (cacheIsEmpty) toGenerate = true
    writeNumber = writeNumberCached
  }
})
/**
 * Callback scheduled by `generate`: releases the cork placed on the stream.
 *
 * @param {Object} stream - the stream that was corked
 */
function uncork (stream) {
  stream.uncork()
}
/**
 * Serializes a CONNECT packet.
 *
 * All fields are validated and the remaining length is computed before
 * anything is written, so an invalid packet produces an 'error' event and no
 * output. Defaults: protocolId 'MQTT', protocolVersion 4, clean true,
 * keepalive 0, clientId ''.
 *
 * @param {Object} packet - connect settings (protocolId, protocolVersion,
 *   clean, keepalive, clientId, username, password, will, properties,
 *   bridgeMode)
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - unused by this writer
 * @returns {boolean} true once the packet has been written, false after an
 *   'error' event has been emitted
 */
function connect (packet, stream, opts) {
  const settings = packet || {}
  const protocolId = settings.protocolId || 'MQTT'
  const protocolVersion = settings.protocolVersion || 4
  const will = settings.will
  const clean = settings.clean === undefined ? true : settings.clean
  const keepalive = settings.keepalive || 0
  const clientId = settings.clientId || ''
  const username = settings.username
  const password = settings.password
  /* mqtt5 new options */
  const properties = settings.properties

  // Reports a validation failure and yields the failure result.
  const fail = (message) => {
    stream.emit('error', new Error(message))
    return false
  }

  let length = 0

  // Must be a string or Buffer and non-falsy
  if (!protocolId ||
    (typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) {
    return fail('Invalid protocolId')
  }
  // Count encoded bytes rather than UTF-16 units: the writer emits UTF-8, so
  // `.length` would under-count a protocolId containing multi-byte characters.
  length += Buffer.byteLength(protocolId) + 2

  // Must be 3 or 4 or 5
  if (protocolVersion !== 3 && protocolVersion !== 4 && protocolVersion !== 5) {
    return fail('Invalid protocol version')
  }
  length += 1

  // ClientId might be omitted in 3.1.1 and 5, but only if cleanSession is set to 1
  const clientIdIsStringOrBuffer = typeof clientId === 'string' || Buffer.isBuffer(clientId)
  if (clientIdIsStringOrBuffer &&
    (clientId || protocolVersion >= 4) && (clientId || clean)) {
    length += Buffer.byteLength(clientId) + 2
  } else {
    if (protocolVersion < 4) {
      return fail('clientId must be supplied before 3.1.1')
    }
    if ((clean * 1) === 0) {
      return fail('clientId must be given if cleanSession set to 0')
    }
  }

  // Must be a two byte number
  if (typeof keepalive !== 'number' ||
    keepalive < 0 ||
    keepalive > 65535 ||
    keepalive % 1 !== 0) {
    return fail('Invalid keepalive')
  }
  length += 2

  // Connect flags
  length += 1

  // Properties (MQTT 5 only)
  let propertiesData = null
  if (protocolVersion === 5) {
    propertiesData = getProperties(stream, properties)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // If will exists...
  let willProperties = null
  if (will) {
    // It must be an object
    if (typeof will !== 'object') {
      return fail('Invalid will')
    }
    // It must have a non-empty string topic
    if (!will.topic || typeof will.topic !== 'string') {
      return fail('Invalid will topic')
    }
    length += Buffer.byteLength(will.topic) + 2

    // Payload: two-byte length prefix plus the payload bytes, if any
    length += 2
    if (will.payload) {
      if (!(will.payload.length >= 0)) {
        return fail('Invalid will payload')
      }
      length += typeof will.payload === 'string'
        ? Buffer.byteLength(will.payload)
        : will.payload.length
    }

    // Will properties (MQTT 5 only)
    if (protocolVersion === 5) {
      willProperties = getProperties(stream, will.properties)
      if (!willProperties) { return false }
      length += willProperties.length
    }
  }

  // Username
  let providedUsername = false
  if (username != null) {
    if (!isStringOrBuffer(username)) {
      return fail('Invalid username')
    }
    providedUsername = true
    length += Buffer.byteLength(username) + 2
  }

  // Password
  if (password != null) {
    if (!providedUsername) {
      return fail('Username is required to use password')
    }
    if (!isStringOrBuffer(password)) {
      return fail('Invalid password')
    }
    length += byteLength(password) + 2
  }

  // Generate header
  stream.write(protocol.CONNECT_HEADER)

  // Generate length
  writeVarByteInt(stream, length)

  // Generate protocol ID
  writeStringOrBuffer(stream, protocolId)

  // Bridge mode sets the high bit of the version byte. The MQTT 5 property
  // sections below are keyed on this on-the-wire value, as before.
  // NOTE(review): bridgeMode combined with protocolVersion 5 falls through to
  // VERSION3 and skips the properties that were counted above — confirm
  // whether that combination should be rejected.
  const wireVersion = settings.bridgeMode ? protocolVersion + 128 : protocolVersion
  let versionBuffer
  switch (wireVersion) {
    case 131:
      versionBuffer = protocol.VERSION131
      break
    case 132:
      versionBuffer = protocol.VERSION132
      break
    case 4:
      versionBuffer = protocol.VERSION4
      break
    case 5:
      versionBuffer = protocol.VERSION5
      break
    default:
      versionBuffer = protocol.VERSION3
  }
  stream.write(versionBuffer)

  // Connect flags
  let flags = 0
  flags |= (username != null) ? protocol.USERNAME_MASK : 0
  flags |= (password != null) ? protocol.PASSWORD_MASK : 0
  flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0
  flags |= (will && will.qos) ? will.qos << protocol.WILL_QOS_SHIFT : 0
  flags |= will ? protocol.WILL_FLAG_MASK : 0
  flags |= clean ? protocol.CLEAN_SESSION_MASK : 0
  stream.write(Buffer.from([flags]))

  // Keepalive
  writeNumber(stream, keepalive)

  // Properties
  if (wireVersion === 5) {
    propertiesData.write()
  }

  // Client ID
  writeStringOrBuffer(stream, clientId)

  // Will
  if (will) {
    if (wireVersion === 5) {
      willProperties.write()
    }
    writeString(stream, will.topic)
    writeStringOrBuffer(stream, will.payload)
  }

  // Username and password
  if (username != null) {
    writeStringOrBuffer(stream, username)
  }
  if (password != null) {
    writeStringOrBuffer(stream, password)
  }

  // This is a small packet that happens only once on a stream
  // We assume the stream is always free to receive more data after this
  return true
}
/**
 * Serializes a CONNACK packet.
 *
 * The code byte comes from `reasonCode` for protocol version 5 and from
 * `returnCode` otherwise; properties are only written for version 5.
 *
 * @param {Object} packet - sessionPresent, returnCode/reasonCode, properties
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when opts is absent)
 * @returns {boolean} true when written, false after an 'error' event
 */
function connack (packet, stream, opts) {
  const settings = packet || {}
  const isV5 = (opts ? opts.protocolVersion : 4) === 5
  const rc = isV5 ? settings.reasonCode : settings.returnCode

  // Check return code
  if (typeof rc !== 'number') {
    stream.emit('error', new Error('Invalid return code'))
    return false
  }

  // Session-present byte plus the code byte
  let length = 2

  // mqtt5 properties
  let propertiesData = null
  if (isV5) {
    propertiesData = getProperties(stream, settings.properties)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  stream.write(protocol.CONNACK_HEADER)
  writeVarByteInt(stream, length)

  const sessionByte = settings.sessionPresent ? protocol.SESSIONPRESENT_HEADER : zeroBuf
  stream.write(sessionByte)
  stream.write(Buffer.from([rc]))

  if (propertiesData != null) {
    propertiesData.write()
  }
  return true
}
/**
 * Serializes a PUBLISH packet.
 *
 * The topic must be a string or Buffer. A message id is required (and written)
 * only when qos is non-zero. Properties are written for protocol version 5.
 *
 * @param {Object} packet - topic, payload, qos, retain, dup, messageId, properties
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when opts is absent)
 * @returns {boolean} result of the final payload write, or false after an
 *   'error' event
 */
function publish (packet, stream, opts) {
  debug('publish: packet: %o', packet)

  const settings = packet || {}
  const version = opts ? opts.protocolVersion : 4
  const qos = settings.qos || 0
  const retain = settings.retain ? protocol.RETAIN_MASK : 0
  const topic = settings.topic
  const payload = settings.payload || empty
  const id = settings.messageId

  let length = 0

  // Topic: two-byte length prefix plus its bytes
  if (typeof topic === 'string') {
    length += Buffer.byteLength(topic) + 2
  } else if (Buffer.isBuffer(topic)) {
    length += topic.length + 2
  } else {
    stream.emit('error', new Error('Invalid topic'))
    return false
  }

  // Payload bytes
  length += Buffer.isBuffer(payload) ? payload.length : Buffer.byteLength(payload)

  // Message ID must be a number if qos > 0
  if (qos) {
    if (typeof id !== 'number') {
      stream.emit('error', new Error('Invalid messageId'))
      return false
    }
    length += 2
  }

  // mqtt5 properties
  let propertiesData = null
  if (version === 5) {
    propertiesData = getProperties(stream, settings.properties)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Header, indexed by qos / dup / retain
  const dupIndex = settings.dup ? 1 : 0
  const retainIndex = retain ? 1 : 0
  stream.write(protocol.PUBLISH_HEADER[qos][dupIndex][retainIndex])

  // Remaining length
  writeVarByteInt(stream, length)

  // Topic
  writeNumber(stream, byteLength(topic))
  stream.write(topic)

  // Message ID
  if (qos > 0) writeNumber(stream, id)

  // Properties
  if (propertiesData != null) {
    propertiesData.write()
  }

  // Payload
  debug('publish: payload: %o', payload)
  return stream.write(payload)
}
/**
 * Serializes PUBACK, PUBREC, PUBREL and PUBCOMP packets.
 *
 * For protocol version 5 a reason-code byte follows the message id, and a
 * properties section is added only when `properties` is an object.
 *
 * @param {Object} packet - cmd (defaults to 'puback'), messageId, dup,
 *   reasonCode, properties
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when opts is absent)
 * @returns {boolean} true when written, false after a failure
 */
function confirmation (packet, stream, opts) {
  const settings = packet || {}
  const isV5 = (opts ? opts.protocolVersion : 4) === 5
  const type = settings.cmd || 'puback'
  const isPubrel = type === 'pubrel'
  const id = settings.messageId
  const dup = (settings.dup && isPubrel) ? protocol.DUP_MASK : 0
  const qos = isPubrel ? 1 : 0
  const properties = settings.properties

  // Message id, plus the reason code byte for MQTT 5
  let length = isV5 ? 3 : 2

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }

  // Properties (MQTT 5): confirm should not add an empty property length
  // when there are no properties (rfc 3.4.2.2.1)
  let propertiesData = null
  if (isV5 && typeof properties === 'object') {
    propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Header
  stream.write(protocol.ACKS[type][qos][dup][0])

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // Reason code
  if (isV5) {
    stream.write(Buffer.from([settings.reasonCode]))
  }

  // Properties
  if (propertiesData !== null) {
    propertiesData.write()
  }
  return true
}
/**
 * Serializes a SUBSCRIBE packet.
 *
 * Every subscription needs a string `topic` and a numeric `qos`. For protocol
 * version 5 the `nl`, `rap` and `rh` options are validated and folded into the
 * per-subscription options byte.
 *
 * @param {Object} packet - messageId, subscriptions, dup, properties
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when opts is absent)
 * @returns {boolean} result of the last options write, or false after an
 *   'error' event
 */
function subscribe (packet, stream, opts) {
  debug('subscribe: packet: ')

  const settings = packet || {}
  const isV5 = (opts ? opts.protocolVersion : 4) === 5
  const dup = settings.dup ? protocol.DUP_MASK : 0
  const id = settings.messageId
  const subs = settings.subscriptions

  // Reports a validation failure and yields the failure result.
  const fail = (message) => {
    stream.emit('error', new Error(message))
    return false
  }

  // Check message ID
  if (typeof id !== 'number') {
    return fail('Invalid messageId')
  }
  let length = 2

  // Properties (MQTT 5)
  let propertiesData = null
  if (isV5) {
    propertiesData = getProperties(stream, settings.properties)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Check subscriptions
  if (!(typeof subs === 'object' && subs.length)) {
    return fail('Invalid subscriptions')
  }
  for (let i = 0; i < subs.length; i += 1) {
    const entry = subs[i]
    const itopic = entry.topic
    const iqos = entry.qos

    if (typeof itopic !== 'string') {
      return fail('Invalid subscriptions - invalid topic')
    }
    if (typeof iqos !== 'number') {
      return fail('Invalid subscriptions - invalid qos')
    }

    if (isV5) {
      const nl = subs[i].nl || false
      if (typeof nl !== 'boolean') {
        return fail('Invalid subscriptions - invalid No Local')
      }
      const rap = subs[i].rap || false
      if (typeof rap !== 'boolean') {
        return fail('Invalid subscriptions - invalid Retain as Published')
      }
      const rh = subs[i].rh || 0
      if (typeof rh !== 'number' || rh > 2) {
        return fail('Invalid subscriptions - invalid Retain Handling')
      }
    }

    // Length-prefixed topic plus one options byte
    length += Buffer.byteLength(itopic) + 2 + 1
  }

  // Generate header
  debug('subscribe: writing to stream: %o', protocol.SUBSCRIBE_HEADER)
  stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0])

  // Generate length
  writeVarByteInt(stream, length)

  // Generate message ID
  writeNumber(stream, id)

  // Properties (MQTT 5)
  if (propertiesData !== null) {
    propertiesData.write()
  }

  // Generate subs
  let result = true
  for (const sub of subs) {
    const jnl = +sub.nl
    const jrap = +sub.rap
    const jrh = sub.rh

    // Write topic string
    writeString(stream, sub.topic)

    // Options byte: qos bits, plus the MQTT 5 flags when applicable
    let joptions = protocol.SUBSCRIBE_OPTIONS_QOS[sub.qos]
    if (isV5) {
      joptions |= jnl ? protocol.SUBSCRIBE_OPTIONS_NL : 0
      joptions |= jrap ? protocol.SUBSCRIBE_OPTIONS_RAP : 0
      joptions |= jrh ? protocol.SUBSCRIBE_OPTIONS_RH[jrh] : 0
    }

    // Write options
    result = stream.write(Buffer.from([joptions]))
  }

  return result
}
/**
 * Serializes a SUBACK packet.
 *
 * `granted` must be a non-empty array-like of numbers; each entry becomes one
 * payload byte. Properties are written for protocol version 5.
 *
 * @param {Object} packet - messageId, granted, properties
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when opts is absent)
 * @returns {boolean} result of the payload write, or false after a failure
 */
function suback (packet, stream, opts) {
  const settings = packet || {}
  const isV5 = (opts ? opts.protocolVersion : 4) === 5
  const id = settings.messageId
  const granted = settings.granted

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }
  let length = 2

  // Check granted qos vector
  if (!(typeof granted === 'object' && granted.length)) {
    stream.emit('error', new Error('Invalid qos vector'))
    return false
  }
  for (let i = 0; i < granted.length; i += 1) {
    if (typeof granted[i] !== 'number') {
      stream.emit('error', new Error('Invalid qos vector'))
      return false
    }
    length += 1
  }

  // Properties (MQTT 5)
  let propertiesData = null
  if (isV5) {
    propertiesData = getPropertiesByMaximumPacketSize(stream, settings.properties, opts, length)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Header
  stream.write(protocol.SUBACK_HEADER)

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // Properties (MQTT 5)
  if (propertiesData !== null) {
    propertiesData.write()
  }

  return stream.write(Buffer.from(granted))
}
/**
 * Serializes an UNSUBSCRIBE packet.
 *
 * `unsubscriptions` must be a non-empty array-like of topic strings.
 * Properties are written for protocol version 5.
 *
 * @param {Object} packet - messageId, unsubscriptions, dup, properties
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when opts is absent)
 * @returns {boolean} result of the last topic write, or false after a failure
 */
function unsubscribe (packet, stream, opts) {
  const settings = packet || {}
  const isV5 = (opts ? opts.protocolVersion : 4) === 5
  const id = settings.messageId
  const dup = settings.dup ? protocol.DUP_MASK : 0
  const unsubs = settings.unsubscriptions

  // Reports a validation failure and yields the failure result.
  const fail = (message) => {
    stream.emit('error', new Error(message))
    return false
  }

  // Check message ID
  if (typeof id !== 'number') {
    return fail('Invalid messageId')
  }
  let length = 2

  // Check unsubs
  if (!(typeof unsubs === 'object' && unsubs.length)) {
    return fail('Invalid unsubscriptions')
  }
  for (let i = 0; i < unsubs.length; i += 1) {
    const topic = unsubs[i]
    if (typeof topic !== 'string') {
      return fail('Invalid unsubscriptions')
    }
    length += Buffer.byteLength(topic) + 2
  }

  // Properties (MQTT 5)
  let propertiesData = null
  if (isV5) {
    propertiesData = getProperties(stream, settings.properties)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Header
  stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0])

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // Properties (MQTT 5)
  if (propertiesData !== null) {
    propertiesData.write()
  }

  // Unsubs
  let result = true
  for (let j = 0; j < unsubs.length; j++) {
    result = writeString(stream, unsubs[j])
  }

  return result
}
/**
 * Serializes an UNSUBACK packet.
 *
 * For protocol version 5 the packet also carries properties and a `granted`
 * vector (a non-empty array-like of numbers, one byte each); earlier versions
 * write only the message id.
 *
 * @param {Object} packet - cmd, messageId, dup, granted, properties
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when opts is absent)
 * @returns {boolean} true when written, false after a failure
 */
function unsuback (packet, stream, opts) {
  const settings = packet || {}
  const isV5 = (opts ? opts.protocolVersion : 4) === 5
  const id = settings.messageId
  const dup = settings.dup ? protocol.DUP_MASK : 0
  const granted = settings.granted
  const type = settings.cmd
  let length = 2

  // Check message ID
  if (typeof id !== 'number') {
    stream.emit('error', new Error('Invalid messageId'))
    return false
  }

  let propertiesData = null
  if (isV5) {
    // Check granted
    if (!(typeof granted === 'object' && granted.length)) {
      stream.emit('error', new Error('Invalid qos vector'))
      return false
    }
    for (let i = 0; i < granted.length; i += 1) {
      if (typeof granted[i] !== 'number') {
        stream.emit('error', new Error('Invalid qos vector'))
        return false
      }
      length += 1
    }

    // Properties
    propertiesData = getPropertiesByMaximumPacketSize(stream, settings.properties, opts, length)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Header (the qos index is always 0 for this packet)
  stream.write(protocol.ACKS[type][0][dup][0])

  // Length
  writeVarByteInt(stream, length)

  // Message ID
  writeNumber(stream, id)

  // Properties (MQTT 5)
  if (propertiesData !== null) {
    propertiesData.write()
  }

  // Payload
  if (isV5) {
    stream.write(Buffer.from(granted))
  }

  return true
}
/**
 * Writes a body-less packet by looking up its pre-built bytes in
 * `protocol.EMPTY` under the packet's command name.
 *
 * @param {Object} packet - only `cmd` is read
 * @param {Object} stream - destination providing `write`
 * @param {Object} [opts] - unused
 * @returns {boolean} result of the stream write
 */
function emptyPacket (packet, stream, opts) {
  const bytes = protocol.EMPTY[packet.cmd]
  return stream.write(bytes)
}
/**
 * Serializes a DISCONNECT packet.
 *
 * Before protocol version 5 the packet has an empty body; for version 5 it
 * carries a reason-code byte followed by properties.
 *
 * @param {Object} packet - reasonCode, properties
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when opts is absent)
 * @returns {boolean} true when written, false when the properties could not
 *   be produced
 */
function disconnect (packet, stream, opts) {
  const settings = packet || {}
  const isV5 = (opts ? opts.protocolVersion : 4) === 5

  // Reason code byte (MQTT 5 only)
  let length = isV5 ? 1 : 0

  // Properties (MQTT 5)
  let propertiesData = null
  if (isV5) {
    propertiesData = getPropertiesByMaximumPacketSize(stream, settings.properties, opts, length)
    if (!propertiesData) { return false }
    length += propertiesData.length
  }

  // Header
  const header = Buffer.from([protocol.codes.disconnect << 4])
  stream.write(header)

  // Length
  writeVarByteInt(stream, length)

  // Reason code
  if (isV5) {
    stream.write(Buffer.from([settings.reasonCode]))
  }

  // Properties (MQTT 5)
  if (propertiesData !== null) {
    propertiesData.write()
  }

  return true
}
/**
 * Serializes an AUTH packet (protocol version 5 only).
 *
 * For any other protocol version an 'error' event is emitted and nothing is
 * written. Previously the error was emitted but the packet was still written
 * and `true` returned.
 *
 * @param {Object} packet - reasonCode, properties
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {Object} [opts] - `protocolVersion` (treated as 4 when opts is absent)
 * @returns {boolean} true when written, false after a failure
 */
function auth (packet, stream, opts) {
  const version = opts ? opts.protocolVersion : 4
  if (version !== 5) {
    stream.emit('error', new Error('Invalid mqtt version for auth packet'))
    return false
  }

  const settings = packet || {}
  const reasonCode = settings.reasonCode

  // Reason code byte
  let length = 1

  // Properties
  const propertiesData = getPropertiesByMaximumPacketSize(stream, settings.properties, opts, length)
  if (!propertiesData) { return false }
  length += propertiesData.length

  // Header
  stream.write(Buffer.from([protocol.codes.auth << 4]))

  // Length
  writeVarByteInt(stream, length)

  // Reason code
  stream.write(Buffer.from([reasonCode]))

  // Properties
  propertiesData.write()

  return true
}
// Encoded buffers for small values (below 16384), keyed by the number itself.
const varByteIntCache = {}

/**
 * writeVarByteInt - write an MQTT style variable byte integer to the stream
 *
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {number} num - value to encode; values above
 *   `protocol.VARBYTEINT_MAX` are rejected with an 'error' event
 * @returns {boolean} result of the stream write, or false when rejected
 *
 * @api private
 */
function writeVarByteInt (stream, num) {
  if (num > protocol.VARBYTEINT_MAX) {
    stream.emit('error', new Error(`Invalid variable byte integer: ${num}`))
    return false
  }

  let encoded = varByteIntCache[num]
  if (!encoded) {
    encoded = genBufVariableByteInt(num)
    // Only small values are remembered.
    if (num < 16384) {
      varByteIntCache[num] = encoded
    }
  }

  debug('writeVarByteInt: writing to stream: %o', encoded)
  return stream.write(encoded)
}
/**
 * writeString - write a length-prefixed utf8 string to the stream
 *
 * The two-byte prefix holds the string's UTF-8 byte length.
 *
 * @param {Object} stream - destination providing `write`
 * @param {string} string - string to write
 * @returns {boolean} result of writing the string itself
 *
 * @api private
 */
function writeString (stream, string) {
  writeNumber(stream, Buffer.byteLength(string))
  debug('writeString: %s', string)
  return stream.write(string, 'utf8')
}
/**
 * writeStringPair - write a name/value pair as two length-prefixed utf8
 * strings, name first
 *
 * @param {Object} stream - destination
 * @param {string} name - string name to write
 * @param {string} value - string value to write
 * @returns {undefined} nothing is returned
 *
 * @api private
 */
function writeStringPair (stream, name, value) {
  for (const part of [name, value]) {
    writeString(stream, part)
  }
}
/**
 * writeNumberCached - write a number using the buffer stored for it in the
 * number cache
 *
 * @param {Object} stream - destination providing `write`
 * @param {number} number - number to write; used as the cache key
 * @returns {boolean} result of the stream write
 *
 * @api private
 */
function writeNumberCached (stream, number) {
  debug('writeNumberCached: number: %d', number)
  const cached = numCache[number]
  debug('writeNumberCached: %o', cached)
  return stream.write(cached)
}
/**
 * writeNumberGenerated - write a number using a buffer built on demand by
 * `generateNumber` instead of the cache
 *
 * @param {Object} stream - destination providing `write`
 * @param {number} number - number to write
 * @returns {boolean} result of the stream write
 *
 * @api private
 */
function writeNumberGenerated (stream, number) {
  const bytes = generateNumber(number)
  debug('writeNumberGenerated: %o', bytes)
  return stream.write(bytes)
}
/**
 * write4ByteNumber - write a number using the buffer produced by
 * `generate4ByteBuffer`
 *
 * @param {Object} stream - destination providing `write`
 * @param {number} number - number to write
 * @returns {boolean} result of the stream write
 *
 * @api private
 */
function write4ByteNumber (stream, number) {
  const bytes = generate4ByteBuffer(number)
  debug('write4ByteNumber: %o', bytes)
  return stream.write(bytes)
}
/**
 * writeStringOrBuffer - write a String or Buffer with its length prefix
 *
 * Strings go through `writeString`. Any other truthy value is written raw
 * after its `length`. A falsy non-string value writes only a zero length.
 *
 * @param {Object} stream - destination providing `write`
 * @param {string|Buffer} toWrite - String or Buffer
 * @returns {undefined} nothing is returned
 */
function writeStringOrBuffer (stream, toWrite) {
  if (typeof toWrite === 'string') {
    writeString(stream, toWrite)
    return
  }

  if (!toWrite) {
    writeNumber(stream, 0)
    return
  }

  writeNumber(stream, toWrite.length)
  stream.write(toWrite)
}
/**
 * Validates an MQTT 5 properties object and measures its encoded size.
 *
 * A missing property bag (null, undefined, a non-object, or anything with a
 * `length`) is encoded as an empty properties section of one byte. Otherwise
 * each own enumerable property is validated against its type in
 * `protocol.propertiesTypes`; array values contribute one entry per element.
 *
 * Fixes relative to the earlier behaviour:
 *  - `null` properties no longer throw (typeof null is 'object');
 *  - a null/undefined binary value is reported as an error instead of
 *    throwing inside `Buffer.byteLength` (the old check could never be true);
 *  - a null pair value is reported as an error instead of throwing;
 *  - inherited keys are ignored, matching what `writeProperties` writes.
 *
 * @param {Object} stream - used for 'error' events and by the returned writer
 * @param {Object} [properties] - property name to value (or array of values)
 * @returns {{length: number, write: Function}|false} the encoded size
 *   (including the length prefix) with a deferred writer, or false when a
 *   property is invalid or contributes no bytes
 */
function getProperties (stream, properties) {
  /* connect properties */
  if (properties === null || typeof properties !== 'object' || properties.length != null) {
    return {
      length: 1,
      write () {
        writeProperties(stream, {}, 0)
      }
    }
  }

  // Reports an invalid value for a known property and yields the failure result.
  function invalid (name, value) {
    stream.emit('error', new Error(`Invalid ${name}: ${value}`))
    return false
  }

  // Encoded size of one user-property pair: id byte + two length-prefixed strings.
  function pairLength (name, entry) {
    return 1 + 2 + Buffer.byteLength(name.toString()) + 2 + Buffer.byteLength(entry.toString())
  }

  // Returns the encoded size of a single property value, or false if invalid.
  function getLengthProperty (name, value) {
    const type = protocol.propertiesTypes[name]
    switch (type) {
      case 'byte':
        if (typeof value !== 'boolean') return invalid(name, value)
        return 1 + 1
      case 'int8':
        if (typeof value !== 'number' || value < 0 || value > 0xff) return invalid(name, value)
        return 1 + 1
      case 'binary':
        if (value == null) return invalid(name, value)
        return 1 + Buffer.byteLength(value) + 2
      case 'int16':
        if (typeof value !== 'number' || value < 0 || value > 0xffff) return invalid(name, value)
        return 1 + 2
      case 'int32':
        if (typeof value !== 'number' || value < 0 || value > 0xffffffff) return invalid(name, value)
        return 1 + 4
      case 'var':
        // var byte integer is max 24 bits packed in 32 bits
        if (typeof value !== 'number' || value < 0 || value > 0x0fffffff) return invalid(name, value)
        return 1 + Buffer.byteLength(genBufVariableByteInt(value))
      case 'string':
        if (typeof value !== 'string') return invalid(name, value)
        return 1 + 2 + Buffer.byteLength(value.toString())
      case 'pair': {
        if (typeof value !== 'object' || value === null) return invalid(name, value)
        let total = 0
        for (const key of Object.getOwnPropertyNames(value)) {
          const currentValue = value[key]
          if (Array.isArray(currentValue)) {
            // forEach skips holes, matching how the pairs are written out
            currentValue.forEach((entry) => {
              total += pairLength(key, entry)
            })
          } else {
            total += pairLength(key, currentValue)
          }
        }
        return total
      }
      default:
        stream.emit('error', new Error(`Invalid property ${name}: ${value}`))
        return false
    }
  }

  let propertiesLength = 0
  for (const propName in properties) {
    // Only own keys are written later, so only own keys may be counted.
    if (!Object.prototype.hasOwnProperty.call(properties, propName)) continue

    const propValue = properties[propName]
    let propLength = 0
    if (Array.isArray(propValue)) {
      for (const item of propValue) {
        const itemLength = getLengthProperty(propName, item)
        if (!itemLength) { return false }
        propLength += itemLength
      }
    } else {
      propLength = getLengthProperty(propName, propValue)
    }

    // A property contributing zero bytes (empty array, empty pair object) is
    // treated as a failure, as before; no 'error' event is emitted for it.
    if (!propLength) return false
    propertiesLength += propLength
  }

  const propertiesLengthLength = Buffer.byteLength(genBufVariableByteInt(propertiesLength))
  return {
    length: propertiesLengthLength + propertiesLength,
    write () {
      writeProperties(stream, properties, propertiesLength)
    }
  }
}
/**
 * Builds the properties section while honouring `opts.properties.maximumPacketSize`.
 *
 * When a limit is set and `length` plus the properties exceed it, the
 * droppable properties are removed in order (`reasonString`, then
 * `userProperties`) until the packet fits. Removal deletes the key from the
 * caller's `properties` object, as before.
 *
 * Fixes relative to the earlier behaviour:
 *  - null/undefined `properties` that exceed the limit return false instead of
 *    throwing a TypeError;
 *  - an absent `reasonString` no longer stops `userProperties` being dropped.
 *
 * @param {Object} stream - forwarded to `getProperties`
 * @param {Object} [properties] - MQTT 5 properties; may be mutated
 * @param {Object} [opts] - options; only `opts.properties.maximumPacketSize` is read
 * @param {number} length - packet bytes already accounted for
 * @returns {{length: number, write: Function}|false} properties data, or
 *   false when invalid or still over the limit
 */
function getPropertiesByMaximumPacketSize (stream, properties, opts, length) {
  const mayEmptyProps = ['reasonString', 'userProperties']
  const maximumPacketSize = opts && opts.properties && opts.properties.maximumPacketSize ? opts.properties.maximumPacketSize : 0

  let propertiesData = getProperties(stream, properties)
  if (!maximumPacketSize) {
    return propertiesData
  }

  const exceedsLimit = () => Boolean(propertiesData) && length + propertiesData.length > maximumPacketSize

  for (const droppable of mayEmptyProps) {
    if (!exceedsLimit()) break
    // Nothing to drop under this name; try the next candidate.
    if (properties == null || !properties[droppable]) continue
    delete properties[droppable]
    propertiesData = getProperties(stream, properties)
  }

  return exceedsLimit() ? false : propertiesData
}
/**
 * Writes one property value: its identifier byte from `protocol.properties`,
 * followed by the value encoded according to `protocol.propertiesTypes`.
 *
 * A 'pair' value writes one identifier + name/value pair per entry, expanding
 * array values into one pair per element.
 *
 * @param {Object} stream - destination providing `write` and `emit`
 * @param {string} propName - property name, used to look up id and type
 * @param {*} value - value to encode
 * @returns {undefined|boolean} undefined for a known type; false after an
 *   'error' event for an unknown one
 */
function writeProperty (stream, propName, value) {
  const type = protocol.propertiesTypes[propName]

  // Emits the property's identifier byte.
  const writeId = () => stream.write(Buffer.from([protocol.properties[propName]]))

  switch (type) {
    case 'byte':
      writeId()
      stream.write(Buffer.from([+value]))
      return
    case 'int8':
      writeId()
      stream.write(Buffer.from([value]))
      return
    case 'binary':
      writeId()
      writeStringOrBuffer(stream, value)
      return
    case 'int16':
      writeId()
      writeNumber(stream, value)
      return
    case 'int32':
      writeId()
      write4ByteNumber(stream, value)
      return
    case 'var':
      writeId()
      writeVarByteInt(stream, value)
      return
    case 'string':
      writeId()
      writeString(stream, value)
      return
    case 'pair':
      for (const name of Object.getOwnPropertyNames(value)) {
        const currentValue = value[name]
        if (Array.isArray(currentValue)) {
          currentValue.forEach((entry) => {
            writeId()
            writeStringPair(stream, name.toString(), entry.toString())
          })
        } else {
          writeId()
          writeStringPair(stream, name.toString(), currentValue.toString())
        }
      }
      return
    default:
      stream.emit('error', new Error(`Invalid property ${propName} value: ${value}`))
      return false
  }
}
/**
 * Writes a properties section: the variable-byte length prefix followed by
 * every own, non-null property. Array values are written element by element
 * under the same property name.
 *
 * @param {Object} stream - destination
 * @param {Object} properties - property name to value (or array of values)
 * @param {number} propertiesLength - byte length written as the prefix
 * @returns {undefined} nothing is returned
 */
function writeProperties (stream, properties, propertiesLength) {
  /* write properties to stream */
  writeVarByteInt(stream, propertiesLength)

  for (const propName in properties) {
    const isOwn = Object.prototype.hasOwnProperty.call(properties, propName)
    if (!isOwn || properties[propName] === null) continue

    const value = properties[propName]
    if (!Array.isArray(value)) {
      writeProperty(stream, propName, value)
      continue
    }

    for (let valueIndex = 0; valueIndex < value.length; valueIndex++) {
      writeProperty(stream, propName, value[valueIndex])
    }
  }
}
/**
 * Byte length of a Buffer or string; any falsy input counts as zero bytes.
 *
 * @param {Buffer|string} [bufOrString] - value to measure
 * @returns {number} number of bytes
 */
function byteLength (bufOrString) {
  if (!bufOrString) {
    return 0
  }
  return bufOrString instanceof Buffer
    ? bufOrString.length
    : Buffer.byteLength(bufOrString)
}
/**
 * Tells whether a field is a primitive string or a Buffer instance.
 *
 * @param {*} field - value to test
 * @returns {boolean} true for strings and Buffers, false otherwise
 */
function isStringOrBuffer (field) {
  if (typeof field === 'string') {
    return true
  }
  return field instanceof Buffer
}
- module.exports = generate
|