store.js 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. 'use strict'
  2. /**
  3. * Module dependencies
  4. */
  5. var xtend = require('xtend')
  6. var Readable = require('readable-stream').Readable
  7. var streamsOpts = { objectMode: true }
  8. var defaultStoreOptions = {
  9. clean: true
  10. }
  11. /**
  12. * es6-map can preserve insertion order even if ES version is older.
  13. *
  14. * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Map#Description
  15. * It should be noted that a Map which is a map of an object, especially
  16. * a dictionary of dictionaries, will only map to the object's insertion
  17. * order. In ES2015 this is ordered for objects but for older versions of
  18. * ES, this may be random and not ordered.
  19. *
  20. */
  21. var Map = require('es6-map')
  /**
   * In-memory implementation of the message store.
   *
   * Packets are kept in an insertion-ordered Map keyed by their `messageId`.
   * May be called with or without `new`.
   *
   * @param {Object} [options] - store options; merged over the module
   *   defaults (`clean: true`), caller-supplied keys win
   */
  function Store (options) {
    if (!(this instanceof Store)) {
      return new Store(options)
    }

    // Merge into a fresh object so the shared defaults are never mutated.
    this.options = xtend(defaultStoreOptions, options)
    this._inflights = new Map()
  }
  /**
   * Stores a packet under its `messageId`, replacing any packet already
   * held for that id.
   *
   * @param {Object} packet - anything that has a `messageId` property
   * @param {Function} [cb] - invoked with no arguments once the packet is stored
   * @returns {Store} this store, for chaining
   */
  Store.prototype.put = function (packet, cb) {
    var inflights = this._inflights
    inflights.set(packet.messageId, packet)

    if (!cb) {
      return this
    }

    cb()
    return this
  }
  /**
   * Creates an object-mode readable stream over the packets in the store.
   *
   * The packets are copied into an array when the stream is created, so
   * later puts and deletes do not affect what the stream yields.
   *
   * @returns {Readable} stream that emits each stored packet in the Map's
   *   iteration order and then ends
   */
  Store.prototype.createStream = function () {
    var readable = new Readable(streamsOpts)
    var isDestroyed = false
    var snapshot = []
    var cursor = 0

    this._inflights.forEach(function (packet) {
      snapshot.push(packet)
    })

    // Hands out one packet per call; signals end-of-stream once the
    // snapshot is exhausted or the stream has been destroyed.
    readable._read = function () {
      if (isDestroyed || cursor >= snapshot.length) {
        this.push(null)
        return
      }
      this.push(snapshot[cursor++])
    }

    // Replaces the default destroy: only the first call has an effect,
    // and 'close' is emitted on the next tick.
    readable.destroy = function () {
      if (!isDestroyed) {
        var target = this
        isDestroyed = true
        process.nextTick(function () {
          target.emit('close')
        })
      }
    }

    return readable
  }
  /**
   * Deletes a packet from the store.
   *
   * @param {Object} packet - object whose `messageId` identifies the entry
   * @param {Function} [cb] - called as `cb(null, storedPacket)` when the
   *   entry existed, or `cb(new Error('missing packet'))` when it did not
   * @returns {Store} this store, for chaining
   */
  Store.prototype.del = function (packet, cb) {
    var messageId = packet.messageId
    var stored = this._inflights.get(messageId)

    if (stored) {
      // Delete by the key that was looked up, so the entry is removed
      // even if the stored packet's own messageId has been changed.
      this._inflights.delete(messageId)
      // The callback is optional on every path, as in put() and close().
      if (cb) {
        cb(null, stored)
      }
    } else if (cb) {
      cb(new Error('missing packet'))
    }

    return this
  }
  /**
   * Gets a packet from the store without removing it.
   *
   * @param {Object} packet - object whose `messageId` identifies the entry
   * @param {Function} [cb] - called as `cb(null, storedPacket)` when the
   *   entry exists, or `cb(new Error('missing packet'))` when it does not
   * @returns {Store} this store, for chaining
   */
  Store.prototype.get = function (packet, cb) {
    var stored = this._inflights.get(packet.messageId)

    // The callback is optional on every path, as in put() and close().
    if (!cb) {
      return this
    }

    if (stored) {
      cb(null, stored)
    } else {
      cb(new Error('missing packet'))
    }

    return this
  }
  /**
   * Closes the store.
   *
   * When the `clean` option is truthy the packet map is dropped (set to
   * null); otherwise it is left in place.
   *
   * @param {Function} [cb] - invoked with no arguments after closing
   */
  Store.prototype.close = function (cb) {
    var discardPackets = this.options.clean
    if (discardPackets) {
      this._inflights = null
    }

    if (!cb) {
      return
    }
    cb()
  }
  116. module.exports = Store