ali.js 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. 'use strict'
  2. var Transform = require('readable-stream').Transform
  3. var duplexify = require('duplexify')
  4. var base64 = require('base64-js')
  5. /* global FileReader */
  6. var my
  7. var proxy
  8. var stream
  9. var isInitialized = false
  10. function buildProxy () {
  11. var proxy = new Transform()
  12. proxy._write = function (chunk, encoding, next) {
  13. my.sendSocketMessage({
  14. data: chunk.buffer,
  15. success: function () {
  16. next()
  17. },
  18. fail: function () {
  19. next(new Error())
  20. }
  21. })
  22. }
  23. proxy._flush = function socketEnd (done) {
  24. my.closeSocket({
  25. success: function () {
  26. done()
  27. }
  28. })
  29. }
  30. return proxy
  31. }
  32. function setDefaultOpts (opts) {
  33. if (!opts.hostname) {
  34. opts.hostname = 'localhost'
  35. }
  36. if (!opts.path) {
  37. opts.path = '/'
  38. }
  39. if (!opts.wsOptions) {
  40. opts.wsOptions = {}
  41. }
  42. }
  43. function buildUrl (opts, client) {
  44. var protocol = opts.protocol === 'alis' ? 'wss' : 'ws'
  45. var url = protocol + '://' + opts.hostname + opts.path
  46. if (opts.port && opts.port !== 80 && opts.port !== 443) {
  47. url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path
  48. }
  49. if (typeof (opts.transformWsUrl) === 'function') {
  50. url = opts.transformWsUrl(url, opts, client)
  51. }
  52. return url
  53. }
  54. function bindEventHandler () {
  55. if (isInitialized) return
  56. isInitialized = true
  57. my.onSocketOpen(function () {
  58. stream.setReadable(proxy)
  59. stream.setWritable(proxy)
  60. stream.emit('connect')
  61. })
  62. my.onSocketMessage(function (res) {
  63. if (typeof res.data === 'string') {
  64. var array = base64.toByteArray(res.data)
  65. var buffer = Buffer.from(array)
  66. proxy.push(buffer)
  67. } else {
  68. var reader = new FileReader()
  69. reader.addEventListener('load', function () {
  70. var data = reader.result
  71. if (data instanceof ArrayBuffer) data = Buffer.from(data)
  72. else data = Buffer.from(data, 'utf8')
  73. proxy.push(data)
  74. })
  75. reader.readAsArrayBuffer(res.data)
  76. }
  77. })
  78. my.onSocketClose(function () {
  79. stream.end()
  80. stream.destroy()
  81. })
  82. my.onSocketError(function (res) {
  83. stream.destroy(res)
  84. })
  85. }
  86. function buildStream (client, opts) {
  87. opts.hostname = opts.hostname || opts.host
  88. if (!opts.hostname) {
  89. throw new Error('Could not determine host. Specify host manually.')
  90. }
  91. var websocketSubProtocol =
  92. (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
  93. ? 'mqttv3.1'
  94. : 'mqtt'
  95. setDefaultOpts(opts)
  96. var url = buildUrl(opts, client)
  97. my = opts.my
  98. my.connectSocket({
  99. url: url,
  100. protocols: websocketSubProtocol
  101. })
  102. proxy = buildProxy()
  103. stream = duplexify.obj()
  104. bindEventHandler()
  105. return stream
  106. }
  107. module.exports = buildStream