server.js 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. 'use strict'
  2. var handleClient
  3. var websocket = require('websocket-stream')
  4. var WebSocketServer = require('ws').Server
  5. var Connection = require('mqtt-connection')
  6. var http = require('http')
  7. handleClient = function (client) {
  8. var self = this
  9. if (!self.clients) {
  10. self.clients = {}
  11. }
  12. client.on('connect', function (packet) {
  13. if (packet.clientId === 'invalid') {
  14. client.connack({returnCode: 2})
  15. } else {
  16. client.connack({returnCode: 0})
  17. }
  18. self.clients[packet.clientId] = client
  19. client.subscriptions = []
  20. })
  21. client.on('publish', function (packet) {
  22. var i, k, c, s, publish
  23. switch (packet.qos) {
  24. case 0:
  25. break
  26. case 1:
  27. client.puback(packet)
  28. break
  29. case 2:
  30. client.pubrec(packet)
  31. break
  32. }
  33. for (k in self.clients) {
  34. c = self.clients[k]
  35. publish = false
  36. for (i = 0; i < c.subscriptions.length; i++) {
  37. s = c.subscriptions[i]
  38. if (s.test(packet.topic)) {
  39. publish = true
  40. }
  41. }
  42. if (publish) {
  43. try {
  44. c.publish({topic: packet.topic, payload: packet.payload})
  45. } catch (error) {
  46. delete self.clients[k]
  47. }
  48. }
  49. }
  50. })
  51. client.on('pubrel', function (packet) {
  52. client.pubcomp(packet)
  53. })
  54. client.on('pubrec', function (packet) {
  55. client.pubrel(packet)
  56. })
  57. client.on('pubcomp', function () {
  58. // Nothing to be done
  59. })
  60. client.on('subscribe', function (packet) {
  61. var qos
  62. var topic
  63. var reg
  64. var granted = []
  65. for (var i = 0; i < packet.subscriptions.length; i++) {
  66. qos = packet.subscriptions[i].qos
  67. topic = packet.subscriptions[i].topic
  68. reg = new RegExp(topic.replace('+', '[^/]+').replace('#', '.+') + '$')
  69. granted.push(qos)
  70. client.subscriptions.push(reg)
  71. }
  72. client.suback({messageId: packet.messageId, granted: granted})
  73. })
  74. client.on('unsubscribe', function (packet) {
  75. client.unsuback(packet)
  76. })
  77. client.on('pingreq', function () {
  78. client.pingresp()
  79. })
  80. }
  81. function start (startPort, done) {
  82. var server = http.createServer()
  83. var wss = new WebSocketServer({server: server})
  84. wss.on('connection', function (ws) {
  85. var stream, connection
  86. if (!(ws.protocol === 'mqtt' ||
  87. ws.protocol === 'mqttv3.1')) {
  88. return ws.close()
  89. }
  90. stream = websocket(ws)
  91. connection = new Connection(stream)
  92. handleClient.call(server, connection)
  93. })
  94. server.listen(startPort, done)
  95. server.on('request', function (req, res) {
  96. res.statusCode = 404
  97. res.end('Not Found')
  98. })
  99. return server
  100. }
  101. if (require.main === module) {
  102. start(process.env.PORT || process.env.ZUUL_PORT, function (err) {
  103. if (err) {
  104. console.error(err)
  105. return
  106. }
  107. console.log('tunnelled server started on port', process.env.PORT || process.env.ZUUL_PORT)
  108. })
  109. }