Source: zyre_peer.js

/*
 * Copyright (c) 2017 Sebastian Rager
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

const EventEmitter = require('events');
const debug = require('debug')('zyre:zyre_peer');
const zeromq = require('zeromq');
const ZreMsg = require('./zre_msg');

const ID_PREFIX = 1;

/**
 * ZyrePeer represents a foreign peer in the network.
 *
 * @extends EventEmitter
 */
class ZyrePeer extends EventEmitter {
  /**
   * @param {object} options - Options object
   * @param {string} options.identity - Identity of the peer
   * @param {Buffer} options.originID - 16 byte UUID as Buffer
   * @param {number} [options.evasive=5000] - Evasive timeout in ms
   * @param {number} [options.expired=30000] - Expired timeout in ms
   */
  constructor({
    identity,
    originID,
    evasive = 5000,
    expired = 30000,
  }) {
    super();

    this.identity = identity;
    this._originID = originID;
    this._evasive = evasive;
    this._expired = expired;
    this._groups = {};
    this._connected = false;
    this._sequenceIn = 0;
    this._sequenceOut = 0;
    this._evasiveAt = 0;

    this._createHandler();
  }

  /**
   * Adds this ZyrePeer to a given ZyreGroup.
   *
   * @param {ZyreGroup} group - ZyreGroup
   */
  addToGroup(group) {
    if (typeof this._groups[group.name] === 'undefined') {
      this._groups[group.name] = group;
      group.add(this);
    }
  }

  /**
   * Removes this ZyrePeer from a given ZyreGroup.
   *
   * @param {ZyreGroup} group - ZyreGroup
   */
  removeFromGroup(group) {
    if (typeof this._groups[group.name] !== 'undefined') {
      delete this._groups[group.name];
      group.remove(this);
    }
  }

  /**
   * Connects to this ZyrePeer.
   */
  connect() {
    if (typeof this._socket === 'undefined') {
      this._socket = zeromq.socket('dealer');
      const sockIdentity = Buffer.concat([Buffer.from([ID_PREFIX]), this._originID]);

      this._socket.setsockopt(zeromq.ZMQ_IDENTITY, sockIdentity);
      this._socket.setsockopt(zeromq.ZMQ_LINGER, 0);
      this._socket.connect(this._endpoint);
    }

    this._connected = true;
    debug(`connected to ${this.identity}`);
  }

  /**
   * Disconnects from this ZyrePeer and stops every activity. Closes the zeromq dealer socket. Loses
   * all pending messages, so only use when the peer should be permanently removed.
   *
   * @fires ZyrePeer#disconnect
   */
  disconnect() {
    this._connected = false;
    this._clearTimeouts();

    Object.keys(this._groups).forEach((i) => {
      this.removeFromGroup(this._groups[i]);
    });

    if (typeof this._socket !== 'undefined') {
      this._socket.close();
      this._socket = undefined;
    }

    debug(`disconnected from ${this.identity}`);
    /**
     * @event ZyrePeer#disconnect
     * @property {ZyrePeer} zyrePeer - ZyrePeer
     */
    this.emit('disconnect', this);
  }

  /**
   * Sends a ZreMsg to this ZyrePeer.
   *
   * @param {ZreMsg} msg - ZreMsg
   */
  send(msg) {
    if (this._connected) {
      this._sequenceOut = (this._sequenceOut + 1) % 65535;
      msg.sequence = this._sequenceOut;
      msg.send(this._socket).then((cmd) => {
        debug(`sent message (${cmd}) to ${this.identity}, seq ${this._sequenceOut}`);
      });
    }
  }

  /**
   * Updates the data of this ZyrePeer, manages timeouts for evasive and expired.
   *
   * @fires ZyrePeer#expired
   * @fires ZyrePeer#disconnect
   * @param {object} options - Options object
   * @param {number} [options.sequence] - Sequence of the last received message
   * @param {string} [options.address] - IP of the peer
   * @param {number} [options.mailbox] - Network port of the peer
   * @param {string} [options.endpoint] - TCP address of the peer
   * @param {number} [options.status] - Group status of the peer
   * @param {string} [options.name] - Name of the peer
   * @param {object} [options.headers] - Headers of the peer
   * @return {ZyrePeer}
   */
  update({
    sequence,
    address,
    mailbox,
    endpoint,
    status,
    name,
    headers,
  }) {
    if (!this._setSequence(sequence)) return undefined;
    if (!this._setAddress(address, mailbox)) return undefined;

    this._setEndpoint(endpoint);
    this._setStatus(status);
    this._setName(name);
    this._setHeaders(headers);

    this._clearTimeouts();
    this._setTimeouts();

    return this;
  }

  /**
   * @typedef {object} PeerObject
   * @property {string} name
   * @property {string} endpoint
   * @property {object} headers
   * @property {string[]} groups
   * @property {boolean} evasive
   */

  /**
   * Creates an object with public data of this peer.
   *
   * @return {PeerObject}
   */
  toObj() {
    const obj = {};
    obj.name = this.name;
    obj.endpoint = this._endpoint;
    obj.headers = this.headers;
    obj.groups = [];

    Object.keys(this._groups).forEach((i) => {
      obj.groups.push(i);
    });

    obj.evasive = this._evasiveAt > 0;

    return obj;
  }

  /**
   * Sets the sequence number of the last received message. Disconnects from the peer and returns
   * false if the sequence number does not equal the expected one.
   *
   * @protected
   * @fires ZyrePeer#disconnect
   * @param {number} sequence - Sequence of the last received message
   * @return {boolean}
   */
  _setSequence(sequence) {
    if (typeof sequence !== 'undefined') {
      const expectedSeq = (this._sequenceIn + 1) % 65535;

      // Disconnect on wrong sequence number
      if (sequence !== expectedSeq) {
        debug(`${this.identity} sent wrong sequence (${sequence}), expected ${expectedSeq}`);
        this.disconnect();
        return false;
      }

      this._sequenceIn = sequence;
    }

    return true;
  }

  /**
   * Sets the endpoint of the peer if it isn't connected yet. Disconnects from the peer and
   * returns false if the received mailbox port equals 0.
   *
   * @protected
   * @fires ZyrePeer#disconnect
   * @param {string} address - IP of the peer
   * @param {number} mailbox - Network port of the peer
   * @return {boolean}
   */
  _setAddress(address, mailbox) {
    if (typeof address !== 'undefined' && typeof mailbox !== 'undefined') {
      // If received a message with mailbox set to 0, disconnect from the peer (zre beacon protocol)
      if (mailbox === 0) {
        debug(`received disconnect beacon from ${this.identity}`);
        this.disconnect();
        return false;
      }

      if (!this._connected) this._endpoint = `tcp://${address}:${mailbox}`;
    }

    return true;
  }

  /**
   * @protected
   * @param {string} endpoint - TCP address of the peer
   */
  _setEndpoint(endpoint) {
    if (typeof endpoint !== 'undefined' && !this._connected) this._endpoint = endpoint;
  }

  /**
   * @protected
   * @param {number} status - Group status of the peer
   */
  _setStatus(status) {
    if (typeof status !== 'undefined') this._status = status;
  }

  /**
   * @protected
   * @param {string} name - Name of the peer
   */
  _setName(name) {
    if (typeof name !== 'undefined') this.name = name;
  }

  /**
   * @protected
   * @param {object} headers - Headers of the peer
   */
  _setHeaders(headers) {
    if (typeof headers !== 'undefined') this.headers = headers;
  }

  /**
   * Sets timeouts for evasive and expired.
   *
   * @protected
   * @fires ZyrePeer#expired
   * @fires ZyrePeer#disconnect
   */
  _setTimeouts() {
    // Reset the evasive status when restarting the timeouts
    this._evasiveAt = 0;

    this._evasiveTimeout = setTimeout(this._evasiveHandler, this._evasive);
    this._expiredTimeout = setTimeout(this._expiredHandler, this._expired);
  }

  /**
   * Clears the evasive and expired timeouts.
   *
   * @protected
   */
  _clearTimeouts() {
    clearTimeout(this._evasiveTimeout);
    clearTimeout(this._expiredTimeout);
  }

  /**
   * Creates handler as object properties in a separate method to ensure proper scope via arrow
   * functions.
   *
   * @protected
   */
  _createHandler() {
    /**
     * Send PING message to evasive peer to check if it is still alive.
     *
     * @protected
     */
    this._evasiveHandler = () => {
      this._evasiveAt = Date.now();
      debug(`${this.identity} is evasive at ${this._evasiveAt}`);
      this.send(new ZreMsg(ZreMsg.PING));
    };

    /**
     * Disconnect from the peer when it is expired.
     *
     * @protected
     * @fires ZyrePeer#expired
     * @fires ZyrePeer#disconnect
     */
    this._expiredHandler = () => {
      debug(`${this.identity} is expired at ${Date.now()}`);
      /**
       * @event ZyrePeer#expired
       * @property {ZyrePeer} zyrePeer - ZyrePeer
       */
      this.emit('expired', this);
      this.disconnect();
    };
  }
}

module.exports = ZyrePeer;