Source: zyre.js

/*
 * Copyright (c) 2017 Sebastian Rager
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

const EventEmitter = require('events');
const uuid = require('uuid');
const ZHelper = require('./zhelper');
const ZyrePeers = require('./zyre_peers');
const ZyreGroups = require('./zyre_groups');
const ZyreNode = require('./zyre_node');
const ZBeacon = require('./zbeacon');
const ZreMsg = require('./zre_msg');

/**
 * Zyre represents the public api.
 *
 * @extends EventEmitter
 */
class Zyre extends EventEmitter {
  /**
   * @param {object} options - Options object
   * @param {string} [options.name] - Name of the zyre node
   * @param {string} [options.iface] - Network interface or IPv4 address
   * @param {object} [options.headers] - Optional headers, sent to every peer on discovery
   * @param {number} [options.evasive=5000] - Evasive timeout in ms
   * @param {number} [options.expired=30000] - Expired timeout in ms
   * @param {number} [options.port=49152] - Port for incoming messages, will be incremented if used
   * @param {number} [options.bport=5670] - Broadcast beacon port
   * @param {number} [options.binterval=1000] - Broadcast beacon interval in ms
   */
  constructor({
    name,
    iface,
    headers,
    evasive = 5000,
    expired = 30000,
    port = 49152,
    bport = 5670,
    binterval = 1000,
  } = {}) {
    super();

    if (typeof iface === 'string') {
      this._ifaceData = ZHelper.getIfData(iface);
    } else {
      this._ifaceData = ZHelper.getIfData();
    }

    if (typeof this._ifaceData === 'undefined') {
      throw new Error('Could not find a valid IPv4 interface with given parameters');
    }

    // Create new uuid
    this._identity = Buffer.alloc(16);
    uuid.v4(null, this._identity, 0);

    // Set the name to the first six bytes of the uuid if name is not given
    if (typeof name === 'string') {
      this._name = name;
    } else {
      this._name = this._identity.toString('hex', 0, 6);
    }

    if (typeof headers === 'object' && headers !== null) this._headers = headers;
    if (typeof evasive === 'number') this._evasive = evasive;
    if (typeof expired === 'number') this._expired = expired;
    if (typeof port === 'number') this._port = port;
    if (typeof bport === 'number') this._bport = bport;
    if (typeof binterval === 'number') this._binterval = binterval;

    this._createHandler();
  }

  /**
   * Finds a free TCP port on the host, starts the ZyreNode and the ZBeacon, adds listeners.
   * Executes the callback or returns a Promise on success.
   *
   * @fires Zyre#connect
   * @fires Zyre#disconnect
   * @fires Zyre#expired
   * @fires Zyre#whisper
   * @fires Zyre#shout
   * @fires Zyre#join
   * @fires Zyre#leave
   * @param {function} callback - Executed on success
   * @return {Promise}
   */
  start(callback) {
    // Initialize groups
    this._zyreGroups = new ZyreGroups();

    // Initialize peers
    this._zyrePeers = new ZyrePeers({
      identity: this._identity,
      evasive: this._evasive,
      expired: this._expired,
    });

    this._zyrePeers.on('expired', this._expiredHandler);
    this._zyrePeers.on('disconnect', this._disconnectHandler);

    return new Promise((resolve) => {
      // Initialize node and beacon when the mailbox port is found
      ZHelper.getFreePort(this._ifaceData.address, this._port).then((mailbox) => {
        this._zyreNode = new ZyreNode({
          identity: this._identity,
          name: this._name,
          address: this._ifaceData.address,
          mailbox,
          headers: this._headers,
          zyrePeers: this._zyrePeers,
          zyreGroups: this._zyreGroups,
        });

        this._zyreNode.on('hello', this._connectHandler);
        this._zyreNode.on('whisper', this._whisperHandler);
        this._zyreNode.on('shout', this._shoutHandler);
        this._zyreNode.on('join', this._joinHandler);
        this._zyreNode.on('leave', this._leaveHandler);

        this._zBeacon = new ZBeacon({
          identity: this._identity,
          mailbox,
          ifaceData: this._ifaceData,
          port: this._bport,
          interval: this._binterval,
          zyrePeers: this._zyrePeers,
        });

        // Start node and beacon
        this._zyreNode.startListening().then(() => {
          this._zBeacon.start().then(() => {
            if (typeof callback === 'function') callback();
            resolve();
          });
        });
      });
    });
  }

  /**
   * Stops listening, closes all sockets, removes all event listeners and disconnects from all
   * peers. Executes the callback or returns a Promise on success.
   *
   * @param {function} callback - Executed on success
   * @return {Promise}
   */
  stop(callback) {
    this._zyreNode.removeAllListeners();
    this._zyrePeers.removeAllListeners();
    this._zyrePeers.disconnectAll();

    return new Promise((resolve) => {
      this._zBeacon.stop().then(() => {
        this._zyreNode.stopListening().then(() => {
          if (typeof callback === 'function') callback();
          resolve();
        });
      });
    });
  }

  /**
   * Sends a message to a ZyrePeer.
   *
   * @param {string} identity - Identity of the peer
   * @param {(string|Buffer)} message - Message to send
   */
  whisper(identity, message) {
    if (this._zyrePeers.exists(identity)) {
      this._zyrePeers.getPeer(identity).send(new ZreMsg(ZreMsg.WHISPER, {
        content: message,
      }));
    }
  }

  /**
   * Sends a message to a ZyreGroup.
   *
   * @param {string} group - Name of the group
   * @param {(string|Buffer)} message - Message to send
   */
  shout(group, message) {
    if (this._zyreGroups.exists(group)) {
      this._zyreGroups.getGroup(group).send(new ZreMsg(ZreMsg.SHOUT, {
        group,
        content: message,
      }));
    }
  }

  /**
   * Joins a group.
   *
   * @param {string} group - Name of the group
   */
  join(group) {
    this._zyreNode.join(group);
  }

  /**
   * Leaves a group.
   *
   * @param {string} group - Name of the group
   */
  leave(group) {
    this._zyreNode.leave(group);
  }

  /**
   * @return {string} Identity
   */
  getIdentity() {
    return this._identity.toString('hex');
  }

  /**
   * Returns an object with information of the ZyrePeer with the given identity.
   *
   * @param {string} identity - Identity of the peer
   * @return {PeerObject}
   */
  getPeer(identity) {
    if (this._zyrePeers.exists(identity)) return this._zyrePeers.getPeer(identity).toObj();
    return undefined;
  }

  /**
   * Returns an object with information of all ZyrePeers.
   *
   * @return {PeersObject}
   */
  getPeers() {
    return this._zyrePeers.toObj();
  }

  /**
   * Returns an object with information of the ZyreGroup with the given name.
   *
   * @param {string} name - Name of the group
   * @return {PeersObject}
   */
  getGroup(name) {
    if (this._zyreGroups.exists(name)) return this._zyreGroups.getGroup(name).toObj();
    return undefined;
  }

  /**
   * Returns an object with information of all ZyreGroups.
   *
   * @return {GroupsObject}
   */
  getGroups() {
    return this._zyreGroups.toObj();
  }

  /**
   * Sets the encoding of received messages. Defaults to utf8.
   *
   * @param {?string} encoding - Encoding of messages
   */
  setEncoding(encoding) {
    switch (encoding) {
      case 'ascii':
      case 'utf8':
      case 'utf16le':
      case 'ucs2':
      case 'base64':
      case 'binary':
      case 'hex':
        this._encoding = encoding;
        break;

      case 'raw':
      case null:
        this._encoding = 'raw';
        break;

      default:
        this._encoding = 'utf8';
    }
  }

  /**
   * Returns the encoded content of the ZreMsg. Encoding is set by setEncoding(encoding).
   *
   * @protected
   * @param {Buffer} content - Content of a ZreMsg
   * @return {(string|Buffer)}
   */
  _getEncodedContent(content) {
    if (typeof this._encoding === 'undefined') return content.toString('utf8');
    if (this._encoding === 'raw') return content;
    return content.toString(this._encoding);
  }

  /**
   * Creates handler as object properties in a separate method to ensure proper scope via arrow
   * functions.
   *
   * @protected
   */
  _createHandler() {
    this._connectHandler = (zyrePeer) => {
      /**
       * @event Zyre#connect
       * @property {string} identity - Identity of the peer
       * @property {string} name - Name of the peer
       * @property {object} headers - Headers of the peer
       */
      this.emit('connect', zyrePeer.identity, zyrePeer.name, zyrePeer.headers);
    };

    this._disconnectHandler = (zyrePeer) => {
      /**
       * @event Zyre#disconnect
       * @property {string} identity - Identity of the peer
       * @property {string} name - Name of the peer
       */
      this.emit('disconnect', zyrePeer.identity, zyrePeer.name);
    };

    this._expiredHandler = (zyrePeer) => {
      /**
       * @event Zyre#expired
       * @property {string} identity - Identity of the peer
       * @property {string} name - Name of the peer
       */
      this.emit('expired', zyrePeer.identity, zyrePeer.name);
    };

    this._whisperHandler = (zyrePeer, content) => {
      const encodedContent = this._getEncodedContent(content);
      /**
       * @event Zyre#whisper
       * @property {string} identity - Identity of the peer
       * @property {string} name - Name of the peer
       * @property {(string|Buffer)} message - Message
       */
      this.emit('whisper', zyrePeer.identity, zyrePeer.name, encodedContent);
    };

    this._shoutHandler = (zyrePeer, content, group) => {
      const encodedContent = this._getEncodedContent(content);
      /**
       * @event Zyre#shout
       * @property {string} identity - Identity of the peer
       * @property {string} name - Name of the peer
       * @property {(string|Buffer)} message - Message
       * @property {string} group - Group where the message came from
       */
      this.emit('shout', zyrePeer.identity, zyrePeer.name, encodedContent, group);
    };

    this._joinHandler = (zyrePeer, group) => {
      /**
       * @event Zyre#join
       * @property {string} identity - Identity of the peer
       * @property {string} name - Name of the peer
       * @property {string} group - Group which the peer joins
       */
      this.emit('join', zyrePeer.identity, zyrePeer.name, group);
    };

    this._leaveHandler = (zyrePeer, group) => {
      /**
       * @event Zyre#leave
       * @property {string} identity - Identity of the peer
       * @property {string} name - Name of the peer
       * @property {string} group - Group which the peer leaves
       */
      this.emit('leave', zyrePeer.identity, zyrePeer.name, group);
    };
  }

  /**
   * Returns a new Zyre instance.
   *
   * @param {object} options - Options object
   * @param {string} [options.name] - Name of the zyre node
   * @param {string} [options.iface] - Network interface to use
   * @param {object} [options.headers] - Optional headers, sent to every peer on discovery
   * @param {number} [options.evasive=5000] - Evasive timeout in ms
   * @param {number} [options.expired=30000] - Expired timeout in ms
   * @param {number} [options.port=49152] - Port for incoming messages, will be incremented if used
   * @param {number} [options.bport=5670] - Broadcast beacon port
   * @param {number} [options.binterval=1000] - Broadcast beacon interval in ms
   * @return {Zyre}
   */
  static new(...args) {
    return new Zyre(...args);
  }
}

module.exports = Zyre;