Beekeeper

 view release on metacpan or  search on metacpan

examples/chat/js/beekeeper.js  view on Meta::CPAN

                let on_receive;
                for (let subscr_id in This.subscr_re) {
                    if (jsonrpc.method.match( This.subscr_re[subscr_id] )) {
                        on_receive = This.subscr_cb[subscr_id];
                        break;
                    }
                }

                if (on_receive) {
                    try { on_receive( jsonrpc, packet_prop ) }
                    catch(e) { This._debug(`Uncaught exception into on_receive callback of ${jsonrpc.method}: ${e}`) }
                }
                else {
                    This._debug(`Received unhandled private notification ${jsonrpc.method}`);
                }

                return;
            }

            // Incoming remote call response

            const resp = jsonrpc;
            const req = This.pending_req[resp.id];
            delete This.pending_req[resp.id];
            if (!req) return;

            clearTimeout(req.timeout);

            if ('result' in resp) {
                if (req.on_success) {
                    try { req.on_success( resp.result, packet_prop ) }
                    catch(e) { This._debug(`Uncaught exception into on_success callback of ${req.method}: ${e}`) }
                }
            }
            else {
                This._debug(`Error response from ${req.method} call: ${resp.error.message}`);
                if (req.on_error) {
                    try { req.on_error( resp.error, packet_prop ) }
                    catch(e) { This._debug(`Uncaught exception into on_error callback of ${req.method}: ${e}`) }
                }
                else {
                    This._on_error(resp.error);
                }
            }
        };

        this.mqtt.subscribe(
            response_topic,
            { properties: { subscriptionIdentifier: subscr_id }},
            function (err, granted) {
                if (err) throw `Failed to subscribe to ${response_topic}: ${err}`;
            }
        );
    },

    accept_remote_calls: function(args) {

        // This is included for reference, but please note that frontend clients
        // should *not* be allowed to even connect to the backend broker, let alone
        // consume from req/backend/*, as that would allow a malicious actor to 
        // disrupt services or steal other users credentials

        if (!this.mqtt.connected) throw "Not connected to MQTT broker";

        const subscr_id = this.subscr_seq++;
        const on_receive = args.on_receive;
        const This = this;

        this.subscr_cb[subscr_id] = function(jsonrpc, packet_prop) {

            // Incoming remote request

            let json;

            try {
                let result = on_receive( jsonrpc.params, packet_prop );
                json = JSON.stringify({
                    jsonrpc: "2.0",
                    result: result,
                    id: req.id
                });
            }
            catch (e) {
                json = JSON.stringify({
                    jsonrpc: "2.0",
                    error: { code: -32603, message: e.message },
                    id: req.id
                });
            }

            This.mqtt.publish(
                packet_prop.responseTopic,
                json,
                {}
            );

            This._debug(`Sent >> ${json}`);
        };

        const topic = '$share/BKPR/req/backend/' + args.method.replace(/\./g,'/');

        this.mqtt.subscribe(
            topic,
            { properties: { subscriptionIdentifier: subscr_id }},
            function (err, granted) {
                if (err) throw `Failed to subscribe to ${topic}: ${err}`;
            }
        );
    },
}};



( run in 0.460 second using v1.01-cache-2.11-cpan-39bf76dae61 )