Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Client.pm view on Meta::CPAN
use strict;
use warnings;
our $VERSION = '0.10';
use Beekeeper::AnyEvent;
use Beekeeper::MQTT;
use Beekeeper::JSONRPC;
use Beekeeper::Config;
use JSON::XS;
use Compress::Raw::Zlib ();
use Digest::MD5 'md5_base64';
use Carp;
# Number of queue lanes: notifications forwarded to another bus are published
# to a topic suffixed with a random lane number between 1 and QUEUE_LANES
use constant QUEUE_LANES => 2;
# Presumably the default request timeout, in seconds. It is not referenced by
# the code visible here - TODO confirm against __do_rpc_request
use constant REQ_TIMEOUT => 60;
use Exporter 'import';
# Functions that can be imported on request. Names starting with a double
# underscore are internal helpers
our @EXPORT_OK = qw(
send_notification
call_remote
call_remote_async
fire_remote
wait_async_calls
get_authentication_data
set_authentication_data
__do_rpc_request
__create_response_topic
__use_authorization_token
);
# The ':worker' tag imports every function listed in @EXPORT_OK
our %EXPORT_TAGS = ('worker' => \@EXPORT_OK );
# NOTE(review): presumably holds the shared object returned by instance();
# that sub is not fully visible here - confirm
our $singleton;
# Zlib inflater shared by the whole module. It is (re)created by new() and
# used to decompress received payloads
my $INFLATE;
# Constructor: builds a client and connects it (blocking) to the MQTT broker.
#
# %args are broker connection parameters. Unless both 'username' and 'password'
# are given, connection parameters are read from the bus config: the bus named
# by 'bus_id' if specified, else the only bus defined, else the one flagged as
# 'default'. Explicitly passed arguments always override configured values.
#
# The options 'forward_to' and 'auth_salt' are consumed here and are not passed
# to Beekeeper::MQTT.
#
# Croaks if the requested bus is not configured, or if no bus was requested
# and no default one can be determined.
sub new {
    my ($class, %args) = @_;

    # Per-client bookkeeping: everything starts empty except the request counter
    my %client = (
        forward_to     => undef,
        response_topic => undef,
        in_progress    => undef,
        curr_request   => undef,
        caller_id      => undef,
        caller_addr    => undef,
        auth_data      => undef,
        auth_salt      => undef,
        async_cv       => undef,
        correlation_id => 1,
        callbacks      => {},
    );

    my $self = {
        _CLIENT => \%client,
        _BUS    => undef,
    };

    my $has_credentials = exists $args{'username'} && exists $args{'password'};

    if (!$has_credentials) {

        # Get broker connection parameters from config file
        my $bus_id = $args{'bus_id'};

        if (defined $bus_id) {
            # Use parameters for specific bus
            my $bus_config = Beekeeper::Config->get_bus_config( bus_id => $bus_id );
            croak "Bus '$bus_id' is not defined into config file bus.config.json" unless $bus_config;
            %args = ( %$bus_config, %args );
        }
        else {
            my $all_buses = Beekeeper::Config->get_bus_config( bus_id => '*');
            my @bus_ids   = keys %$all_buses;
            my $chosen;

            if (@bus_ids == 1) {
                # Use the only config present
                $chosen = $bus_id = $bus_ids[0];
            }
            else {
                # Use default parameters (if any)
                ($chosen) = grep { $all_buses->{$_}->{default} } @bus_ids;
                croak "No default bus defined into config file bus.config.json" unless $chosen;
                $bus_id = $all_buses->{$chosen}->{'bus_id'};
            }

            # Configured values first, so that explicit arguments win
            %args = ( %{ $all_buses->{$chosen} }, bus_id => $bus_id, %args );
        }
    }

    $INFLATE = Compress::Raw::Zlib::Inflate->new( -ConsumeInput => 0 );

    # These two options belong to the client itself, not to the MQTT connection
    $client{forward_to} = delete $args{'forward_to'};
    my $salt = delete $args{'auth_salt'};
    $client{auth_salt} = $salt || $args{'bus_id'};

    # Fixed MQTT session settings:
    #  - start a fresh new MQTT session on connect
    #  - make the MQTT session end when the connection is closed
    #  - keep only 1 unacked message (of QoS 1) in flight
    #  - do not use topic aliases
    @args{qw( clean_start session_expiry_interval receive_maximum topic_alias_maximum )} = ( 1, 0, 1, 0 );

    my $bus = Beekeeper::MQTT->new( %args );
    $self->{_BUS} = $bus;

    # Connect to MQTT broker
    $bus->connect( blocking => 1 );

    return bless $self, $class;
}
sub instance {
my $class = shift;
lib/Beekeeper/Client.pm view on Meta::CPAN
return $self;
}
# Publishes a JSON-RPC 2.0 notification to the message bus.
#
# Arguments (named):
#   method    - required, "{service}.{method}", optionally followed by "@{address}"
#   address   - optional, appended to the method after an '@'
#   params    - optional value sent as the notification params
#   buffer_id - optional, passed through to the bus publish call if present
#
# When a remote bus is given (in the address, or as the client 'forward_to'
# setting) the message is published to one of the lanes of that bus, carrying
# the final destination topic in 'fwd_to'. Otherwise it is published directly
# to the topic of the local bus.
#
# Croaks if the method is missing or malformed. Returns whatever publish returns.
sub send_notification {
    my ($self, %args) = @_;

    my $fq_meth = $args{'method'} or croak "Method was not specified";
    $fq_meth = join( '@', $fq_meth, $args{'address'} ) if defined $args{'address'};

    # NOTE(review): the trailing group is quantified, so only its last
    # repetition ends up in $addr (e.g. '.b' for 'bus.a.b')
    my ($service, $method, $remote_bus, $addr) =
        $fq_meth =~ m/^ ( [\w-]+ (?:\.[\w-]+)* )
                     \. ( [\w-]+ )
                 (?: \@ ( [\w-]+ ) (\.[\w-]+)* )? $/x
            or croak "Invalid method '$fq_meth'";

    my %notification = (
        jsonrpc => '2.0',
        method  => "$service.$method",
        params  => $args{'params'},
    );
    my $json = encode_json( \%notification );

    my $local_bus = $self->{_BUS}->{bus_role};
    my $client    = $self->{_CLIENT};

    # Without an explicit remote bus fall back to the configured forwarding bus
    $remote_bus = $client->{forward_to} unless defined $remote_bus;

    my %send_args;

    if (!defined $remote_bus) {
        # Dots of service names become topic level separators
        ( $send_args{'topic'} = "msg/$local_bus/$service/$method" ) =~ tr|.|/|;
    }
    else {
        # Pick randomly one of the lanes of the remote bus
        my $lane = int( rand(QUEUE_LANES) + 1 );
        ( $send_args{'topic'} = "msg/${remote_bus}-${lane}" ) =~ tr|.|/|;

        my $fwd_to = "msg/$remote_bus/$service/$method";
        if (defined $addr && $addr =~ s/^\.//) {
            $fwd_to .= "\@$addr";
        }
        $fwd_to =~ tr|.|/|;
        $send_args{'fwd_to'} = $fwd_to;
    }

    # Piggyback authentication data and caller id, when defined
    foreach my $pair ( [ auth => 'auth_data' ], [ clid => 'caller_id' ] ) {
        my ($property, $field) = @$pair;
        my $value = $client->{$field};
        $send_args{$property} = $value if defined $value;
    }

    $send_args{'buffer_id'} = $args{'buffer_id'} if exists $args{'buffer_id'};

    return $self->{_BUS}->publish( payload => \$json, %send_args );
}
# Makes the client start accepting notifications from the message bus.
#
# Takes a list of ( $method => $callback ) pairs, where $method has the format
# "{service}.{method}" or "{service}.*" (fallback handler for the service) and
# $callback is a coderef, which is called with ( $params, $notification ).
#
# Croaks on a malformed method, on a callback that is not a coderef, or when
# the method is already being accepted. Dies from the suback handler if the
# broker refuses the subscription.
#
# Malformed incoming messages are discarded with a single warning: a missing
# 'jsonrpc' or 'method' field does not raise additional "uninitialized value"
# warnings.
sub accept_notifications {
    my ($self, %args) = @_;

    # Remember where this was called from, in order to report from the caller
    # perspective errors detected later inside the asynchronous handlers
    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    my $callbacks = $self->{_CLIENT}->{callbacks};

    foreach my $fq_meth (keys %args) {

        $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
                     \. ( [\w-]+ | \* ) $/x or croak "Invalid notification method '$fq_meth'";

        my ($service, $method) = ($1, $2);

        my $callback = $args{$fq_meth};

        unless (ref $callback eq 'CODE') {
            croak "Invalid callback for '$fq_meth'";
        }

        croak "Already accepting notifications '$fq_meth'" if exists $callbacks->{"msg.$fq_meth"};
        $callbacks->{"msg.$fq_meth"} = $callback;

        #TODO: Allow to accept private notifications without subscribing

        # Dots become topic level separators, and '*' the multilevel wildcard
        my $local_bus = $self->{_BUS}->{bus_role};
        my $topic = "msg/$local_bus/$service/$method";
        $topic =~ tr|.*|/#|;

        $self->{_BUS}->subscribe(
            topic      => $topic,
            on_publish => sub {
                my ($payload_ref, $mqtt_properties) = @_;

                local $@;
                my $request = eval { decode_json($$payload_ref) };

                # Check that 'jsonrpc' is defined before comparing it, as
                # otherwise a message lacking it raises a spurious warning
                unless (ref $request eq 'HASH'
                        && defined $request->{jsonrpc}
                        && $request->{jsonrpc} eq '2.0') {
                    warn "Received invalid JSON-RPC 2.0 notification $at";
                    return;
                }

                bless $request, 'Beekeeper::JSONRPC::Notification';
                $request->{_mqtt_properties} = $mqtt_properties;

                my $method = $request->{method};

                my ($svc, $meth) = defined $method
                                 ? $method =~ m/^([\.\w-]+)\.([\w-]+)$/
                                 : ();

                unless (defined $meth) {
                    # An undefined method is reported as an empty name
                    my $shown = defined $method ? $method : '';
                    warn "Received notification with invalid method '$shown' $at";
                    return;
                }

                # Prefer the exact handler, then the wildcard one of the service
                my $cb = $callbacks->{"msg.$svc.$meth"} ||
                         $callbacks->{"msg.$svc.*"};

                unless ($cb) {
                    warn "No callback found for received notification '$method' $at";
                    return;
                }

                $cb->($request->{params}, $request);
            },
            on_suback => sub {
                my ($success, $prop) = @_;
                die "Could not subscribe to topic '$topic' $at" unless $success;
            }
        );
    }
}
# Makes the client stop accepting the given notifications.
#
# Takes a list of method names with the format "{service}.{method}" or
# "{service}.*". Methods which are not currently being accepted are skipped
# with a warning. For the others an unsubscription is requested, and the
# registered callback is removed once the broker acknowledges it.
#
# Croaks if no method is given or if one is malformed. Dies from the unsuback
# handler if the unsubscription fails.
sub stop_accepting_notifications {
    my ($self, @methods) = @_;

    # Caller location, used to report failures detected asynchronously
    my (undef, $file, $line) = caller;
    my $at = "at $file line $line\n";

    @methods or croak "No method specified";

    foreach my $fq_meth (@methods) {

        my ($service, $method) =
            $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
                         \. ( [\w-]+ | \* ) $/x
                or croak "Invalid method '$fq_meth'";

        my $cb_key = "msg.$fq_meth";

        if (!defined $self->{_CLIENT}->{callbacks}->{$cb_key}) {
            carp "Not previously accepting notifications '$fq_meth'";
            next;
        }

        # Same topic mapping as used when subscribing: dots become level
        # separators and '*' becomes the multilevel wildcard
        my $local_bus = $self->{_BUS}->{bus_role};
        ( my $topic = "msg/$local_bus/$service/$method" ) =~ tr|.*|/#|;

        my $forget_callback = sub {
            my ($success, $prop) = @_;
            $success or die "Could not unsubscribe from topic '$topic' $at";
            delete $self->{_CLIENT}->{callbacks}->{$cb_key};
        };

        $self->{_BUS}->unsubscribe(
            topic       => $topic,
            on_unsuback => $forget_callback,
        );
    }
}
# True while a blocking wait (call_remote or wait_async_calls) is in progress
our $AE_WAITING;

# Makes a synchronous RPC call: sends the request and blocks (running the
# event loop) until a response is received or the request timed out.
#
# Takes the same named arguments as __do_rpc_request, which receives them
# along with req_type => 'SYNCHRONOUS'.
#
# Returns the response object. Croaks with the error code and message when
# the response has no result and the request asks to raise errors.
#
# Confesses if called while another blocking call is still in progress. This
# is checked before the request is sent, so a refused call has no side effects
# (otherwise the request would be executed remotely and its response ignored).
sub call_remote {
    my $self = shift;

    Carp::confess( "Recursive blocking call attempted: " .
      "trying to make a call_remote while another call_remote is still in progress, " .
      "but it is not possible to make two blocking calls simultaneously " .
      "(tip: one of the two calls must be made with call_remote_async)" ) if $AE_WAITING;

    my $req = $self->__do_rpc_request( @_, req_type => 'SYNCHRONOUS' );

    # Make AnyEvent allow one level of recursive condvar blocking, as we may
    # block both in $worker->__work_forever and in $client->__do_rpc_request
    local $AE_WAITING = 1;
    local $AnyEvent::CondVar::Base::WAITING = 0;

    # Block until a response is received or request timed out
    $req->{_waiting_response}->recv;

    my $resp = $req->{_response};

    if (!exists $resp->{result} && $req->{_raise_error}) {
        my $errmsg = $resp->code . " " . $resp->message;
        croak "Call to '$req->{method}' failed: $errmsg";
    }

    return $resp;
}
# Makes an asynchronous RPC call: hands the given arguments to
# __do_rpc_request with req_type => 'ASYNCHRONOUS' and returns the resulting
# request without waiting for its response.
sub call_remote_async {
    my ($self, @call_args) = @_;

    my $request = $self->__do_rpc_request( @call_args, req_type => 'ASYNCHRONOUS' );

    return $request;
}
# Makes a fire and forget RPC call: hands the given arguments to
# __do_rpc_request with req_type => 'FIRE_FORGET' and discards its result.
# Always returns nothing.
sub fire_remote {
    my ($self, @call_args) = @_;

    # Send request to a worker, but do not wait for response
    $self->__do_rpc_request( @call_args, req_type => 'FIRE_FORGET' );

    return;
}
my $__now = 0;
sub __do_rpc_request {
my ($self, %args) = @_;
my $client = $self->{_CLIENT};
my $fq_meth = $args{'method'} or croak "Method was not specified";
lib/Beekeeper/Client.pm view on Meta::CPAN
my $resp;
local $@;
eval {
if (substr($$payload_ref,0,1) eq "\x78") {
my $decompressed_json;
$INFLATE->inflate($payload_ref, $decompressed_json);
$INFLATE->inflateReset();
$resp = decode_json($decompressed_json);
}
else {
$resp = decode_json($$payload_ref);
}
};
unless (ref $resp eq 'HASH' && $resp->{jsonrpc} eq '2.0') {
warn "Received invalid JSON-RPC 2.0 message $at";
return;
}
if (exists $resp->{'id'}) {
# Response of an RPC request
my $req_id = $resp->{'id'};
my $req = delete $client->{in_progress}->{$req_id};
# Ignore unexpected responses
return unless $req;
# Cancel request timeout
delete $req->{_timeout};
if (exists $resp->{'result'}) {
# Success response
$req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Response';
$req->{_on_success_cb}->($resp) if $req->{_on_success_cb};
}
else {
# Error response
$req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Error';
$req->{_on_error_cb}->($resp) if $req->{_on_error_cb};
}
$req->{_waiting_response}->end;
}
else {
# Unicasted notification
bless $resp, 'Beekeeper::JSONRPC::Notification';
$resp->{_headers} = $mqtt_properties;
my $method = $resp->{method};
unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
warn "Received notification with invalid method '$method' $at";
return;
}
my $cb = $client->{callbacks}->{"msg.$1.$2"} ||
$client->{callbacks}->{"msg.$1.*"};
unless ($cb) {
warn "No callback found for received notification '$method' $at";
return;
}
$cb->($resp->{params}, $resp);
}
},
on_suback => sub {
my ($success, $prop) = @_;
die "Could not subscribe to response topic '$response_topic' $at" unless $success;
}
);
return $response_topic;
}
# Blocks until all pending asynchronous requests are completed.
#
# Takes the condvar stored in the client 'async_cv' slot (removing it) and
# waits on it. Returns immediately if there is none. Confesses if another
# blocking wait is already in progress.
sub wait_async_calls {
    my ($self) = @_;

    # Wait for all pending async requests
    my $pending = delete $self->{_CLIENT}->{async_cv};
    defined $pending or return;

    Carp::confess "Recursive condvar blocking wait attempted" if $AE_WAITING;

    # Make AnyEvent to allow one level of recursive condvar blocking, as we may
    # block both in $worker->__work_forever and here
    local ( $AE_WAITING, $AnyEvent::CondVar::Base::WAITING ) = ( 1, 0 );

    return $pending->recv;
}
# Returns the authentication data currently stored in the client, which is
# undef when none was set.
sub get_authentication_data {
    my $self = shift;

    my $client = $self->{_CLIENT};

    return $client->{auth_data};
}
# Stores the given authentication data in the client, replacing any previous
# one. Returns the stored value.
sub set_authentication_data {
    my $self = shift;
    my ($data) = @_;

    my $client = $self->{_CLIENT};

    return $client->{auth_data} = $data;
}
sub __use_authorization_token {
my ($self, $token) = @_;
# Using a hashing function makes harder to access the wrong worker pool by mistake,
# but it is not an effective access restriction: anyone with access to the backend
# bus credentials can easily inspect and clone auth data tokens
my $salt = $self->{_CLIENT}->{auth_salt};
my $adata_ref = \$self->{_CLIENT}->{auth_data};
my $guard = Beekeeper::Client::Guard->new( $adata_ref );
lib/Beekeeper/Client.pm view on Meta::CPAN
┌───────────────────┬──────────────┬────────┬────────┬────────┐
│ method            │ sent to      │ queued │ result │ blocks │
├───────────────────┼──────────────┼────────┼────────┼────────┤
│ call_remote       │ 1 worker     │ yes    │ yes    │ yes    │
│ call_remote_async │ 1 worker     │ yes    │ yes    │ no     │
│ fire_remote       │ 1 worker     │ yes    │ no     │ no     │
│ send_notification │ many workers │ no     │ no     │ no     │
└───────────────────┴──────────────┴────────┴────────┴────────┘
All methods in this module are exported by default to L<Beekeeper::Worker>.
=head1 CONSTRUCTOR
=head3 instance( %args )
Connects to the message broker and returns a singleton instance.
Unless explicit connection parameters to the broker are provided it tries
to connect using the parameters defined in config file C<bus.config.json>.
=head1 METHODS
=head3 send_notification ( %args )
Broadcasts a notification to the message bus.
All clients and workers listening for given method will receive it.
If no one is listening for it the notification will be discarded.
=over
=item method
A string with the name of the notification being sent with format C<{service_class}.{method}>.
=item params
An arbitrary value or data structure sent with the notification. It could be undefined,
but it should not contain blessed references that cannot be serialized as JSON.
=item address
A string with the name of the remote bus when sending notifications to another logical
bus. Notifications to another bus need a router shoveling them.
=back
=head3 accept_notifications ( $method => $callback, ... )
Makes a client start accepting the specified notifications from the message bus.
C<$method> is a string with the format C<{service_class}.{method}>. A default
or fallback handler can be specified using a wildcard like C<{service_class}.*>.
C<$callback> is a coderef that will be called when a notification is received.
When executed, the callback will receive a parameter C<$params> which contains
the notification value or data structure sent.
Please note that callbacks will not be executed in a timely manner if the AnyEvent loop is not running.
=head3 stop_accepting_notifications ( $method, ... )
Makes a client stop accepting the specified notifications from the message bus.
C<$method> must be one of the strings used previously in C<accept_notifications>.
=head3 call_remote ( %args )
Makes a synchronous RPC call to a service worker through the message bus.
It will wait (in the event loop) until a response is received, which will be either
a L<Beekeeper::JSONRPC::Response> object or a L<Beekeeper::JSONRPC::Error>.
On error it will die unless C<raise_error> option is set to false.
This method accepts the following parameters:
=over
=item method
A string with the name of the method to be invoked with format C<{service_class}.{method}>.
=item params
An arbitrary value or data structure to be passed as parameters to the defined method.
It could be undefined, but it should not contain blessed references that cannot be
serialized as JSON.
=item address
A string with the name of the remote bus when calling methods of workers connected
to another logical bus. Requests to another bus need a router shoveling them.
=item timeout
Time in seconds before cancelling the request and returning an error response. If the
request takes too long but otherwise was executed successfully the response will
eventually arrive but it will be ignored.
=item raise_error
If set to true (the default) dies with the received error message when a call returns
an error response. If set to false returns a L<Beekeeper::JSONRPC::Error> instead.
=back
=head3 call_remote_async ( %args )
Makes an asynchronous RPC call to a service worker through the message bus.
It immediately returns a L<Beekeeper::JSONRPC::Request> object which, once completed,
will have a defined C<response>.
This method accepts parameters C<method>, C<params>, C<address> and C<timeout>
the same as C<call_remote>. Additionally two callbacks can be specified:
=over
=item on_success
Callback which will be executed after receiving a successful response with a
L<Beekeeper::JSONRPC::Response> object as parameter. Must be a coderef.
=item on_error
Callback which will be executed after receiving an error response with a
L<Beekeeper::JSONRPC::Error> object as parameter. Must be a coderef.
=back
=head3 fire_remote ( %args )
Makes a fire and forget RPC call to a service worker through the message bus.
It returns undef immediately. The worker receiving the call will not send back a response.
This method accepts parameters C<method>, C<params> and C<address> the same as C<call_remote>.
=head3 wait_async_calls
Waits (running the event loop) until all calls made by C<call_remote_async> are completed
either by success, error or timeout.
=head3 set_authentication_data ( $data )
Adds an arbitrary authentication data blob to subsequent calls or notifications sent.
This data persists for client lifetime in standalone clients. Within worker context
it persists until the end of current request only, and will be piggybacked on
calls made to other workers within the scope of the current request.
The meaning of this data is application specific, this framework doesn't give
any special one to it.
=head3 get_authentication_data
Returns the current authentication data blob.
=head1 SEE ALSO
L<Beekeeper::Worker>, L<Beekeeper::Config>, L<Beekeeper::MQTT>.
=head1 AUTHOR
José Micó, C<jose.mico@gmail.com>
=head1 COPYRIGHT AND LICENSE
Copyright 2015-2023 José Micó.
This is free software; you can redistribute it and/or modify it under the same
terms as the Perl 5 programming language itself.
This software is distributed in the hope that it will be useful, but it is
provided “as is” and without any express or implied warranties. For details,
( run in 0.593 second using v1.01-cache-2.11-cpan-39bf76dae61 )