Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Worker.pm view on Meta::CPAN
# Build a success response
$response = {
jsonrpc => '2.0',
result => $result,
};
}
if (defined $request_id && defined $response) {
# Send back response to caller
$response->{id} = $request_id;
my $json = eval { $JSON->encode( $response ) };
if ($@) {
# Probably response contains blessed references
log_error "Couldn't serialize response as JSON: $@";
$response = Beekeeper::JSONRPC::Error->server_error;
$response->{id} = $request_id;
$json = $JSON->encode( $response );
}
if ($request->{_deflate_response} && length($json) > $request->{_deflate_response}) {
my $compressed_json;
$DEFLATE->deflate(\$json, $compressed_json);
$DEFLATE->flush(\$compressed_json);
$DEFLATE->deflateReset;
$json = $compressed_json;
}
# Request is acknowledged as received just after sending the response. So, if
# the process is abruptly interrupted here, the broker will send the request to
# another worker and it will be executed twice (acking the request just before
# processing it may cause unprocessed requests or undelivered responses)
$self->{_BUS}->publish(
topic => $mqtt_properties->{'response_topic'},
addr => $mqtt_properties->{'addr'},
payload => \$json,
buffer_id => 'response',
);
if (exists $mqtt_properties->{'packet_id'}) {
$self->{_BUS}->puback(
packet_id => $mqtt_properties->{'packet_id'},
buffer_id => 'response',
);
}
else {
# Should not happen (clients must publish with QoS 1)
log_warn "Request published with QoS 0 to topic " . $mqtt_properties->{'topic'};
}
$self->{_BUS}->flush_buffer( buffer_id => 'response' );
}
else {
# Acknowledge requests that don't send a response right now (fire & forget calls
# and requests handled asynchronously), signaling the broker to send more requests
$self->{_BUS}->puback(
packet_id => $mqtt_properties->{'packet_id'},
);
}
}
redo DRAIN if (@{$worker->{task_queue_high}} || @{$worker->{task_queue_low}});
# Execute tasks postponed until task queue is empty
if (exists $worker->{postponed}) {
$_->() foreach @{$worker->{postponed}};
delete $worker->{postponed};
}
}
$_TASK_QUEUE_DEPTH--;
if (defined $timing_tasks) {
$BUSY_TIME += Time::HiRes::time - $BUSY_SINCE;
undef $BUSY_SINCE;
}
$worker->{queued_tasks} = 0;
}
sub __send_response {
my ($self, $request, $result) = @_;
# Send back async response to caller
my ($timing_tasks, $response);
$self->{_WORKER}->{in_progress}--;
# fire & forget calls don't expect responses
return unless defined $request->{id};
unless (defined $BUSY_SINCE) {
$BUSY_SINCE = Time::HiRes::time;
$timing_tasks = 1;
}
if (blessed($result) && $result->isa('Beekeeper::JSONRPC::Error')) {
# Explicit error response
$response = $result;
$self->{_WORKER}->{error_count}++;
}
else {
# Build a success response
$response = {
jsonrpc => '2.0',
result => $result,
};
}
$response->{id} = $request->{id};
local $@;
my $json = eval { $JSON->encode( $response ) };
lib/Beekeeper/Worker.pm view on Meta::CPAN
}
# Make the worker stop accepting the RPC requests of one or more service classes.
#
# Parameters:
#   @methods - fully qualified method specs. Only the wildcard form 'service.*'
#              is accepted (see the "Known limitation" comment below).
#
# Dies, reporting the caller's file and line, when no spec is given, when a spec
# is malformed, or when a spec names an individual method instead of '*'.
# Logs a warning and skips the spec when no callback is registered for the service.
#
# For every accepted spec the shared request topic of the service is unsubscribed.
# The callbacks themselves are removed later: after the unsuback is received, the
# task queue has been drained, and $UNSUBSCRIBE_LINGER seconds have elapsed.
sub stop_accepting_calls {
    my ($self, @methods) = @_;

    # Errors are reported from the caller's point of view
    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    die "No method specified $at" unless @methods;

    foreach my $fq_meth (@methods) {

        $fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
                     \. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        unless ($method eq '*') {
            # Known limitation. As all calls for an entire service class are received
            # through a single MQTT subscription (in order to load balance them), it is
            # not possible to reject a single method. A workaround is to use a different
            # class for each method that needs to be individually rejected.
            die "Cannot stop accepting individual methods, only '$service.*' is allowed $at";
        }

        my $worker    = $self->{_WORKER};
        my $callbacks = $worker->{callbacks};

        # Select only the callbacks of this exact service ("req.<service>.<method>").
        # The separator dots are matched literally and the key must end right after
        # the method name, so stopping 'foo.*' does not also pick up the callbacks
        # of unrelated services like 'foo.bar' or 'foo-x', whose subscriptions
        # remain active.
        my @cb_keys = grep { m/^req\.\Q$service\E\.(?:[\w-]+|\*)\z/ } keys %$callbacks;

        unless (@cb_keys) {
            log_warn "Not previously accepting remote calls '$fq_meth' $at";
            next;
        }

        # Build the shared subscription topic, mapping '.' to '/' and '*' to '#'
        my $local_bus = $self->{_BUS}->{bus_role};
        my $topic = "\$share/BKPR/req/$local_bus/$service";
        $topic =~ tr|.*|/#|;

        # Cannot remove callbacks right now, as new requests could be in flight or be already
        # queued. We must wait for unsubscription completion, and then until the task queue
        # is empty to ensure that all received requests were processed. And even then wait a
        # bit more, as some brokers may send requests *after* unsubscription.

        my $postpone = sub {

            # NOTE(review): when the worker is not shutting down at the time the
            # timer below fires, this begin() is not matched by an end() in this
            # sub. Confirm that this is intended.
            $worker->{stop_cv}->begin;

            my $unsub_tmr; $unsub_tmr = AnyEvent->timer(
                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{$_} foreach @cb_keys;
                    delete $worker->{subscriptions}->{$service};

                    # Release the timer, which is kept alive by this closure
                    undef $unsub_tmr;

                    return unless $worker->{shutting_down};

                    if ($worker->{in_progress} > 0) {

                        # The task queue is empty now, but an asynchronous method handler is
                        # still busy processing some requests received previously. Wait for
                        # these requests to be completed before telling _work_forever to stop

                        my $wait_time = 60;
                        $worker->{stop_cv}->begin;

                        # Poll once per second until no request is in progress
                        # or the wait time is exhausted
                        my $busy_tmr; $busy_tmr = AnyEvent->timer( after => 1, interval => 1, cb => sub {
                            unless ($worker->{in_progress} > 0 && --$wait_time > 0) {
                                undef $busy_tmr;
                                $worker->{stop_cv}->end;
                            }
                        });
                    }

                    # Tell _work_forever to stop
                    $worker->{stop_cv}->end;
                }
            );
        };

        $self->{_BUS}->unsubscribe(
            topic       => $topic,
            on_unsuback => sub {
                my ($success, $prop) = @_;

                log_error "Could not unsubscribe from topic '$topic' $at" unless $success;

                # Run $postpone once the task queue has been emptied
                my $postponed = $worker->{postponed} ||= [];
                push @$postponed, $postpone;

                AnyEvent::postpone { $self->__drain_task_queue };
            }
        );
    }
}
sub __work_forever {
my $self = shift;
# Called by WorkerPool->spawn_worker
eval {
my $worker = $self->{_WORKER};
$worker->{stop_cv} = AnyEvent->condvar;
# Blocks here until stop_working is called
$worker->{stop_cv}->recv;
$self->on_shutdown;
$self->__report_exit;
};
if ($@) {
log_fatal "Worker died: $@";
CORE::exit(255);
}
lib/Beekeeper/Worker.pm view on Meta::CPAN
The value or reference returned by the handler will be sent back to the caller
as response (unless the response is deferred with C<$req-E<gt>async_response>).
The handler is executed within an eval block. If it dies the error will be logged
and the caller will receive a generic error response, but the worker will continue
running.
Example:
package MyWorker;
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';
sub on_startup {
my ($self) = @_;
$self->accept_remote_calls(
'foo.inc' => 'increment', # call $self->increment for requests to 'foo.inc'
'foo.baz' => $coderef, # call $coderef->() for requests to 'foo.baz'
'foo.*' => 'fallback', # call $self->fallback for any other 'foo.*'
);
}
sub increment {
my ($self, $params, $req) = @_;
# $self is a MyWorker object
# $params is a ref to the parameters of the request
# $req is a Beekeeper::JSONRPC::Request object
log_warn "Got a call to foo.inc";
return $params->{number} + 1;
}
Remote calls can be processed concurrently by means of calling C<$req-E<gt>async_response>
to tell Beekeeper that the response for the request will be deferred until it is
available, freeing the worker to accept more requests. Once the response is ready,
it must be sent back to the caller with C<$req-E<gt>send_response>.
This handler processes requests concurrently:
sub increment {
my ($self, $params, $req) = @_;
my $number = $params->{number};
$req->async_response;
my $t; $t = AnyEvent->timer( after => 1, cb => sub {
undef $t;
$req->send_response( $number + 1 );
});
}
Note that callback closures will not be executed in Beekeeper scope but in the event loop
one, so uncaught exceptions in these closures will cause the worker to die and be respawned.
Asynchronous method handlers use system resources more efficiently, but are significantly
harder to write and debug.
=head3 stop_accepting_notifications ( $method, ... )
Makes a worker stop accepting the specified notifications from the message bus.
C<$method> must be one of the strings used previously in C<accept_notifications>.
=head3 stop_accepting_calls ( $method, ... )
Makes a worker stop accepting the specified RPC requests from the message bus.
C<$method> must be one of the strings used previously in C<accept_remote_calls>.
=head3 stop_working
Makes a worker stop accepting new RPC requests, process all requests already
received, execute C<on_shutdown> method, and then exit.
This is the default signal handler for C<TERM> signal.
Please note that it is not possible to stop worker pools by calling this method, as
WorkerPool will immediately respawn another worker after the current one exits.
=head1 SEE ALSO
L<Beekeeper::Client>, L<Beekeeper::Config>, L<Beekeeper::Logger>, L<Beekeeper::WorkerPool>.
=head1 AUTHOR
José Micó, C<jose.mico@gmail.com>
=head1 COPYRIGHT AND LICENSE
Copyright 2015-2023 José Micó.
This is free software; you can redistribute it and/or modify it under the same
terms as the Perl 5 programming language itself.
This software is distributed in the hope that it will be useful, but it is
provided "as is" and without any express or implied warranties. For details,
see the full text of the license in the file LICENSE.
=cut
( run in 2.156 seconds using v1.01-cache-2.11-cpan-5837b0d9d2c )