Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Worker.pm view on Meta::CPAN
my $method = $request->{method};
unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
log_error "Received notification with invalid method '$method'";
return;
}
my $cb = $worker->{callbacks}->{"msg.$1.$2"} ||
$worker->{callbacks}->{"msg.$1.*"};
local $client->{caller_id} = $mqtt_properties->{'clid'};
local $client->{caller_addr} = $mqtt_properties->{'addr'};
local $client->{auth_data} = $mqtt_properties->{'auth'};
unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
log_error "Notification '$method' was not authorized";
return;
}
unless ($cb) {
log_error "No handler found for received notification '$method'";
return;
}
$cb->($self, $request->{params}, $request);
};
if ($@) {
# Got an exception while processing message
log_error $@;
$worker->{error_count}++;
}
}
if ($task = shift @{$worker->{task_queue_low}}) {
## RPC Call
my ($payload_ref, $mqtt_properties) = @$task;
$worker->{call_count}++;
my ($request, $request_id, $result, $response);
$result = eval {
$request = decode_json($$payload_ref);
unless (ref $request eq 'HASH' && $request->{jsonrpc} eq '2.0') {
log_error "Received invalid JSON-RPC 2.0 request";
die Beekeeper::JSONRPC::Error->invalid_request;
}
$request_id = $request->{id};
my $method = $request->{method};
bless $request, 'Beekeeper::JSONRPC::Request';
$request->{_mqtt_properties} = $mqtt_properties;
unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
log_error "Received request with invalid method '$method'";
die Beekeeper::JSONRPC::Error->method_not_found;
}
my $cb = $worker->{callbacks}->{"req.$1.$2"} ||
$worker->{callbacks}->{"req.$1.*"};
local $client->{caller_id} = $mqtt_properties->{'clid'};
local $client->{caller_addr} = $mqtt_properties->{'addr'};
local $client->{auth_data} = $mqtt_properties->{'auth'};
unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
log_error "Request '$method' was not authorized";
die Beekeeper::JSONRPC::Error->request_not_authorized;
}
unless ($cb) {
log_error "No handler found for received request '$method'";
die Beekeeper::JSONRPC::Error->method_not_found;
}
# Execute method handler
$cb->($self, $request->{params}, $request);
};
if ($@) {
# Got an exception while executing method handler
if (blessed($@) && $@->isa('Beekeeper::JSONRPC::Error')) {
# Handled exception
$response = $@;
$worker->{error_count}++;
}
else {
# Unhandled exception
log_error $@;
$worker->{error_count}++;
$response = Beekeeper::JSONRPC::Error->server_error;
# Sending exact error to caller is very handy, but it is also a security risk
$response->{error}->{data} = $@ if $worker->{debug};
$worker->{error_count}++;
}
}
elsif (blessed($result) && $result->isa('Beekeeper::JSONRPC::Error')) {
# Explicit error response
$response = $result;
$worker->{error_count}++;
}
elsif ($request->{_async_response}) {
# Response was deferred and will be sent later
$worker->{in_progress}++;
$request->{_worker} = $self;
}
else {
# Build a success response
$response = {
jsonrpc => '2.0',
result => $result,
};
}
if (defined $request_id && defined $response) {
# Send back response to caller
$response->{id} = $request_id;
my $json = eval { $JSON->encode( $response ) };
if ($@) {
# Probably response contains blessed references
log_error "Couldn't serialize response as JSON: $@";
$response = Beekeeper::JSONRPC::Error->server_error;
$response->{id} = $request_id;
$json = $JSON->encode( $response );
}
if ($request->{_deflate_response} && length($json) > $request->{_deflate_response}) {
my $compressed_json;
$DEFLATE->deflate(\$json, $compressed_json);
( run in 0.786 second using v1.01-cache-2.11-cpan-99c4e6809bf )