123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- 'use strict'
- /**
- * Module dependencies
- */
- var xtend = require('xtend')
- var Readable = require('readable-stream').Readable
- var streamsOpts = { objectMode: true }
- var defaultStoreOptions = {
- clean: true
- }
- /**
- * es6-map can preserve insertion order even if ES version is older.
- *
- * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Map#Description
- * It should be noted that a Map which is a map of an object, especially
- * a dictionary of dictionaries, will only map to the object's insertion
- * order. In ES2015 this is ordered for objects but for older versions of
- * ES, this may be random and not ordered.
- *
- */
- var Map = require('es6-map')
- /**
- * In-memory implementation of the message store
- * This can actually be saved into files.
- *
- * @param {Object} [options] - store options
- */
- function Store (options) {
- if (!(this instanceof Store)) {
- return new Store(options)
- }
- this.options = options || {}
- // Defaults
- this.options = xtend(defaultStoreOptions, options)
- this._inflights = new Map()
- }
- /**
- * Adds a packet to the store, a packet is
- * anything that has a messageId property.
- *
- */
- Store.prototype.put = function (packet, cb) {
- this._inflights.set(packet.messageId, packet)
- if (cb) {
- cb()
- }
- return this
- }
- /**
- * Creates a stream with all the packets in the store
- *
- */
- Store.prototype.createStream = function () {
- var stream = new Readable(streamsOpts)
- var destroyed = false
- var values = []
- var i = 0
- this._inflights.forEach(function (value, key) {
- values.push(value)
- })
- stream._read = function () {
- if (!destroyed && i < values.length) {
- this.push(values[i++])
- } else {
- this.push(null)
- }
- }
- stream.destroy = function () {
- if (destroyed) {
- return
- }
- var self = this
- destroyed = true
- process.nextTick(function () {
- self.emit('close')
- })
- }
- return stream
- }
- /**
- * deletes a packet from the store.
- */
- Store.prototype.del = function (packet, cb) {
- packet = this._inflights.get(packet.messageId)
- if (packet) {
- this._inflights.delete(packet.messageId)
- cb(null, packet)
- } else if (cb) {
- cb(new Error('missing packet'))
- }
- return this
- }
- /**
- * get a packet from the store.
- */
- Store.prototype.get = function (packet, cb) {
- packet = this._inflights.get(packet.messageId)
- if (packet) {
- cb(null, packet)
- } else if (cb) {
- cb(new Error('missing packet'))
- }
- return this
- }
- /**
- * Close the store
- */
- Store.prototype.close = function (cb) {
- if (this.options.clean) {
- this._inflights = null
- }
- if (cb) {
- cb()
- }
- }
- module.exports = Store
|