'use strict' /** * Module dependencies */ var xtend = require('xtend') var Readable = require('readable-stream').Readable var streamsOpts = { objectMode: true } var defaultStoreOptions = { clean: true } /** * es6-map can preserve insertion order even if ES version is older. * * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Map#Description * It should be noted that a Map which is a map of an object, especially * a dictionary of dictionaries, will only map to the object's insertion * order. In ES2015 this is ordered for objects but for older versions of * ES, this may be random and not ordered. * */ var Map = require('es6-map') /** * In-memory implementation of the message store * This can actually be saved into files. * * @param {Object} [options] - store options */ function Store (options) { if (!(this instanceof Store)) { return new Store(options) } this.options = options || {} // Defaults this.options = xtend(defaultStoreOptions, options) this._inflights = new Map() } /** * Adds a packet to the store, a packet is * anything that has a messageId property. * */ Store.prototype.put = function (packet, cb) { this._inflights.set(packet.messageId, packet) if (cb) { cb() } return this } /** * Creates a stream with all the packets in the store * */ Store.prototype.createStream = function () { var stream = new Readable(streamsOpts) var destroyed = false var values = [] var i = 0 this._inflights.forEach(function (value, key) { values.push(value) }) stream._read = function () { if (!destroyed && i < values.length) { this.push(values[i++]) } else { this.push(null) } } stream.destroy = function () { if (destroyed) { return } var self = this destroyed = true process.nextTick(function () { self.emit('close') }) } return stream } /** * deletes a packet from the store. */ Store.prototype.del = function (packet, cb) { packet = this._inflights.get(packet.messageId) if (packet) { this._inflights.delete(packet.messageId) cb(null, packet) } else if (cb) { cb(new Error('missing packet')) } return this } /** * get a packet from the store. */ Store.prototype.get = function (packet, cb) { packet = this._inflights.get(packet.messageId) if (packet) { cb(null, packet) } else if (cb) { cb(new Error('missing packet')) } return this } /** * Close the store */ Store.prototype.close = function (cb) { if (this.options.clean) { this._inflights = null } if (cb) { cb() } } module.exports = Store