/*
* Copyright (c) 2017 Sebastian Rager
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
const zeromq = require('zeromq');
const ZRE_VERSION = 2;
const ZRE_HEADER = Buffer.from([0xAA, 0xA1]);
const HELLO = 1;
const WHISPER = 2;
const SHOUT = 3;
const JOIN = 4;
const LEAVE = 5;
const PING = 6;
const PING_OK = 7;
function putNumber1(num) {
const buf = Buffer.alloc(1);
buf.writeUInt8(num);
return buf;
}
function getNumber1(buf) {
return buf.readUInt8();
}
function putNumber2(num) {
const buf = Buffer.alloc(2);
buf.writeUInt16BE(num);
return buf;
}
function getNumber2(buf) {
return buf.readUInt16BE();
}
function putNumber4(num) {
const buf = Buffer.alloc(4);
buf.writeUInt32BE(num);
return buf;
}
function getNumber4(buf) {
return buf.readUInt32BE();
}
function putString(str) {
return Buffer.concat([
putNumber1(Buffer.byteLength(str, 'utf8')),
Buffer.from(str, 'utf8'),
]);
}
function getString(buf) {
const pointer = getNumber1(buf) + 1;
const value = buf.toString('utf8', 1, pointer);
return { value, pointer };
}
function putLongString(str) {
return Buffer.concat([
putNumber4(Buffer.byteLength(str, 'utf8')),
Buffer.from(str, 'utf8'),
]);
}
function getLongString(buf) {
const pointer = getNumber4(buf) + 4;
const value = buf.toString('utf8', 4, pointer);
return { value, pointer };
}
function putStrings(strArr) {
let stringsBuffer = putNumber4(strArr.length);
strArr.forEach((e) => {
stringsBuffer = Buffer.concat([
stringsBuffer,
putLongString(String(e)),
]);
});
return stringsBuffer;
}
function getStrings(buf) {
const count = getNumber4(buf);
const value = [];
let pointer = 4;
for (let i = 0; i < count; i += 1) {
const string = getLongString(buf.slice(pointer));
value.push(string.value);
pointer += string.pointer;
}
return { value, pointer };
}
function putDictionary(obj) {
let dictBuffer = putNumber4(Object.keys(obj).length);
Object.keys(obj).forEach((i) => {
dictBuffer = Buffer.concat([
dictBuffer,
putString(String(i)),
putLongString(String(obj[i])),
]);
});
return dictBuffer;
}
function getDictionary(buf) {
const count = getNumber4(buf);
const value = {};
let pointer = 4;
for (let i = 0; i < count; i += 1) {
const keyString = getString(buf.slice(pointer));
const valuePointer = pointer + keyString.pointer;
const valueString = getLongString(buf.slice(valuePointer));
value[keyString.value] = valueString.value;
pointer = valuePointer + valueString.pointer;
}
return { value, pointer };
}
/**
* ZreMsg represents a message in ZRE format.
*/
class ZreMsg {
/**
* @param {number} cmd - ZreMsg command as number
* @param {object} [options] - Options object
* @param {number} [options.sequence=1] - Sequence of the message
* @param {string} [options.group] - Group which the node/peer joins or leaves
* @param {Buffer} [options.content] - Content of the message
* @param {string} [options.endpoint] - TCP address of the node/peer
* @param {string[]} [options.groups] - Groups in which the node/peer participates
* @param {number} [options.status] - Groups status of the node/peer
* @param {string} [options.name] - Name of the node/peer
* @param {object} [options.headers] - Headers of the node/peer
*/
constructor(cmd, options = {}) {
this.cmd = cmd;
this.sequence = 1;
Object.keys(options).forEach((i) => {
this[i] = options[i];
});
}
/**
* Sends this ZreMsg with the given zeromq dealer socket.
*
* @param {zeromq.Socket} socket - Zeromq dealer socket
* @return {Promise}
*/
send(socket) {
return new Promise((resolve) => {
if (this.cmd === WHISPER || this.cmd === SHOUT) {
socket.send(this.toBuffer(), zeromq.ZMQ_SNDMORE);
socket.send(this.content, 0, () => {
resolve(this.cmd);
});
} else {
socket.send(this.toBuffer(), 0, () => {
resolve(this.cmd);
});
}
});
}
/**
* Creates a binary Buffer from this ZreMsg.
*
* @return {Buffer} Binary Buffer in ZreMsg format
*/
toBuffer() {
const bufArr = [];
bufArr.push(ZRE_HEADER);
bufArr.push(putNumber1(this.cmd));
bufArr.push(putNumber1(ZRE_VERSION));
bufArr.push(putNumber2(this.sequence));
if (typeof this.group !== 'undefined') bufArr.push(putString(this.group));
if (typeof this.endpoint !== 'undefined') bufArr.push(putString(this.endpoint));
if (typeof this.groups !== 'undefined') bufArr.push(putStrings(this.groups));
if (typeof this.status !== 'undefined') bufArr.push(putNumber1(this.status));
if (typeof this.name !== 'undefined') bufArr.push(putString(this.name));
if (typeof this.headers !== 'undefined') bufArr.push(putDictionary(this.headers));
return Buffer.concat(bufArr);
}
/**
* Reads, validates and creates a new ZreMsg from the given Buffer and frame.
*
* @param {Buffer} buffer - Binary Buffer in ZreMsg format
* @param {Buffer} frame - Message content as binary Buffer
* @return {ZreMsg}
*/
static read(buffer, frame) {
try {
if (buffer.compare(ZRE_HEADER, 0, 2, 0, 2) !== 0) throw Error;
if (getNumber1(buffer.slice(3)) !== ZRE_VERSION) throw Error;
switch (getNumber1(buffer.slice(2))) {
case HELLO: {
let pointer = 4;
const sequence = getNumber2(buffer.slice(pointer));
pointer += 2;
// A HELLO message always has to be the first message
if (sequence !== 1) throw Error;
const endpointString = getString(buffer.slice(pointer));
const endpoint = endpointString.value;
pointer += endpointString.pointer;
const groupsStrings = getStrings(buffer.slice(pointer));
const groups = groupsStrings.value;
pointer += groupsStrings.pointer;
const status = getNumber1(buffer.slice(pointer));
pointer += 1;
const nameString = getString(buffer.slice(pointer));
const name = nameString.value;
pointer += nameString.pointer;
const headers = getDictionary(buffer.slice(pointer)).value;
return new ZreMsg(HELLO, {
sequence,
endpoint,
groups,
status,
name,
headers,
});
}
case WHISPER: {
const pointer = 4;
const sequence = getNumber2(buffer.slice(pointer));
return new ZreMsg(WHISPER, {
sequence,
content: frame,
});
}
case SHOUT: {
let pointer = 4;
const sequence = getNumber2(buffer.slice(pointer));
pointer += 2;
const group = getString(buffer.slice(pointer)).value;
return new ZreMsg(SHOUT, {
sequence,
group,
content: frame,
});
}
case JOIN: {
let pointer = 4;
const sequence = getNumber2(buffer.slice(pointer));
pointer += 2;
const groupString = getString(buffer.slice(pointer));
const group = groupString.value;
pointer += groupString.pointer;
const status = getNumber1(buffer.slice(pointer));
return new ZreMsg(JOIN, {
sequence,
group,
status,
});
}
case LEAVE: {
let pointer = 4;
const sequence = getNumber2(buffer.slice(pointer));
pointer += 2;
const groupString = getString(buffer.slice(pointer));
const group = groupString.value;
pointer += groupString.pointer;
const status = getNumber1(buffer.slice(pointer));
return new ZreMsg(LEAVE, {
sequence,
group,
status,
});
}
case PING: {
const pointer = 4;
const sequence = getNumber2(buffer.slice(pointer));
return new ZreMsg(PING, {
sequence,
});
}
case PING_OK: {
const pointer = 4;
const sequence = getNumber2(buffer.slice(pointer));
return new ZreMsg(PING_OK, {
sequence,
});
}
default:
throw Error;
}
} catch (err) {
return undefined;
}
}
}
ZreMsg.HELLO = HELLO;
ZreMsg.WHISPER = WHISPER;
ZreMsg.SHOUT = SHOUT;
ZreMsg.JOIN = JOIN;
ZreMsg.LEAVE = LEAVE;
ZreMsg.PING = PING;
ZreMsg.PING_OK = PING_OK;
module.exports = ZreMsg;