Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Client.pm view on Meta::CPAN
};
# Reuse or create a private topic which will receive responses
$send_args{'response_topic'} = $client->{response_topic} ||
$self->__create_response_topic;
unless ($FIRE_FORGET) {
# Assign a unique request id (unique only for this client)
$req_id = $client->{correlation_id}++;
$req->{'id'} = $req_id;
}
my $json = encode_json($req);
if (exists $args{'buffer_id'}) {
$send_args{'buffer_id'} = $args{'buffer_id'};
}
# Send request
$self->{_BUS}->publish(
payload => \$json,
qos => 1,
%send_args,
);
if ($FIRE_FORGET) {
# Nothing else to do
return;
}
elsif ($SYNCHRONOUS) {
$req->{_raise_error} = (defined $raise_error) ? $raise_error : 1;
# Wait until a response is received in the reply queue
$req->{_waiting_response} = AnyEvent->condvar;
$req->{_waiting_response}->begin;
}
else {
$req->{_on_success_cb} = $args{'on_success'};
$req->{_on_error_cb} = $args{'on_error'};
if ($raise_error && !$req->{_on_error_cb}) {
$req->{_on_error_cb} = sub {
my $errmsg = $_[0]->code . " " . $_[0]->message;
croak "Call to '$service.$method' failed: $errmsg";
};
}
# Use shared cv for all requests
if (!$client->{async_cv} || $client->{async_cv}->ready) {
$client->{async_cv} = AnyEvent->condvar;
}
$req->{_waiting_response} = $client->{async_cv};
$req->{_waiting_response}->begin;
}
$client->{in_progress}->{$req_id} = $req;
# Ensure that timeout is set properly when the event loop was blocked
if ($__now != CORE::time) { $__now = CORE::time; AnyEvent->now_update }
# Request timeout timer
my $timeout = $args{'timeout'} || REQ_TIMEOUT;
$req->{_timeout} = AnyEvent->timer( after => $timeout, cb => sub {
my $req = delete $client->{in_progress}->{$req_id};
$req->{_response} = Beekeeper::JSONRPC::Error->request_timeout;
$req->{_on_error_cb}->($req->{_response}) if $req->{_on_error_cb};
$req->{_waiting_response}->end;
});
bless $req, 'Beekeeper::JSONRPC::Request';
return $req;
}
sub __create_response_topic {
my $self = shift;
my $client = $self->{_CLIENT};
my ($file, $line) = (caller(2))[1,2];
my $at = "at $file line $line\n";
# Subscribe to an exclusive topic for receiving RPC responses
my $response_topic = 'priv/' . $self->{_BUS}->{client_id};
$client->{response_topic} = $response_topic;
$self->{_BUS}->subscribe(
topic => $response_topic,
maximum_qos => 0,
on_publish => sub {
my ($payload_ref, $mqtt_properties) = @_;
my $resp;
local $@;
eval {
if (substr($$payload_ref,0,1) eq "\x78") {
my $decompressed_json;
$INFLATE->inflate($payload_ref, $decompressed_json);
$INFLATE->inflateReset();
$resp = decode_json($decompressed_json);
}
else {
$resp = decode_json($$payload_ref);
}
};
unless (ref $resp eq 'HASH' && $resp->{jsonrpc} eq '2.0') {
warn "Received invalid JSON-RPC 2.0 message $at";
return;
}
if (exists $resp->{'id'}) {
# Response of an RPC request
my $req_id = $resp->{'id'};
my $req = delete $client->{in_progress}->{$req_id};
( run in 0.960 second using v1.01-cache-2.11-cpan-e1769b4cff6 )