Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Client.pm view on Meta::CPAN
# Keep a global reference to $self
$singleton = $self;
return $self;
}
# Publish a JSON-RPC 2.0 notification (a message with no response) on the bus.
#
# Named arguments, as read by this sub:
#   method    - required; dotted "service.method" name, optionally followed
#               by "@bus" or "@bus.address"
#   address   - optional; appended to 'method' after an '@' before parsing
#   params    - optional; serialized as the "params" member of the message
#   buffer_id - optional; passed through to publish() when the key is present
#
# Croaks when 'method' is missing or does not have the expected syntax.
# Returns whatever the bus publish() call returns.
sub send_notification {
    my ($self, %args) = @_;

    my $target = $args{'method'} or croak "Method was not specified";

    if (defined $args{'address'}) {
        $target = join '@', $target, $args{'address'};
    }

    # NOTE(review): the last group is a quantified capture, so for an address
    # with several dotted segments only the final segment ends up in $addr.
    # Confirm whether multi-segment addresses are expected here.
    my ($service, $method, $remote_bus, $addr) = $target =~
        m/^ ( [\w-]+ (?:\.[\w-]+)* )
        \. ( [\w-]+ )
        (?: \@ ( [\w-]+ ) (\.[\w-]+)* )? $/x
        or croak "Invalid method '$target'";

    my $json = encode_json({
        jsonrpc => '2.0',
        method  => "$service.$method",
        params  => $args{'params'},
    });

    my $local_bus = $self->{_BUS}->{bus_role};
    my %send_args;

    # Without an explicit bus in the address, fall back to the client's
    # configured 'forward_to' bus (which may also be undefined)
    unless (defined $remote_bus) {
        $remote_bus = $self->{_CLIENT}->{forward_to};
    }

    if (defined $remote_bus) {
        # Publish to a randomly picked lane of the remote bus and carry the
        # final destination in 'fwd_to'
        my $lane = int( rand(QUEUE_LANES) + 1 );
        ($send_args{'topic'} = "msg/${remote_bus}-${lane}") =~ tr|.|/|;

        my $fwd_to = "msg/$remote_bus/$service/$method";
        $fwd_to .= "\@$addr" if (defined $addr && $addr =~ s/^\.//);
        $fwd_to =~ tr|.|/|;
        $send_args{'fwd_to'} = $fwd_to;
    }
    else {
        # Publish directly to the topic of the method on the local bus
        ($send_args{'topic'} = "msg/$local_bus/$service/$method") =~ tr|.|/|;
    }

    # Attach the client credentials and caller identity when they are set
    foreach my $pair ([ auth => 'auth_data' ], [ clid => 'caller_id' ]) {
        my ($key, $field) = @$pair;
        next unless defined $self->{_CLIENT}->{$field};
        $send_args{$key} = $self->{_CLIENT}->{$field};
    }

    $send_args{'buffer_id'} = $args{'buffer_id'} if exists $args{'buffer_id'};

    return $self->{_BUS}->publish( payload => \$json, %send_args );
}
# Register callbacks for incoming notifications and subscribe to their topics.
#
# Takes a list of  'service.method' => $coderef  pairs. The method part may
# be '*', in which case the callback is also used as a fallback for any method
# of that service which has no callback of its own (the '*' is turned into '#'
# in the subscribed topic, presumably the MQTT multi-level wildcard).
#
# Croaks when a method name is malformed, when a callback is not a code
# reference, or when the method is already being accepted.
#
# Each callback is invoked as  $cb->($params, $request)  where $request is the
# decoded message blessed into Beekeeper::JSONRPC::Notification. Malformed
# messages are reported with a single warn() and then dropped.
sub accept_notifications {
    my ($self, %args) = @_;

    # Remember the call site, so that asynchronous diagnostics point to it
    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    my $callbacks = $self->{_CLIENT}->{callbacks};

    foreach my $fq_meth (keys %args) {

        $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
        \. ( [\w-]+ | \* ) $/x or croak "Invalid notification method '$fq_meth'";

        my ($service, $method) = ($1, $2);

        my $callback = $args{$fq_meth};

        unless (ref $callback eq 'CODE') {
            croak "Invalid callback for '$fq_meth'";
        }

        croak "Already accepting notifications '$fq_meth'" if exists $callbacks->{"msg.$fq_meth"};
        $callbacks->{"msg.$fq_meth"} = $callback;

        #TODO: Allow to accept private notifications without subscribing

        my $local_bus = $self->{_BUS}->{bus_role};

        # Dots become topic level separators, and the '*' wildcard becomes '#'
        my $topic = "msg/$local_bus/$service/$method";
        $topic =~ tr|.*|/#|;

        $self->{_BUS}->subscribe(
            topic      => $topic,
            on_publish => sub {
                my ($payload_ref, $mqtt_properties) = @_;

                # A payload which fails to decode leaves $request undefined
                local $@;
                my $request = eval { decode_json($$payload_ref) };

                # Check that 'jsonrpc' is defined before comparing it, as a
                # message lacking it must produce only the diagnostic below
                # and not an additional "uninitialized value" warning
                unless (ref $request eq 'HASH'
                        && defined $request->{jsonrpc}
                        && $request->{jsonrpc} eq '2.0') {
                    warn "Received invalid JSON-RPC 2.0 notification $at";
                    return;
                }

                bless $request, 'Beekeeper::JSONRPC::Notification';
                $request->{_mqtt_properties} = $mqtt_properties;

                my $rpc_method = $request->{method};

                unless (defined $rpc_method && $rpc_method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
                    # Do not interpolate an undefined method into the message
                    my $shown = defined $rpc_method ? $rpc_method : '';
                    warn "Received notification with invalid method '$shown' $at";
                    return;
                }

                # Prefer an exact match, then the wildcard of the service
                my $cb = $callbacks->{"msg.$1.$2"} ||
                         $callbacks->{"msg.$1.*"};

                unless ($cb) {
                    warn "No callback found for received notification '$rpc_method' $at";
                    return;
                }

                $cb->($request->{params}, $request);
            },
            on_suback => sub {
                my ($success, $prop) = @_;
                die "Could not subscribe to topic '$topic' $at" unless $success;
            }
        );
    }
}
# Cancel the subscriptions of the given notification methods.
#
# Takes a list of 'service.method' names (the method part may be '*').
# Croaks when the list is empty or when a name is malformed. A name without a
# registered callback only causes a carp and is skipped. For the others an
# unsubscribe request is sent to the bus, and the callback is removed once
# the bus confirms it; an unsuccessful confirmation makes that handler die.
sub stop_accepting_notifications {
    my ($self, @methods) = @_;

    # Remember the call site, so that asynchronous errors point to it
    my (undef, $file, $line) = caller;
    my $at = "at $file line $line\n";

    @methods or croak "No method specified";

    METHOD: for my $name (@methods) {

        my ($service, $method) = $name =~
            m/^ ( [\w-]+ (?: \.[\w-]+ )* )
            \. ( [\w-]+ | \* ) $/x
            or croak "Invalid method '$name'";

        if (!defined $self->{_CLIENT}->{callbacks}->{"msg.$name"}) {
            carp "Not previously accepting notifications '$name'";
            next METHOD;
        }

        my $bus_role = $self->{_BUS}->{bus_role};

        # Dots become topic level separators, and the '*' wildcard becomes '#'
        (my $topic = "msg/$bus_role/$service/$method") =~ tr|.*|/#|;

        # The callback is dropped only after the bus confirmed the request
        my $on_unsuback = sub {
            my ($success, $prop) = @_;
            $success or die "Could not unsubscribe from topic '$topic' $at";
            delete $self->{_CLIENT}->{callbacks}->{"msg.$name"};
        };

        $self->{_BUS}->unsubscribe(
            topic       => $topic,
            on_unsuback => $on_unsuback,
        );
    }
}
# Flag set (via local) by call_remote just before it blocks waiting for a
# response. call_remote checks it first, so that a nested blocking call is
# reported with a stack trace instead of being attempted.
our $AE_WAITING;
sub call_remote {
my $self = shift;
my $req = $self->__do_rpc_request( @_, req_type => 'SYNCHRONOUS' );
# Make AnyEvent allow one level of recursive condvar blocking, as we may
# block both in $worker->__work_forever and in $client->__do_rpc_request
$AE_WAITING && Carp::confess "Recursive blocking call attempted: " .
"trying to make a call_remote while another call_remote is still in progress, " .
"but it is not possible to make two blocking calls simultaneously " .
"(tip: one of the two calls must be made with call_remote_async)";
local $AE_WAITING = 1;
local $AnyEvent::CondVar::Base::WAITING = 0;
# Block until a response is received or request timed out
$req->{_waiting_response}->recv;
my $resp = $req->{_response};
if (!exists $resp->{result} && $req->{_raise_error}) {
my $errmsg = $resp->code . " " . $resp->message;
croak "Call to '$req->{method}' failed: $errmsg";
( run in 0.698 second using v1.01-cache-2.11-cpan-5511b514fd6 )