/*
* Copyright (c) 2017 Sebastian Rager
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
const EventEmitter = require('events');
const debug = require('debug')('zyre:zyre_node');
const zeromq = require('zeromq');
const ZreMsg = require('./zre_msg');
/**
 * ZyreNode represents the local node. It owns the router socket other peers connect to, keeps
 * track of the groups the local node has joined and dispatches incoming ZRE messages as events.
 *
 * @extends EventEmitter
 */
class ZyreNode extends EventEmitter {
  /**
   * @param {object} options - Options object
   * @param {Buffer} options.identity - 16 byte UUID as Buffer
   * @param {string} options.name - Name of the zyre node
   * @param {string} options.address - Address of the zyre node
   * @param {number} options.mailbox - Network port of the zyre node
   * @param {object} [options.headers={}] - Headers of the zyre node
   * @param {ZyrePeers} options.zyrePeers - Global ZyrePeers object
   * @param {ZyreGroups} options.zyreGroups - Global ZyreGroups object
   */
  constructor({
    identity,
    name,
    address,
    mailbox,
    headers = {},
    zyrePeers,
    zyreGroups,
  }) {
    super();

    this._identity = identity;
    this._name = name;
    this._endpoint = `tcp://${address}:${mailbox}`;
    this._headers = headers;
    this._zyrePeers = zyrePeers;
    this._zyreGroups = zyreGroups;
    // Names of the groups the local node is currently a member of
    this._groups = [];
    // Counter that is incremented (modulo 255) on every own JOIN/LEAVE and sent along with it
    this._status = 0;

    this._createHandler();
  }

  /**
   * Starts listening for foreign messages, manages incoming messages as defined in ZRE.
   *
   * If binding the socket fails, the partial setup is undone (listeners removed, socket closed)
   * and the returned promise rejects with the bind error instead of reporting success.
   *
   * @fires ZyreNode#hello
   * @fires ZyreNode#whisper
   * @fires ZyreNode#shout
   * @fires ZyreNode#join
   * @fires ZyreNode#leave
   * @return {Promise} Resolves once the socket is bound, rejects with the bind error otherwise
   */
  startListening() {
    this._zyrePeers.on('new', this._newPeerHandler);

    const socket = zeromq.socket('router');
    this._socket = socket;
    socket.setsockopt(zeromq.ZMQ_ROUTER_HANDOVER, 1);
    socket.on('message', this._messageHandler);

    return new Promise((resolve, reject) => {
      socket.bind(this._endpoint, (err) => {
        if (err) {
          // Binding failed: roll back so a failed start leaves no listeners or open socket behind
          this._zyrePeers.removeListener('new', this._newPeerHandler);
          socket.removeListener('message', this._messageHandler);
          socket.close();
          if (this._socket === socket) this._socket = undefined;
          reject(err);
          return;
        }

        debug(`listening on ${this._endpoint}`);
        resolve();
      });
    });
  }

  /**
   * Stops listening for messages and closes the socket. Calling it while the node is not
   * listening (never started or already stopped) is a no-op.
   *
   * @return {Promise} Resolves once the socket is closed
   */
  stopListening() {
    this._zyrePeers.removeListener('new', this._newPeerHandler);

    const socket = this._socket;
    if (typeof socket === 'undefined') return Promise.resolve();

    // Drop the reference right away so a repeated call does not unbind/close the socket twice
    this._socket = undefined;
    socket.removeListener('message', this._messageHandler);

    return new Promise((resolve) => {
      socket.unbind(this._endpoint, (err) => {
        // Stopping is best effort: the socket is closed even if unbinding reported an error
        if (err) debug(`unbinding ${this._endpoint} failed: ${err}`);
        socket.close();
        resolve();
      });
    });
  }

  /**
   * Joins a group and announces the membership with a JOIN message to the known peers. Joining
   * a group the node is already a member of does nothing.
   *
   * @param {string} group - Name of the group
   */
  join(group) {
    // A duplicate entry would survive a single leave() and keep SHOUTs of a left group flowing
    if (this._groups.includes(group)) return;

    this._groups.push(group);
    this._status = (this._status + 1) % 255;
    this._zyrePeers.send(new ZreMsg(ZreMsg.JOIN, {
      group,
      status: this._status,
    }));
  }

  /**
   * Leaves a group and announces it with a LEAVE message to the known peers. Leaving a group
   * the node is not a member of does nothing.
   *
   * @param {string} group - Name of the group
   */
  leave(group) {
    const index = this._groups.indexOf(group);
    if (index === -1) return;

    this._groups.splice(index, 1);
    this._status = (this._status + 1) % 255;
    this._zyrePeers.send(new ZreMsg(ZreMsg.LEAVE, {
      group,
      status: this._status,
    }));
  }

  /**
   * Creates handler as object properties in a separate method to ensure proper scope via arrow
   * functions.
   *
   * @protected
   */
  _createHandler() {
    /**
     * Connects to the given ZyrePeer and sends a HELLO message describing the local node.
     *
     * @protected
     * @param {ZyrePeer} zyrePeer - ZyrePeer
     */
    this._newPeerHandler = (zyrePeer) => {
      zyrePeer.connect();
      zyrePeer.send(new ZreMsg(ZreMsg.HELLO, {
        endpoint: this._endpoint,
        groups: this._groups,
        status: this._status,
        name: this._name,
        headers: this._headers,
      }));
    };

    /**
     * Parses the given id and message, updates the peer information found in the message and takes
     * over message handling.
     *
     * @protected
     * @fires ZyreNode#hello
     * @fires ZyreNode#whisper
     * @fires ZyreNode#shout
     * @fires ZyreNode#join
     * @fires ZyreNode#leave
     * @param {Buffer} id - Sender identity: one leading byte (01) followed by the 16 byte UUID
     * @param {Buffer} msg - Message as binary Buffer
     * @param {Buffer} frame - Message content as binary Buffer
     */
    this._messageHandler = (id, msg, frame) => {
      const zreMsg = ZreMsg.read(msg, frame);

      if (typeof zreMsg === 'undefined') {
        debug('received malformed message');
        return;
      }

      // Remove the leading byte from the id buffer
      const identity = id.slice(1).toString('hex');

      // Reject messages from unknown peers that are not HELLO messages
      if (!this._zyrePeers.exists(identity) && zreMsg.cmd !== ZreMsg.HELLO) {
        debug(`unknown peer (${identity}) wants to send message (${zreMsg.cmd})`);
        return;
      }

      debug(`received message (${zreMsg.cmd}) from ${identity}, seq ${zreMsg.sequence}`);

      const zyrePeer = this._zyrePeers.push({
        identity,
        sequence: zreMsg.sequence,
        endpoint: zreMsg.endpoint,
        status: zreMsg.status,
        name: zreMsg.name,
        headers: zreMsg.headers,
      });

      // If an error occurred in pushing the peer, prevent further event handling
      if (typeof zyrePeer === 'undefined') return;

      // Message handling
      switch (zreMsg.cmd) {
        case ZreMsg.HELLO:
          // A HELLO carries the groups of the peer; each one is reported as a join
          zreMsg.groups.forEach((group) => {
            this._zyreGroups.push(group, zyrePeer);
            this.emit('join', zyrePeer, group);
          });

          /**
           * @event ZyreNode#hello
           * @property {ZyrePeer} zyrePeer - ZyrePeer
           */
          this.emit('hello', zyrePeer);
          break;

        case ZreMsg.WHISPER:
          /**
           * @event ZyreNode#whisper
           * @property {ZyrePeer} zyrePeer - ZyrePeer
           * @property {Buffer} content - Content of the message
           */
          this.emit('whisper', zyrePeer, zreMsg.content);
          break;

        case ZreMsg.SHOUT:
          // Only shouts to groups the local node has joined are passed on
          if (this._groups.includes(zreMsg.group)) {
            /**
             * @event ZyreNode#shout
             * @property {ZyrePeer} zyrePeer - ZyrePeer
             * @property {Buffer} content - Content of the message
             * @property {string} group - Name of the group
             */
            this.emit('shout', zyrePeer, zreMsg.content, zreMsg.group);
          }
          break;

        case ZreMsg.JOIN:
          this._zyreGroups.push(zreMsg.group, zyrePeer);

          /**
           * @event ZyreNode#join
           * @property {ZyrePeer} zyrePeer - ZyrePeer
           * @property {string} group - Name of the group
           */
          this.emit('join', zyrePeer, zreMsg.group);
          break;

        case ZreMsg.LEAVE:
          this._zyreGroups.remove(zreMsg.group, zyrePeer);

          /**
           * @event ZyreNode#leave
           * @property {ZyrePeer} zyrePeer - ZyrePeer
           * @property {string} group - Name of the group
           */
          this.emit('leave', zyrePeer, zreMsg.group);
          break;

        case ZreMsg.PING:
          zyrePeer.send(new ZreMsg(ZreMsg.PING_OK));
          break;

        default:
          // Any other command needs no handling beyond the peer update above
      }
    };
  }
}
module.exports = ZyreNode;