Beekeeper
view release on metacpan or search on metacpan
examples/chat/js/beekeeper.js view on Meta::CPAN
on_connect: function() {...}
});
bkpr.send_notification({
method: "test.foo",
params: { foo: "bar" }
});
bkpr.call_remote_method({
method: "test.bar",
params: { foo: "baz" },
on_success: function(result) {...},
on_error: function(error) {...}
});
bkpr.accept_notifications({
method: "test.foo.*",
on_receive: function(params) {...}
});
bkpr.accept_remote_calls({
method: "test.bar",
on_receive: function(params) {...}
});
*/
function BeekeeperClient () { return {
mqtt: null,
host: null,
client_id: null,
response_topic: null,
request_seq: 1,
subscr_seq: 1,
pending_req: {},
subscr_cb: {},
subscr_re: {},
connect: function(args) {
const This = this;
if (!this.client_id) this._generate_client_id();
if ('debug' in args) this.debug(args.debug);
this._debug(`Connecting to MQTT broker at ${args.url}`);
// It is possible to iterate over a list of servers specifying:
// url: [{ host: 'localhost', port: 1883 }, ... ]
// Connect to MQTT broker using websockets
this.mqtt = mqtt.connect( args.url, {
username: args.username || 'guest',
password: args.password || 'guest',
clientId: this.client_id,
protocolVersion: 5,
clean: true,
keepalive: 60,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000
});
this.mqtt.on('connect', function (connack) {
This.host = This.mqtt.options.host;
This._debug("Connected to MQTT broker at " + This.host);
This._create_response_topic();
if (args.on_connect) args.on_connect(connack.properties);
});
this.mqtt.on('reconnect', function () {
// Emitted when a reconnect starts
This._debug("Reconnecting...");
});
this.mqtt.on('close', function () {
// Emitted after a disconnection
This._debug("Disconnected");
});
this.mqtt.on('disconnect', function (packet) {
// Emitted after receiving disconnect packet from broker
This._debug("Disconnected by broker");
});
this.mqtt.on('offline', function () {
// Emitted when the client goes offline
This._debug("Client offline");
});
this.mqtt.on('error', function (error) {
// Emitted when the client cannot connect
This._debug(error);
});
this.mqtt.on('message', function (topic, message, packet) {
let jsonrpc;
try {
if (message[0] == 0x78) {
// Deflated JSON
let json = pako.inflate(message, {to:'string'});
jsonrpc = JSON.parse(json);
}
else {
jsonrpc = JSON.parse(message.toString());
}
} catch (e) { throw `Received invalid JSON: ${e}` }
This._debug(`Got << ${message}`);
const subscr_id = packet.properties.subscriptionIdentifier;
const subscr_cb = This.subscr_cb[subscr_id];
subscr_cb(jsonrpc, packet.properties);
});
},
_generate_client_id: function() {
// Generate a random client id (128+ bits)
this.client_id = '';
const chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz';
examples/chat/js/beekeeper.js view on Meta::CPAN
// Incoming notification
try { on_receive( jsonrpc.params, packet_prop ) }
catch(e) { This._debug(`Uncaught exception into on_receive callback of ${jsonrpc.method}: ${e}`) }
};
this.subscr_re[subscr_id] = new RegExp('^' + args.method.replace(/\./g,'\\.').replace(/\*/g,'.+') + '$');
// Private notifications are received on response_topic subscription
if (args.private) return;
const topic = 'msg/frontend/' + args.method.replace(/\./g,'/').replace(/\*/g,'#');
this.mqtt.subscribe(
topic,
{ properties: { subscriptionIdentifier: subscr_id }},
function (err, granted) {
if (err) throw `Failed to subscribe to ${topic}: ${err}`;
}
);
},
call_remote_method: function(args) {
if (!this.mqtt.connected) throw "Not connected to MQTT broker";
const req_id = this.request_seq++;
const json = JSON.stringify({
jsonrpc: "2.0",
method: args.method,
params: args.params,
id: req_id
});
const QUEUE_LANES = 2;
const topic = 'req/backend-' + Math.floor( Math.random() * QUEUE_LANES + 1 );
const fwd_to = 'req/backend/' + args.method.replace(/\.[\w-]+$/,'').replace(/\./g,'/');
this.mqtt.publish(
topic,
json,
{ properties: {
responseTopic: this.response_topic,
userProperties: { fwd_to: fwd_to }
}}
);
this._debug("Sent >> " + json);
this.pending_req[req_id] = {
method: args.method,
on_success: args.on_success,
on_error: args.on_error,
timeout: null
};
const This = this;
this.pending_req[req_id].timeout = setTimeout( function() {
delete This.pending_req[req_id];
This._debug(`Call to ${args.method} timed out`);
const err_resp = { code: -32603, message: "Request timeout" };
if (args.on_error) {
try { args.on_error(err_resp) }
catch(e) { This._debug(`Uncaught exception into on_error callback of ${args.method}: ${e}`) }
}
else {
This._on_error(err_resp);
}
}, (args.timeout || 30) * 1000);
},
_create_response_topic: function() {
const response_topic = 'priv/' + this.client_id;
this.response_topic = response_topic;
const subscr_id = this.subscr_seq++;
const This = this;
this.subscr_cb[subscr_id] = function(jsonrpc, packet_prop) {
if (!jsonrpc.id) {
// Incoming private notification
let on_receive;
for (let subscr_id in This.subscr_re) {
if (jsonrpc.method.match( This.subscr_re[subscr_id] )) {
on_receive = This.subscr_cb[subscr_id];
break;
}
}
if (on_receive) {
try { on_receive( jsonrpc, packet_prop ) }
catch(e) { This._debug(`Uncaught exception into on_receive callback of ${jsonrpc.method}: ${e}`) }
}
else {
This._debug(`Received unhandled private notification ${jsonrpc.method}`);
}
return;
}
// Incoming remote call response
const resp = jsonrpc;
const req = This.pending_req[resp.id];
delete This.pending_req[resp.id];
if (!req) return;
clearTimeout(req.timeout);
if ('result' in resp) {
if (req.on_success) {
try { req.on_success( resp.result, packet_prop ) }
catch(e) { This._debug(`Uncaught exception into on_success callback of ${req.method}: ${e}`) }
}
}
else {
This._debug(`Error response from ${req.method} call: ${resp.error.message}`);
if (req.on_error) {
try { req.on_error( resp.error, packet_prop ) }
catch(e) { This._debug(`Uncaught exception into on_error callback of ${req.method}: ${e}`) }
}
else {
This._on_error(resp.error);
}
}
};
this.mqtt.subscribe(
response_topic,
{ properties: { subscriptionIdentifier: subscr_id }},
function (err, granted) {
if (err) throw `Failed to subscribe to ${response_topic}: ${err}`;
}
);
},
accept_remote_calls: function(args) {
// This is included for reference, but please note that frontend clients
// should *not* be allowed to even connect to the backend broker, let alone
// consume from req/backend/*, as that would allow a malicious actor to
// disrupt services or steal other users credentials
if (!this.mqtt.connected) throw "Not connected to MQTT broker";
const subscr_id = this.subscr_seq++;
const on_receive = args.on_receive;
const This = this;
this.subscr_cb[subscr_id] = function(jsonrpc, packet_prop) {
// Incoming remote request
let json;
try {
let result = on_receive( jsonrpc.params, packet_prop );
json = JSON.stringify({
jsonrpc: "2.0",
result: result,
id: req.id
});
}
catch (e) {
json = JSON.stringify({
jsonrpc: "2.0",
error: { code: -32603, message: e.message },
id: req.id
( run in 1.553 second using v1.01-cache-2.11-cpan-437f7b0c052 )