pub.js 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. #!/usr/bin/env node
  2. 'use strict'
  3. var mqtt = require('../')
  4. var pump = require('pump')
  5. var path = require('path')
  6. var fs = require('fs')
  7. var concat = require('concat-stream')
  8. var Writable = require('readable-stream').Writable
  9. var helpMe = require('help-me')({
  10. dir: path.join(__dirname, '..', 'doc')
  11. })
  12. var minimist = require('minimist')
  13. var split2 = require('split2')
  14. function send (args) {
  15. var client = mqtt.connect(args)
  16. client.on('connect', function () {
  17. client.publish(args.topic, args.message, args, function (err) {
  18. if (err) {
  19. console.warn(err)
  20. }
  21. client.end()
  22. })
  23. })
  24. client.on('error', function (err) {
  25. console.warn(err)
  26. client.end()
  27. })
  28. }
  29. function multisend (args) {
  30. var client = mqtt.connect(args)
  31. var sender = new Writable({
  32. objectMode: true
  33. })
  34. sender._write = function (line, enc, cb) {
  35. client.publish(args.topic, line.trim(), args, cb)
  36. }
  37. client.on('connect', function () {
  38. pump(process.stdin, split2(), sender, function (err) {
  39. client.end()
  40. if (err) {
  41. throw err
  42. }
  43. })
  44. })
  45. }
  46. function start (args) {
  47. args = minimist(args, {
  48. string: ['hostname', 'username', 'password', 'key', 'cert', 'ca', 'message', 'clientId', 'i', 'id'],
  49. boolean: ['stdin', 'retain', 'help', 'insecure', 'multiline'],
  50. alias: {
  51. port: 'p',
  52. hostname: ['h', 'host'],
  53. topic: 't',
  54. message: 'm',
  55. qos: 'q',
  56. clientId: ['i', 'id'],
  57. retain: 'r',
  58. username: 'u',
  59. password: 'P',
  60. stdin: 's',
  61. multiline: 'M',
  62. protocol: ['C', 'l'],
  63. help: 'H',
  64. ca: 'cafile'
  65. },
  66. default: {
  67. host: 'localhost',
  68. qos: 0,
  69. retain: false,
  70. topic: '',
  71. message: ''
  72. }
  73. })
  74. if (args.help) {
  75. return helpMe.toStdout('publish')
  76. }
  77. if (args.key) {
  78. args.key = fs.readFileSync(args.key)
  79. }
  80. if (args.cert) {
  81. args.cert = fs.readFileSync(args.cert)
  82. }
  83. if (args.ca) {
  84. args.ca = fs.readFileSync(args.ca)
  85. }
  86. if (args.key && args.cert && !args.protocol) {
  87. args.protocol = 'mqtts'
  88. }
  89. if (args.port) {
  90. if (typeof args.port !== 'number') {
  91. console.warn('# Port: number expected, \'%s\' was given.', typeof args.port)
  92. return
  93. }
  94. }
  95. if (args['will-topic']) {
  96. args.will = {}
  97. args.will.topic = args['will-topic']
  98. args.will.payload = args['will-message']
  99. args.will.qos = args['will-qos']
  100. args.will.retain = args['will-retain']
  101. }
  102. if (args.insecure) {
  103. args.rejectUnauthorized = false
  104. }
  105. args.topic = (args.topic || args._.shift()).toString()
  106. args.message = (args.message || args._.shift()).toString()
  107. if (!args.topic) {
  108. console.error('missing topic\n')
  109. return helpMe.toStdout('publish')
  110. }
  111. if (args.stdin) {
  112. if (args.multiline) {
  113. multisend(args)
  114. } else {
  115. process.stdin.pipe(concat(function (data) {
  116. args.message = data
  117. send(args)
  118. }))
  119. }
  120. } else {
  121. send(args)
  122. }
  123. }
  124. module.exports = start
  125. if (require.main === module) {
  126. start(process.argv.slice(2))
  127. }