view release on metacpan or search on metacpan
- Add instructions for running pools as systemd services.
- Update install instructions for Mosquitto broker.
- Honor PATH and PERL5LIB environment variables.
- Add links to live examples.
0.08 2021-07-16
- Introduce HTML dashboard.
- Reorganize worker extensions.
0.07 2021-07-02
- Add support for asynchronous method handlers in workers.
- Introduce scraper example demonstrating asynchronous workers.
- Track and report the number of errors per second of workers.
- Revised documentation and performance estimation.
- Fix "Insecure dependency in connect" error.
0.06 2021-06-18
- Show errors from perspective of caller.
- Fix example configuration for Mosquitto broker.
0.05 2021-06-09
- Fix tests failing on smoke testers.
- The broker is an MQTT messaging server, like Mosquitto, HiveMQ or EMQ X.
- The messaging protocol is MQTT 5 (see the [specification](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html)).
- The RPC protocol is JSON-RPC 2.0 (see the [specification](https://www.jsonrpc.org/specification)).
- There is no message persistence in the broker, it just passes on messages.
- There is no routing logic defined in the broker.
- Synchronous and asynchronous workers or clients can be integrated seamlessly.
- Efficient multicast and unicast push notifications.
- Inherent load balancing.
**What does this framework provides:**
- [`Beekeeper::Worker`](https://metacpan.org/pod/Beekeeper::Worker), to create service workers.
- [`bkpr-restart`](https://metacpan.org/dist/Beekeeper/view/bin/bkpr-restart) gracefully restarts
worker pools.
## Performance
Beekeeper is pretty lightweight for being pure Perl, but the performance depends mostly on the broker
performance, particularly on the broker introduced latency. The following are conservative performance
estimations:
- A `call_remote` synchronous call to a remote method involves 4 MQTT messages and takes 0.7 ms.
This limits a client to make a maximum of 1400 synchronous calls per second. The CPU load will be
very low, as the client spends most of the time just waiting for the response.
- A `call_remote_async` asynchronous call to a remote method also involves 4 MQTT messages, but it
can sustain a rate of 8000 calls per second because it does not block waiting for responses.
- Launching a remote task with `fire_remote` involves 1 MQTT message and takes 0.1 ms. This implies
a maximum of 10000 calls per second.
- Sending a notification with `send_notification` involves 1 MQTT message and takes 0.1 ms. A worker
can emit more than 10000 notifications per second, up to 15000 if these are smaller than 1 KiB.
- A worker processing remote calls can handle a maximum of 4000 requests per second. It will be I/O
bound, the CPU load will be low for simple tasks, as the worker will spend a significant chunk of
## Examples
This distribution includes some examples that can be run out of the box using an internal `ToyBroker`
(so no install of a proper broker is needed):
[examples/basic](./examples/basic) is a barebones example of the usage of Beekeper.
[examples/flood](./examples/flood) allows to estimate the performance of a Beekeper setup.
[examples/scraper](./examples/scraper) demonstrates asynchronous workers and clients.
[examples/websocket](./examples/websocket) uses a service from a browser using WebSockets.
([live demo](https://beekeeper.net.ar/examples/calculator.html))
[examples/chat](./examples/chat) implements a real world setup with isolated buses and redundancy.
([live demo](https://beekeeper.net.ar/examples/chat.html))
[examples/dashboard](./examples/dashboard) is an HTML dashboard for Beekeeper projects.
([live demo](https://beekeeper.net.ar/dashboard/))
bin/bkpr-top view on Meta::CPAN
Percentage of CPU load (100 indicates a full utilization of one core thread).
For example, supposing that there are 4 core threads available, a service that
shows a C<cpu> load of 200 is using half of the CPU resources.
=item load
Percentage of busy time (100% indicates that there are no idle workers).
Note that workers can have a high load with very little CPU usage when being
blocked by synchronous operations (like slow SQL queries).
For example, supposing that there are 10 workers running, a service that shows a
C<load> of 50% is working at half capacity. Spawning 10 additional workers will
lower the load to 25%.
Due to inaccuracies of measurement the actual maximum may be slightly below 100%.
=back
Unless the option C<--host> is passed the values shown are the aggregate of all
examples/scraper/README.md view on Meta::CPAN
## Scraper example
This example demonstrates asynchronous workers and clients.
To run this example start the worker pool:
```
cd beekeper/examples/scraper
source setup.sh
./run.sh
```
Then use the command line client:
```
examples/scraper/client.pl view on Meta::CPAN
if ($opt_help || $no_args) {
print $Help;
exit;
}
if (!$opt_async) {
foreach my $url (@urls) {
# Using the synchronous client: urls will be processed one after another
my $response = MyApp::Service::Scraper->get_title( $url );
if ($response->success) {
my $title = $response->result;
print qq'\n$url\n"$title"\n';
}
else {
print "\n$url\n". $response->code ." ". $response->message ."\n";
}
}
}
else {
my $cv = AnyEvent->condvar;
foreach my $url (@urls) {
# Using the asynchronous client: urls will be processed concurrently
$cv->begin;
MyApp::Service::Scraper->get_title_async( $url, sub {
my ($response) = @_;
if ($response->success) {
my $title = $response->result;
print qq'\n$url\n"$title"\n';
}
examples/scraper/lib/MyApp/Service/Scraper.pm view on Meta::CPAN
params => { url => $url },
raise_error => 0,
);
return $response;
}
sub get_title_async {
my ($self, $url, $on_complete) = @_;
# This will return immediately. The response will be received asynchronously
my $client = Beekeeper::Client->instance;
my $request = $client->call_remote_async(
method => 'myapp.scraper.get_title',
params => { url => $url },
on_success => $on_complete,
on_error => $on_complete,
);
lib/Beekeeper.pm view on Meta::CPAN
- The broker is an MQTT messaging server, like Mosquitto, HiveMQ or EMQ X.
- The messaging protocol is MQTT 5 (see the L<specification|https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html>).
- The RPC protocol is JSON-RPC 2.0 (see the L<specification|https://www.jsonrpc.org/specification>).
- There is no message persistence in the broker, it just passes on messages.
- There is no routing logic defined in the broker.
- Synchronous and asynchronous workers or clients can be integrated seamlessly.
- Efficient multicast and unicast push notifications.
- Inherent load balancing.
B<What does this framework provides:>
- L<Beekeeper::Worker>, to create service workers.
lib/Beekeeper.pm view on Meta::CPAN
- L<bkpr-restart> gracefully restarts worker pools.
=head1 Performance
Beekeeper is pretty lightweight for being pure Perl, but the performance depends mostly on the broker
performance, particularly on the broker introduced latency. The following are conservative performance
estimations:
- A C<call_remote> synchronous call to a remote method involves 4 MQTT messages and takes 0.7 ms.
This limits a client to make a maximum of 1400 synchronous calls per second. The CPU load will be
very low, as the client spends most of the time just waiting for the response.
- A C<call_remote_async> asynchronous call to a remote method also involves 4 MQTT messages, but it
can sustain a rate of 8000 calls per second because it does not block waiting for responses.
- Launching a remote task with C<fire_remote> involves 1 MQTT message and takes 0.1 ms. This implies
a maximum of 10000 calls per second.
- Sending a notification with C<send_notification> involves 1 MQTT message and takes 0.1 ms. A worker
can emit more than 10000 notifications per second, up to 15000 if these are smaller than 1 KiB.
- A worker processing remote calls can handle a maximum of 4000 requests per second. It will be I/O
bound, the CPU load will be low for simple tasks, as the worker will spend a significant chunk of
lib/Beekeeper.pm view on Meta::CPAN
=head1 Examples
This distribution includes some examples that can be run out of the box using an internal C<ToyBroker>
(so no install of a proper broker is needed):
C<examples/basic> is a barebones example of the usage of Beekeper.
C<examples/flood> allows to estimate the performance of a Beekeper setup.
C<examples/scraper> demonstrates asynchronous workers and clients.
C<examples/websocket> uses a service from a browser using WebSockets.
C<examples/chat> implements a real world setup with isolated buses and redundancy.
C<examples/dashboard> is an HTML dashboard for Beekeeper projects.
=head1 SEE ALSO
lib/Beekeeper/Client.pm view on Meta::CPAN
Please note that callbacks will not be executed timely if AnyEvent loop is not running.
=head3 stop_accepting_notifications ( $method, ... )
Makes a client stop accepting the specified notifications from the message bus.
C<$method> must be one of the strings used previously in C<accept_notifications>.
=head3 call_remote ( %args )
Makes a synchronous RPC call to a service worker through the message bus.
It will wait (in the event loop) until a response is received, wich will be either
a L<Beekeeper::JSONRPC::Response> object or a L<Beekeeper::JSONRPC::Error>.
On error it will die unless C<raise_error> option is set to false.
This method accepts the following parameters:
=over
lib/Beekeeper/Client.pm view on Meta::CPAN
=item raise_error
If set to true (the default) dies with the received error message when a call returns
an error response. If set to false returns a L<Beekeeper::JSONRPC::Error> instead.
=back
=head3 call_remote_async ( %args )
Makes an asynchronous RPC call to a service worker through the message bus.
It returns immediately a L<Beekeeper::JSONRPC::Request> object which, once completed,
will have a defined C<response>.
This method accepts parameters C<method>, C<params>, C<address> and C<timeout>
the same as C<call_remote>. Additionally two callbacks can be specified:
=over
=item on_success
lib/Beekeeper/MQTT.pm view on Meta::CPAN
1;
__END__
=pod
=encoding utf8
=head1 NAME
Beekeeper::MQTT - Asynchronous MQTT 5.0 client
=head1 VERSION
Version 0.09
=head1 SYNOPSIS
my $mqtt = Beekeeper::MQTT->new(
host => 'localhost',
username => 'guest',
lib/Beekeeper/Service/LogTail.pm view on Meta::CPAN
C<pool>: Regex that applies to worker pool.
C<service>: Regex that applies to service name.
C<message>: Regex that applies to error messages.
C<after>: Return only entries generated after given timestamp.
=head3 tail_async ( %filters, on_success => $cb, on_error => $cb )
Asynchronous version of C<tail> method.
Callbacks C<on_success> and C<on_error> must be coderefs and will receive respectively
L<Beekeeper::JSONRPC::Response> and L<Beekeeper::JSONRPC::Error> objects as arguments.
=head1 SEE ALSO
L<bkpr-log>, L<Beekeeper::Service::LogTail::Worker>.
=head1 AUTHOR
lib/Beekeeper/Service/Supervisor.pm view on Meta::CPAN
=item cpu
Percentage of CPU load (100 indicates a full utilization of one core thread).
=item load
Percentage of busy time (100 indicates no idle time).
Note that workers can have a high load with very little CPU usage when being
blocked by synchronous operations (like slow SQL queries, for example).
Due to inaccuracies of measurement the actual maximum may be slightly below 100.
=back
=head1 METHODS
=head3 get_services_status ( %filters )
Returns the aggregate performance metrics of all active services.
lib/Beekeeper/Service/Supervisor.pm view on Meta::CPAN
Services can be filtered by C<host>, C<pool> and C<class>.
=head3 get_workers_status ( %filters )
Returns the individual performance metrics of every worker of all active services.
Services can be filtered by C<host>, C<pool> and C<class>.
=head3 get_services_status_async ( %filters, on_success => $cb, on_error => $cb )
Asynchronous version of C<get_services_status> method.
Callbacks C<on_success> and C<on_error> must be coderefs and will receive respectively
L<Beekeeper::JSONRPC::Response> and L<Beekeeper::JSONRPC::Error> objects as arguments.
=head3 get_workers_status_async ( %filters, on_success => $cb, on_error => $cb )
Asynchronous version of C<get_workers_status> method.
=head1 SEE ALSO
L<bkpr-top>, L<bkpr-restart>, L<Beekeeper::Service::Supervisor::Worker>.
=head1 AUTHOR
José Micó, C<jose.mico@gmail.com>
=head1 COPYRIGHT AND LICENSE
lib/Beekeeper/Service/Supervisor/Worker.pm view on Meta::CPAN
=item cpu
Percentage of CPU load (100 indicates a full utilization of one core thread).
=item load
Percentage of busy time (100 indicates no idle time).
Note that workers can have a high load with very little CPU usage when being
blocked by synchronous operations (like slow SQL queries, for example).
Due to inaccuracies of measurement the actual maximum may be slightly below 100.
=back
=head1 SEE ALSO
L<Beekeeper::Service::Supervisor>, which is the interface to the RPC methods
exposed by this worker class.
lib/Beekeeper/Worker.pm view on Meta::CPAN
else {
# Should not happen (clients must publish with QoS 1)
log_warn "Request published with QoS 0 to topic " . $mqtt_properties->{'topic'};
}
$self->{_BUS}->flush_buffer( buffer_id => 'response' );
}
else {
# Acknowledge requests that doesn't send a response right now (fire & forget calls
# and requests handled asynchronously), signaling the broker to send more requests
$self->{_BUS}->puback(
packet_id => $mqtt_properties->{'packet_id'},
);
}
}
redo DRAIN if (@{$worker->{task_queue_high}} || @{$worker->{task_queue_low}});
# Execute tasks postponed until task queue is empty
lib/Beekeeper/Worker.pm view on Meta::CPAN
after => $UNSUBSCRIBE_LINGER, cb => sub {
delete $worker->{callbacks}->{$_} foreach @cb_keys;
delete $worker->{subscriptions}->{$service};
undef $unsub_tmr;
return unless $worker->{shutting_down};
if ($worker->{in_progress} > 0) {
# The task queue is empty now, but an asynchronous method handler is
# still busy processing some requests received previously. Wait for
# these requests to be completed before telling _work_forever to stop
my $wait_time = 60;
$worker->{stop_cv}->begin;
my $busy_tmr; $busy_tmr = AnyEvent->timer( after => 1, interval => 1, cb => sub {
unless ($worker->{in_progress} > 0 && --$wait_time > 0) {
undef $busy_tmr;
$worker->{stop_cv}->end;
lib/Beekeeper/Worker.pm view on Meta::CPAN
my $t; $t = AnyEvent->timer( after => 1, cb => sub {
undef $t;
$req->send_response( $number + 1 );
});
}
Note that callback closures will not be executed in Beekeeper scope but in the event loop
one, so uncatched exceptions in these closures will cause the worker to die and be respawn.
Asynchronous method handlers use system resources more efficiently, but are significantly
harder to write and debug.
=head3 stop_accepting_notifications ( $method, ... )
Makes a worker stop accepting the specified notifications from the message bus.
C<$method> must be one of the strings used previously in C<accept_notifications>.
=head3 stop_accepting_calls ( $method, ... )
lib/Beekeeper/Worker/Extension/RemoteSession.pm view on Meta::CPAN
=head3 unbind_remote_address ( address => $address )
Clear the authorization data and address assignment of all remote clients which were
assigned the given address.
This is intended to implement "logout from all devices" functionality.
=head3 bind_remote_session_async ( address => $address, on_success => $cb, on_error => $cb )
Asynchronous version of C<bind_remote_session> method.
Callbacks C<on_success> and C<on_error> must be coderefs and will receive respectively
L<Beekeeper::JSONRPC::Response> and L<Beekeeper::JSONRPC::Error> objects as arguments.
=head3 unbind_remote_session_async ( on_success => $cb, on_error => $cb )
Asynchronous version of C<unbind_remote_session> method.
=head3 unbind_remote_address_async ( address => $address, on_success => $cb, on_error => $cb )
Asynchronous version of C<unbind_remote_address> method.
=head1 SEE ALSO
L<Beekeeper::Service::Router::Worker>
=head1 AUTHOR
José Micó, C<jose.mico@gmail.com>
=head1 COPYRIGHT AND LICENSE
t/lib/Tests/Recursion.pm view on Meta::CPAN
for (my $i = 3; $i <= 4; $i++) {
my $resp = $cli->call_remote(
method => 'test.fib2',
params => $i,
);
is( $resp->result, $fib[$i], "fib($i)");
}
# Asynchronous workers are only limited by available memory
for (my $i = 3; $i <= 10; $i++) {
my $resp = $cli->call_remote(
method => 'test.fib3',
params => $i,
);
is( $resp->result, $fib[$i], "fib($i)");
}
# Asynchronous workers are only limited by available memory
for (my $i = 3; $i <= 10; $i++) {
my $resp = $cli->call_remote(
method => 'test.fib4',
params => $i,
);
is( $resp->result, $fib[$i], "fib($i)");
}
}