123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- 'use strict'
- var Transform = require('readable-stream').Transform
- var duplexify = require('duplexify')
- var base64 = require('base64-js')
- /* global FileReader */
// Module-level state shared by the functions in this file.
// `my` is assigned from opts.my inside buildStream; `proxy` and `stream`
// are re-created there on every call.
var my, proxy, stream
// Flipped to true by bindEventHandler so the socket callbacks are only
// registered once.
var isInitialized = false
/**
 * Create the Transform that bridges the stream to the `my` socket API.
 *
 * Every chunk written to the returned stream is forwarded through
 * `my.sendSocketMessage`; flushing the stream calls `my.closeSocket`.
 * Data for the readable side is pushed in from outside this function.
 *
 * @returns {Transform} the proxy stream
 */
function buildProxy () {
  var proxy = new Transform()

  // Build a descriptive Error for a failed `my` call and keep the raw
  // failure payload (whatever shape the platform passes) on `detail`.
  function socketError (action, res) {
    var err = new Error('my.' + action + ' failed')
    err.detail = res
    return err
  }

  // A Buffer may be a view onto a larger (pooled or sliced) ArrayBuffer, so
  // `chunk.buffer` alone can contain bytes that are not part of the chunk.
  // Return the backing buffer untouched when the view covers all of it,
  // otherwise copy out exactly the bytes the chunk refers to.
  function exactArrayBuffer (chunk) {
    var backing = chunk.buffer
    if (!backing || typeof backing.slice !== 'function') {
      return backing
    }
    if (chunk.byteOffset === 0 && chunk.byteLength === backing.byteLength) {
      return backing
    }
    return backing.slice(chunk.byteOffset, chunk.byteOffset + chunk.byteLength)
  }

  proxy._write = function (chunk, encoding, next) {
    my.sendSocketMessage({
      data: exactArrayBuffer(chunk),
      success: function () {
        next()
      },
      fail: function (res) {
        next(socketError('sendSocketMessage', res))
      }
    })
  }

  proxy._flush = function socketEnd (done) {
    my.closeSocket({
      success: function () {
        done()
      },
      // Without a fail handler a rejected close would leave the stream
      // waiting on `done` forever.
      fail: function (res) {
        done(socketError('closeSocket', res))
      }
    })
  }

  return proxy
}
/**
 * Fill in missing connection options in place.
 *
 * Any of `hostname`, `path` or `wsOptions` that is falsy on `opts` is
 * replaced by its default ('localhost', '/' and a new empty object).
 *
 * @param {Object} opts - connection options; mutated by this call
 */
function setDefaultOpts (opts) {
  // Built per call so each opts object receives its own `wsOptions` object.
  var fallbacks = {
    hostname: 'localhost',
    path: '/',
    wsOptions: {}
  }
  Object.keys(fallbacks).forEach(function (key) {
    if (!opts[key]) {
      opts[key] = fallbacks[key]
    }
  })
}
/**
 * Build the WebSocket URL for a connection.
 *
 * The scheme is 'wss' when `opts.protocol` is 'alis' and 'ws' otherwise.
 * The port is appended unless it is absent or equal to the default port of
 * the chosen scheme (80 for ws, 443 for wss); a non-default port such as
 * 443 on plain ws is kept so the caller's explicit choice is honoured.
 * If `opts.transformWsUrl` is a function, its return value replaces the URL.
 *
 * @param {Object} opts - connection options (`protocol`, `hostname`, `path`,
 *   optional `port`, optional `transformWsUrl`)
 * @param {*} client - passed through untouched to `opts.transformWsUrl`
 * @returns {*} the URL string, or whatever `opts.transformWsUrl` returns
 */
function buildUrl (opts, client) {
  var isSecure = opts.protocol === 'alis'
  var protocol = isSecure ? 'wss' : 'ws'
  var defaultPort = isSecure ? 443 : 80

  var authority = opts.hostname
  // Number() lets a port given as a numeric string match the default too.
  if (opts.port && Number(opts.port) !== defaultPort) {
    authority += ':' + opts.port
  }

  var url = protocol + '://' + authority + opts.path
  if (typeof opts.transformWsUrl === 'function') {
    url = opts.transformWsUrl(url, opts, client)
  }
  return url
}
/**
 * Register the socket callbacks on `my`, at most once per module load.
 *
 * The callbacks read the module-level `stream` and `proxy` at the time an
 * event fires, not at the time this function runs.
 */
function bindEventHandler () {
  if (isInitialized) {
    return
  }
  isInitialized = true

  // Socket is open: plug the proxy into both sides and announce it.
  function handleOpen () {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  }

  // Incoming data: string payloads are decoded as base64, anything else is
  // read through a FileReader before being pushed to the proxy.
  function handleMessage (res) {
    var payload = res.data
    if (typeof payload === 'string') {
      proxy.push(Buffer.from(base64.toByteArray(payload)))
      return
    }

    var fileReader = new FileReader()
    fileReader.addEventListener('load', function () {
      var loaded = fileReader.result
      var bytes = loaded instanceof ArrayBuffer
        ? Buffer.from(loaded)
        : Buffer.from(loaded, 'utf8')
      proxy.push(bytes)
    })
    fileReader.readAsArrayBuffer(payload)
  }

  // Socket closed: finish the stream, then tear it down.
  function handleClose () {
    stream.end()
    stream.destroy()
  }

  // Socket error: tear the stream down with the reported value.
  function handleError (res) {
    stream.destroy(res)
  }

  my.onSocketOpen(handleOpen)
  my.onSocketMessage(handleMessage)
  my.onSocketClose(handleClose)
  my.onSocketError(handleError)
}
/**
 * Open a socket through `opts.my` and return the stream wrapping it.
 *
 * `opts` is mutated: `hostname` falls back to `host`, and the remaining
 * defaults are filled in by setDefaultOpts. The module-level `my`, `proxy`
 * and `stream` are replaced on every call.
 *
 * @param {*} client - forwarded to buildUrl
 * @param {Object} opts - connection options; must carry the socket API as
 *   `opts.my` and a `hostname` or `host`
 * @returns {Object} the stream created by `duplexify.obj()`
 * @throws {Error} when neither `opts.hostname` nor `opts.host` is set
 */
function buildStream (client, opts) {
  var host = opts.hostname || opts.host
  opts.hostname = host
  if (!host) {
    throw new Error('Could not determine host. Specify host manually.')
  }

  // 'mqttv3.1' is only used for protocol id 'MQIsdp' at version 3.
  var subProtocol = 'mqtt'
  if (opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3) {
    subProtocol = 'mqttv3.1'
  }

  setDefaultOpts(opts)
  var connectOptions = {
    url: buildUrl(opts, client),
    protocols: subProtocol
  }

  my = opts.my
  my.connectSocket(connectOptions)

  proxy = buildProxy()
  stream = duplexify.obj()
  bindEventHandler()

  return stream
}
- module.exports = buildStream
|