Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Client.pm view on Meta::CPAN
payload => \$json,
qos => 1,
%send_args,
);
if ($FIRE_FORGET) {
# Nothing else to do
return;
}
elsif ($SYNCHRONOUS) {
$req->{_raise_error} = (defined $raise_error) ? $raise_error : 1;
# Wait until a response is received in the reply queue
$req->{_waiting_response} = AnyEvent->condvar;
$req->{_waiting_response}->begin;
}
else {
$req->{_on_success_cb} = $args{'on_success'};
$req->{_on_error_cb} = $args{'on_error'};
if ($raise_error && !$req->{_on_error_cb}) {
$req->{_on_error_cb} = sub {
my $errmsg = $_[0]->code . " " . $_[0]->message;
croak "Call to '$service.$method' failed: $errmsg";
};
}
# Use shared cv for all requests
if (!$client->{async_cv} || $client->{async_cv}->ready) {
$client->{async_cv} = AnyEvent->condvar;
}
$req->{_waiting_response} = $client->{async_cv};
$req->{_waiting_response}->begin;
}
$client->{in_progress}->{$req_id} = $req;
# Ensure that timeout is set properly when the event loop was blocked
if ($__now != CORE::time) { $__now = CORE::time; AnyEvent->now_update }
# Request timeout timer
my $timeout = $args{'timeout'} || REQ_TIMEOUT;
$req->{_timeout} = AnyEvent->timer( after => $timeout, cb => sub {
my $req = delete $client->{in_progress}->{$req_id};
$req->{_response} = Beekeeper::JSONRPC::Error->request_timeout;
$req->{_on_error_cb}->($req->{_response}) if $req->{_on_error_cb};
$req->{_waiting_response}->end;
});
bless $req, 'Beekeeper::JSONRPC::Request';
return $req;
}
sub __create_response_topic {
my $self = shift;
my $client = $self->{_CLIENT};
my ($file, $line) = (caller(2))[1,2];
my $at = "at $file line $line\n";
# Subscribe to an exclusive topic for receiving RPC responses
my $response_topic = 'priv/' . $self->{_BUS}->{client_id};
$client->{response_topic} = $response_topic;
$self->{_BUS}->subscribe(
topic => $response_topic,
maximum_qos => 0,
on_publish => sub {
my ($payload_ref, $mqtt_properties) = @_;
my $resp;
local $@;
eval {
if (substr($$payload_ref,0,1) eq "\x78") {
my $decompressed_json;
$INFLATE->inflate($payload_ref, $decompressed_json);
$INFLATE->inflateReset();
$resp = decode_json($decompressed_json);
}
else {
$resp = decode_json($$payload_ref);
}
};
unless (ref $resp eq 'HASH' && $resp->{jsonrpc} eq '2.0') {
warn "Received invalid JSON-RPC 2.0 message $at";
return;
}
if (exists $resp->{'id'}) {
# Response of an RPC request
my $req_id = $resp->{'id'};
my $req = delete $client->{in_progress}->{$req_id};
# Ignore unexpected responses
return unless $req;
# Cancel request timeout
delete $req->{_timeout};
if (exists $resp->{'result'}) {
# Success response
$req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Response';
$req->{_on_success_cb}->($resp) if $req->{_on_success_cb};
}
else {
# Error response
$req->{_response} = bless $resp, 'Beekeeper::JSONRPC::Error';
$req->{_on_error_cb}->($resp) if $req->{_on_error_cb};
}
$req->{_waiting_response}->end;
}
else {
( run in 1.931 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )