123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- 'use strict'
- var MqttClient = require('../client')
- var Store = require('../store')
- var url = require('url')
- var xtend = require('xtend')
- var protocols = {}
- if (process.title !== 'browser') {
- protocols.mqtt = require('./tcp')
- protocols.tcp = require('./tcp')
- protocols.ssl = require('./tls')
- protocols.tls = require('./tls')
- protocols.mqtts = require('./tls')
- } else {
- protocols.wx = require('./wx')
- protocols.wxs = require('./wx')
- protocols.ali = require('./ali')
- protocols.alis = require('./ali')
- }
- protocols.ws = require('./ws')
- protocols.wss = require('./ws')
- /**
- * Parse the auth attribute and merge username and password in the options object.
- *
- * @param {Object} [opts] option object
- */
- function parseAuthOptions (opts) {
- var matches
- if (opts.auth) {
- matches = opts.auth.match(/^(.+):(.+)$/)
- if (matches) {
- opts.username = matches[1]
- opts.password = matches[2]
- } else {
- opts.username = opts.auth
- }
- }
- }
- /**
- * connect - connect to an MQTT broker.
- *
- * @param {String} [brokerUrl] - url of the broker, optional
- * @param {Object} opts - see MqttClient#constructor
- */
- function connect (brokerUrl, opts) {
- if ((typeof brokerUrl === 'object') && !opts) {
- opts = brokerUrl
- brokerUrl = null
- }
- opts = opts || {}
- if (brokerUrl) {
- var parsed = url.parse(brokerUrl, true)
- if (parsed.port != null) {
- parsed.port = Number(parsed.port)
- }
- opts = xtend(parsed, opts)
- if (opts.protocol === null) {
- throw new Error('Missing protocol')
- }
- opts.protocol = opts.protocol.replace(/:$/, '')
- }
- // merge in the auth options if supplied
- parseAuthOptions(opts)
- // support clientId passed in the query string of the url
- if (opts.query && typeof opts.query.clientId === 'string') {
- opts.clientId = opts.query.clientId
- }
- if (opts.cert && opts.key) {
- if (opts.protocol) {
- if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) {
- switch (opts.protocol) {
- case 'mqtt':
- opts.protocol = 'mqtts'
- break
- case 'ws':
- opts.protocol = 'wss'
- break
- case 'wx':
- opts.protocol = 'wxs'
- break
- case 'ali':
- opts.protocol = 'alis'
- break
- default:
- throw new Error('Unknown protocol for secure connection: "' + opts.protocol + '"!')
- }
- }
- } else {
- // don't know what protocol he want to use, mqtts or wss
- throw new Error('Missing secure protocol key')
- }
- }
- if (!protocols[opts.protocol]) {
- var isSecure = ['mqtts', 'wss'].indexOf(opts.protocol) !== -1
- opts.protocol = [
- 'mqtt',
- 'mqtts',
- 'ws',
- 'wss',
- 'wx',
- 'wxs',
- 'ali',
- 'alis'
- ].filter(function (key, index) {
- if (isSecure && index % 2 === 0) {
- // Skip insecure protocols when requesting a secure one.
- return false
- }
- return (typeof protocols[key] === 'function')
- })[0]
- }
- if (opts.clean === false && !opts.clientId) {
- throw new Error('Missing clientId for unclean clients')
- }
- if (opts.protocol) {
- opts.defaultProtocol = opts.protocol
- }
- function wrapper (client) {
- if (opts.servers) {
- if (!client._reconnectCount || client._reconnectCount === opts.servers.length) {
- client._reconnectCount = 0
- }
- opts.host = opts.servers[client._reconnectCount].host
- opts.port = opts.servers[client._reconnectCount].port
- opts.protocol = (!opts.servers[client._reconnectCount].protocol ? opts.defaultProtocol : opts.servers[client._reconnectCount].protocol)
- opts.hostname = opts.host
- client._reconnectCount++
- }
- return protocols[opts.protocol](client, opts)
- }
- return new MqttClient(wrapper, opts)
- }
- module.exports = connect
- module.exports.connect = connect
- module.exports.MqttClient = MqttClient
- module.exports.Store = Store
|