123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- 'use strict'
- var handleClient
- var websocket = require('websocket-stream')
- var WebSocketServer = require('ws').Server
- var Connection = require('mqtt-connection')
- var http = require('http')
- handleClient = function (client) {
- var self = this
- if (!self.clients) {
- self.clients = {}
- }
- client.on('connect', function (packet) {
- if (packet.clientId === 'invalid') {
- client.connack({returnCode: 2})
- } else {
- client.connack({returnCode: 0})
- }
- self.clients[packet.clientId] = client
- client.subscriptions = []
- })
- client.on('publish', function (packet) {
- var i, k, c, s, publish
- switch (packet.qos) {
- case 0:
- break
- case 1:
- client.puback(packet)
- break
- case 2:
- client.pubrec(packet)
- break
- }
- for (k in self.clients) {
- c = self.clients[k]
- publish = false
- for (i = 0; i < c.subscriptions.length; i++) {
- s = c.subscriptions[i]
- if (s.test(packet.topic)) {
- publish = true
- }
- }
- if (publish) {
- try {
- c.publish({topic: packet.topic, payload: packet.payload})
- } catch (error) {
- delete self.clients[k]
- }
- }
- }
- })
- client.on('pubrel', function (packet) {
- client.pubcomp(packet)
- })
- client.on('pubrec', function (packet) {
- client.pubrel(packet)
- })
- client.on('pubcomp', function () {
- // Nothing to be done
- })
- client.on('subscribe', function (packet) {
- var qos
- var topic
- var reg
- var granted = []
- for (var i = 0; i < packet.subscriptions.length; i++) {
- qos = packet.subscriptions[i].qos
- topic = packet.subscriptions[i].topic
- reg = new RegExp(topic.replace('+', '[^/]+').replace('#', '.+') + '$')
- granted.push(qos)
- client.subscriptions.push(reg)
- }
- client.suback({messageId: packet.messageId, granted: granted})
- })
- client.on('unsubscribe', function (packet) {
- client.unsuback(packet)
- })
- client.on('pingreq', function () {
- client.pingresp()
- })
- }
- function start (startPort, done) {
- var server = http.createServer()
- var wss = new WebSocketServer({server: server})
- wss.on('connection', function (ws) {
- var stream, connection
- if (!(ws.protocol === 'mqtt' ||
- ws.protocol === 'mqttv3.1')) {
- return ws.close()
- }
- stream = websocket(ws)
- connection = new Connection(stream)
- handleClient.call(server, connection)
- })
- server.listen(startPort, done)
- server.on('request', function (req, res) {
- res.statusCode = 404
- res.end('Not Found')
- })
- return server
- }
- if (require.main === module) {
- start(process.env.PORT || process.env.ZUUL_PORT, function (err) {
- if (err) {
- console.error(err)
- return
- }
- console.log('tunnelled server started on port', process.env.PORT || process.env.ZUUL_PORT)
- })
- }
|