123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- 'use strict'
- var Transform = require('readable-stream').Transform
- var duplexify = require('duplexify')
- /* global wx */
// Connection state shared by every function in this module. buildStream()
// reassigns all three on each call, so the other functions in this file
// always operate on the most recently created connection.
// socketTask: the value returned by wx.connectSocket().
- var socketTask
// proxy: the Transform returned by buildProxy(); outgoing chunks are written
// to it and incoming socket messages are pushed into it.
- var proxy
// stream: the duplexify stream that buildStream() returns to its caller.
- var stream
/**
 * Build the stream that bridges written data to the mini-program socket.
 *
 * Chunks written to the returned stream are forwarded to the module-level
 * `socketTask` through `send()`; when the writable side is flushed the socket
 * is closed. Inbound data is not produced here: other code pushes received
 * messages into the returned stream.
 *
 * @returns {Transform} a Transform whose `_write` and `_flush` talk to `socketTask`
 */
function buildProxy () {
  var proxy = new Transform()

  proxy._write = function (chunk, encoding, next) {
    // A Buffer can be a view onto a larger ArrayBuffer (pooled allocation or
    // a slice of another Buffer). Sending `chunk.buffer` as-is would then
    // transmit bytes that do not belong to this chunk, so copy out exactly
    // the range the view covers. When the view spans the whole ArrayBuffer
    // the copy is skipped.
    var backing = chunk.buffer
    var payload = backing
    if (chunk.byteOffset !== 0 || chunk.byteLength !== backing.byteLength) {
      payload = backing.slice(
        chunk.byteOffset,
        chunk.byteOffset + chunk.byteLength
      )
    }

    socketTask.send({
      data: payload,
      success: function () {
        next()
      },
      fail: function (res) {
        // Failure callbacks receive a result object carrying `errMsg`
        // (the socket error handler in this file reads the same field);
        // a bare message is still accepted.
        next(new Error(res && res.errMsg ? res.errMsg : res))
      }
    })
  }

  proxy._flush = function socketEnd (done) {
    socketTask.close({
      success: function () {
        done()
      },
      // Without this handler a rejected close would leave the flush pending
      // forever. A failed close leaves nothing further to flush, so finish
      // without raising an error on the stream.
      fail: function () {
        done()
      }
    })
  }

  return proxy
}
/**
 * Fill in missing connection options in place.
 *
 * Any of `hostname`, `path` or `wsOptions` that is falsy on `opts` is
 * replaced with its fallback ('localhost', '/', and a fresh empty object).
 *
 * @param {Object} opts - connection options; mutated
 */
function setDefaultOpts (opts) {
  // Built per call so every opts object gets its own wsOptions object.
  var fallbacks = [
    ['hostname', 'localhost'],
    ['path', '/'],
    ['wsOptions', {}]
  ]

  fallbacks.forEach(function (entry) {
    var key = entry[0]
    if (!opts[key]) {
      opts[key] = entry[1]
    }
  })
}
/**
 * Build the WebSocket URL for a connection.
 *
 * The 'wxs' protocol maps to 'wss'; every other value maps to 'ws'. The port
 * is left out of the URL only when it is unset or equals the default port of
 * the chosen scheme (80 for ws, 443 for wss), so a non-default combination
 * such as wss on port 80 keeps its explicit port.
 *
 * @param {Object} opts - connection options (`protocol`, `hostname`, `path`,
 *   optional `port`, optional `transformWsUrl`)
 * @param {Object} client - passed through to `opts.transformWsUrl`
 * @returns {string} the URL, after `opts.transformWsUrl(url, opts, client)`
 *   when that option is a function
 */
function buildUrl (opts, client) {
  var secure = opts.protocol === 'wxs'
  var protocol = secure ? 'wss' : 'ws'
  var defaultPort = secure ? 443 : 80

  var authority = opts.hostname
  if (opts.port && opts.port !== defaultPort) {
    authority += ':' + opts.port
  }

  var url = protocol + '://' + authority + opts.path
  if (typeof (opts.transformWsUrl) === 'function') {
    url = opts.transformWsUrl(url, opts, client)
  }
  return url
}
/**
 * Wire the module-level `socketTask` events to the module-level `stream`
 * and `proxy`.
 *
 * - open: attaches `proxy` as both sides of `stream`, then emits 'connect'
 * - message: converts the payload to a Buffer and pushes it into `proxy`
 * - close: ends and then destroys `stream`
 * - error: destroys `stream` with an Error built from the reported `errMsg`
 */
function bindEventHandler () {
  function handleOpen () {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  }

  function handleMessage (message) {
    var payload = message.data
    // Binary frames arrive as ArrayBuffer; anything else is decoded as UTF-8 text.
    var chunk = payload instanceof ArrayBuffer
      ? Buffer.from(payload)
      : Buffer.from(payload, 'utf8')
    proxy.push(chunk)
  }

  function handleClose () {
    stream.end()
    stream.destroy()
  }

  function handleError (failure) {
    stream.destroy(new Error(failure.errMsg))
  }

  socketTask.onOpen(handleOpen)
  socketTask.onMessage(handleMessage)
  socketTask.onClose(handleClose)
  socketTask.onError(handleError)
}
/**
 * Open a mini-program WebSocket and return a stream bound to it.
 *
 * Reassigns the module-level `socketTask`, `proxy` and `stream`, so the
 * event handlers and proxy created here always refer to this connection.
 *
 * @param {Object} client - only forwarded to `buildUrl` (and from there to
 *   `opts.transformWsUrl`)
 * @param {Object} opts - connection options; mutated: `hostname` falls back
 *   to `host`, and missing defaults are filled in by `setDefaultOpts`
 * @returns {Object} the stream created by `duplexify.obj()`
 * @throws {Error} when neither `opts.hostname` nor `opts.host` is set
 */
function buildStream (client, opts) {
  opts.hostname = opts.hostname || opts.host

  if (!opts.hostname) {
    throw new Error('Could not determine host. Specify host manually.')
  }

  // MQTT 3.1 ('MQIsdp', version 3) uses its own WebSocket sub-protocol name.
  var websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'

  setDefaultOpts(opts)

  var url = buildUrl(opts, client)
  socketTask = wx.connectSocket({
    url: url,
    // wx.connectSocket documents `protocols` as an array of sub-protocol
    // names, so the single name is wrapped rather than passed as a string.
    protocols: [websocketSubProtocol]
  })

  proxy = buildProxy()
  stream = duplexify.obj()

  // Instance-level _destroy: close the socket and report back through cb
  // (when one is given) once the close succeeds.
  stream._destroy = function (err, cb) {
    socketTask.close({
      success: function () {
        cb && cb(err)
      }
    })
  }

  // One-shot destroy wrapper: the first call restores the original destroy
  // and asks the socket to close on the next tick; only if that close fails
  // does it fall back to _destroy. Later calls go to the original destroy.
  // NOTE(review): the wrapper takes no parameters, so an error passed to the
  // first destroy() call is not forwarded - confirm this is intended.
  var destroyRef = stream.destroy
  stream.destroy = function () {
    stream.destroy = destroyRef

    var self = this
    process.nextTick(function () {
      socketTask.close({
        fail: function () {
          self._destroy(new Error())
        }
      })
    })
  }.bind(stream)

  bindEventHandler()

  return stream
}
- module.exports = buildStream
|