wx.js 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. 'use strict'
  2. var Transform = require('readable-stream').Transform
  3. var duplexify = require('duplexify')
  4. /* global wx */
  5. var socketTask
  6. var proxy
  7. var stream
  /**
   * Build the Transform that bridges stream writes to the module-level
   * WeChat `socketTask`.
   *
   * - `_write` forwards each chunk to `socketTask.send` and signals completion
   *   (or failure) through `next`.
   * - `_flush` closes the socket when the writable side ends.
   *
   * @returns {Transform} the proxy stream
   */
  function buildProxy () {
    var proxy = new Transform()

    // Normalise a wx failure callback argument into an Error. Elsewhere in this
    // file failures arrive as `{ errMsg: string }`; a plain string is still
    // accepted for backward compatibility.
    function toError (res) {
      var message = res && typeof res === 'object' && 'errMsg' in res
        ? res.errMsg
        : res
      return new Error(message)
    }

    proxy._write = function (chunk, encoding, next) {
      // A Buffer may be a view onto a larger (pooled or sliced) ArrayBuffer, so
      // `chunk.buffer` alone can contain unrelated bytes. Copy out exactly the
      // bytes that belong to this chunk.
      var payload = chunk.buffer.slice(
        chunk.byteOffset,
        chunk.byteOffset + chunk.byteLength
      )

      socketTask.send({
        data: payload,
        success: function () {
          next()
        },
        fail: function (res) {
          next(toError(res))
        }
      })
    }

    proxy._flush = function socketEnd (done) {
      // Complete the flush whether or not the close succeeds; otherwise a
      // failed close would leave the stream waiting forever.
      function finish () {
        done()
      }

      socketTask.close({
        success: finish,
        fail: finish
      })
    }

    return proxy
  }
  /**
   * Fill in connection defaults on `opts` in place.
   *
   * Any of `hostname`, `path` or `wsOptions` that is missing or falsy is
   * replaced by 'localhost', '/' or a fresh empty object respectively.
   *
   * @param {Object} opts - connection options; mutated in place
   */
  function setDefaultOpts (opts) {
    // Rebuilt on every call so each caller gets its own `wsOptions` object.
    var fallbacks = {
      hostname: 'localhost',
      path: '/',
      wsOptions: {}
    }

    Object.keys(fallbacks).forEach(function (key) {
      if (!opts[key]) {
        opts[key] = fallbacks[key]
      }
    })
  }
  /**
   * Build the WebSocket URL for a connection.
   *
   * The 'wxs' protocol maps to the 'wss' scheme; anything else maps to 'ws'.
   * The port is left out only when it is unset or equals the default port of
   * the chosen scheme (80 for ws, 443 for wss). Dropping 443 from a ws URL or
   * 80 from a wss URL would make the URL point at a different port than the
   * one requested.
   *
   * @param {Object} opts - connection options (`protocol`, `hostname`, `path`,
   *   optional `port`, optional `transformWsUrl`)
   * @param {*} client - passed through unchanged to `opts.transformWsUrl`
   * @returns {string} the URL, after `opts.transformWsUrl(url, opts, client)`
   *   when that option is a function
   */
  function buildUrl (opts, client) {
    var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws'
    var defaultPort = protocol === 'wss' ? 443 : 80

    var url = protocol + '://' + opts.hostname
    // Number() lets a port given as a string compare against the default.
    if (opts.port && Number(opts.port) !== defaultPort) {
      url += ':' + opts.port
    }
    url += opts.path

    if (typeof opts.transformWsUrl === 'function') {
      url = opts.transformWsUrl(url, opts, client)
    }
    return url
  }
  /**
   * Attach the open/message/close/error listeners of the module-level
   * `socketTask` to the module-level `stream` and `proxy`.
   */
  function bindEventHandler () {
    // Socket is open: plug the proxy into both sides of the duplex stream and
    // announce the connection.
    function handleOpen () {
      stream.setReadable(proxy)
      stream.setWritable(proxy)
      stream.emit('connect')
    }

    // Incoming frame: ArrayBuffer payloads are wrapped as-is, anything else is
    // encoded as UTF-8 text, then pushed to the readable side.
    function handleMessage (res) {
      var incoming = res.data
      var payload = incoming instanceof ArrayBuffer
        ? Buffer.from(incoming)
        : Buffer.from(incoming, 'utf8')
      proxy.push(payload)
    }

    // Socket closed: end the stream, then destroy it.
    function handleClose () {
      stream.end()
      stream.destroy()
    }

    // Socket error: destroy the stream with the reported message.
    function handleError (res) {
      stream.destroy(new Error(res.errMsg))
    }

    socketTask.onOpen(handleOpen)
    socketTask.onMessage(handleMessage)
    socketTask.onClose(handleClose)
    socketTask.onError(handleError)
  }
  /**
   * Open a WeChat mini-program WebSocket and expose it as a duplex stream.
   *
   * Side effects: mutates `opts` (hostname/path/wsOptions defaults) and
   * replaces the module-level `socketTask`, `proxy` and `stream`.
   *
   * @param {*} client - passed through to `buildUrl` (and from there to
   *   `opts.transformWsUrl`)
   * @param {Object} opts - connection options; `hostname` or `host` is required
   * @returns {Object} the duplexify stream; its readable and writable sides
   *   are attached once the socket reports it is open
   * @throws {Error} when neither `opts.hostname` nor `opts.host` is set
   */
  function buildStream (client, opts) {
    opts.hostname = opts.hostname || opts.host

    if (!opts.hostname) {
      throw new Error('Could not determine host. Specify host manually.')
    }

    // The 'MQIsdp' protocol id at version 3 selects the 'mqttv3.1'
    // sub-protocol; everything else uses 'mqtt'.
    var websocketSubProtocol =
      (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
        ? 'mqttv3.1'
        : 'mqtt'

    setDefaultOpts(opts)

    var url = buildUrl(opts, client)
    socketTask = wx.connectSocket({
      url: url,
      // wx.connectSocket documents `protocols` as an array of sub-protocol
      // strings, so the single name is wrapped in an array.
      protocols: [websocketSubProtocol]
    })

    proxy = buildProxy()
    stream = duplexify.obj()

    stream._destroy = function (err, cb) {
      // Report completion whether or not the close succeeds, so destruction
      // cannot hang on a socket that refuses to close.
      function finish () {
        cb && cb(err)
      }

      socketTask.close({
        success: finish,
        fail: finish
      })
    }

    // One-shot override: the first destroy() call only asks the socket to
    // close on the next tick and restores the original destroy(). Arguments
    // passed to this first call are not forwarded.
    var destroyRef = stream.destroy
    stream.destroy = function () {
      stream.destroy = destroyRef

      var self = this
      process.nextTick(function () {
        socketTask.close({
          fail: function () {
            self._destroy(new Error())
          }
        })
      })
    }.bind(stream)

    bindEventHandler()

    return stream
  }
  111. module.exports = buildStream