Beekeeper


Changes

  - Add instructions for running pools as systemd services.
  - Update install instructions for Mosquitto broker.
  - Honor PATH and PERL5LIB environment variables.
  - Add links to live examples.

0.08    2021-07-16
  - Introduce HTML dashboard.
  - Reorganize worker extensions.

0.07    2021-07-02
  - Add support for asynchronous method handlers in workers.
  - Introduce scraper example demonstrating asynchronous workers.
  - Track and report the number of errors per second of workers.
  - Revised documentation and performance estimation.
  - Fix "Insecure dependency in connect" error.

0.06    2021-06-18
  - Show errors from perspective of caller.
  - Fix example configuration for Mosquitto broker.

0.05    2021-06-09
  - Fix tests failing on smoke testers.

README.md

- The broker is an MQTT messaging server, like Mosquitto, HiveMQ or EMQ X.

- The messaging protocol is MQTT 5 (see the [specification](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html)).

- The RPC protocol is JSON-RPC 2.0 (see the [specification](https://www.jsonrpc.org/specification)).

- There is no message persistence in the broker; it just passes on messages.

- There is no routing logic defined in the broker.

- Synchronous and asynchronous workers or clients can be integrated seamlessly.

- Efficient multicast and unicast push notifications.

- Inherent load balancing.


**What does this framework provide:**

- [`Beekeeper::Worker`](https://metacpan.org/pod/Beekeeper::Worker), to create service workers.

README.md

- [`bkpr-restart`](https://metacpan.org/dist/Beekeeper/view/bin/bkpr-restart) gracefully restarts 
  worker pools.


## Performance

Beekeeper is pretty lightweight for being pure Perl, but its performance depends mostly on the broker's
performance, particularly on the latency the broker introduces. The following are conservative performance
estimates:

- A `call_remote` synchronous call to a remote method involves 4 MQTT messages and takes 0.7 ms.
  This limits a client to a maximum of about 1400 synchronous calls per second. The CPU load will be
  very low, as the client spends most of the time just waiting for the response.

- A `call_remote_async` asynchronous call to a remote method also involves 4 MQTT messages, but it
  can sustain a rate of 8000 calls per second because it does not block waiting for responses.

- Launching a remote task with `fire_remote` involves 1 MQTT message and takes 0.1 ms. This implies
  a maximum of 10000 calls per second.

- Sending a notification with `send_notification` involves 1 MQTT message and takes 0.1 ms. A worker
  can emit more than 10000 notifications per second, up to 15000 if these are smaller than 1 KiB.

- A worker processing remote calls can handle a maximum of 4000 requests per second. It will be I/O
  bound, the CPU load will be low for simple tasks, as the worker will spend a significant chunk of

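The throughput limits quoted above follow directly from the per-call latency. A rough sketch of that arithmetic, where the helper name `max_calls_per_second` is made up for illustration and the latencies are the estimates given in this section, not new measurements:

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Beekeeper): upper bound of sequential
# calls per second for a client that blocks $latency_ms on every call
sub max_calls_per_second {
    my ($latency_ms) = @_;
    return int( 1000 / $latency_ms );
}

printf "call_remote: at most %d calls/s\n", max_calls_per_second(0.7);
printf "fire_remote: at most %d calls/s\n", max_calls_per_second(0.1);
```

At 0.7 ms per call the bound is 1428 calls per second, which this section rounds down to 1400. Asynchronous calls are not subject to this bound because the client does not wait for each response before sending the next request.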
README.md


## Examples

This distribution includes some examples that can be run out of the box using an internal `ToyBroker`
(so no install of a proper broker is needed):

[examples/basic](./examples/basic) is a barebones example of the usage of Beekeeper.

[examples/flood](./examples/flood) allows estimating the performance of a Beekeeper setup.

[examples/scraper](./examples/scraper) demonstrates asynchronous workers and clients.

[examples/websocket](./examples/websocket) uses a service from a browser using WebSockets. 
([live demo](https://beekeeper.net.ar/examples/calculator.html))

[examples/chat](./examples/chat) implements a real world setup with isolated buses and redundancy. 
([live demo](https://beekeeper.net.ar/examples/chat.html))

[examples/dashboard](./examples/dashboard) is an HTML dashboard for Beekeeper projects. 
([live demo](https://beekeeper.net.ar/dashboard/))

bin/bkpr-top

Percentage of CPU load (100 indicates a full utilization of one core thread).

For example, supposing that there are 4 core threads available, a service that
shows a C<cpu> load of 200 is using half of the CPU resources.

=item load

Percentage of busy time (100% indicates that there are no idle workers).

Note that workers can have a high load with very little CPU usage when being
blocked by synchronous operations (like slow SQL queries).

For example, supposing that there are 10 workers running, a service that shows a 
C<load> of 50% is working at half capacity. Spawning 10 additional workers will 
lower the load to 25%.

Due to inaccuracies of measurement the actual maximum may be slightly below 100%.
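
The worker count example above can be checked with a little arithmetic. This is only a sketch: the helper C<projected_load> is hypothetical, and it assumes that the amount of work stays constant and is spread evenly among workers.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of bkpr-top): expected load percentage after
# resizing a pool, assuming constant work spread evenly among workers
sub projected_load {
    my ($current_workers, $current_load, $new_workers) = @_;

    # Number of workers that are busy on average
    my $busy_workers = $current_workers * $current_load / 100;

    return 100 * $busy_workers / $new_workers;
}

# 10 workers at 50% load, after spawning 10 additional workers
printf "%.0f%%\n", projected_load( 10, 50, 20 );
```

Ten workers at 50% load are equivalent to five workers being busy all the time, and five busy workers out of twenty is 25%.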

=back

Unless the option C<--host> is passed the values shown are the aggregate of all

examples/scraper/README.md

## Scraper example

This example demonstrates asynchronous workers and clients. 


To run this example start the worker pool:
```
cd beekeeper/examples/scraper
source setup.sh
./run.sh
```
Then use the command line client:
```

examples/scraper/client.pl


if ($opt_help || $no_args) {
    print $Help;
    exit;
}

if (!$opt_async) {

    foreach my $url (@urls) {

        # Using the synchronous client: urls will be processed one after another

        my $response = MyApp::Service::Scraper->get_title( $url );

        if ($response->success) {
            my $title = $response->result;
            print qq'\n$url\n"$title"\n';
        }
        else {
            print "\n$url\n". $response->code ." ". $response->message ."\n";
        }
    }
}
else {

    my $cv = AnyEvent->condvar;

    foreach my $url (@urls) {

        # Using the asynchronous client: urls will be processed concurrently

        $cv->begin;

        MyApp::Service::Scraper->get_title_async( $url, sub {
            my ($response) = @_;

            if ($response->success) {
                my $title = $response->result;
                print qq'\n$url\n"$title"\n';
            }

examples/scraper/lib/MyApp/Service/Scraper.pm

        params      => { url => $url },
        raise_error => 0,
    );

    return $response;
}

sub get_title_async {
    my ($self, $url, $on_complete) = @_;

    # This will return immediately. The response will be received asynchronously

    my $client = Beekeeper::Client->instance;

    my $request = $client->call_remote_async(
        method     => 'myapp.scraper.get_title',
        params     => { url => $url },
        on_success => $on_complete,
        on_error   => $on_complete,
    );

lib/Beekeeper.pm

- The broker is an MQTT messaging server, like Mosquitto, HiveMQ or EMQ X.

- The messaging protocol is MQTT 5 (see the L<specification|https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html>).

- The RPC protocol is JSON-RPC 2.0 (see the L<specification|https://www.jsonrpc.org/specification>).

- There is no message persistence in the broker; it just passes on messages.

- There is no routing logic defined in the broker.

- Synchronous and asynchronous workers or clients can be integrated seamlessly.

- Efficient multicast and unicast push notifications.

- Inherent load balancing.


B<What does this framework provide:>

- L<Beekeeper::Worker>, to create service workers.

lib/Beekeeper.pm


- L<bkpr-restart> gracefully restarts worker pools.


=head1 Performance

Beekeeper is pretty lightweight for being pure Perl, but its performance depends mostly on the broker's
performance, particularly on the latency the broker introduces. The following are conservative performance
estimates:

- A C<call_remote> synchronous call to a remote method involves 4 MQTT messages and takes 0.7 ms.
  This limits a client to a maximum of about 1400 synchronous calls per second. The CPU load will be
  very low, as the client spends most of the time just waiting for the response.

- A C<call_remote_async> asynchronous call to a remote method also involves 4 MQTT messages, but it
  can sustain a rate of 8000 calls per second because it does not block waiting for responses.

- Launching a remote task with C<fire_remote> involves 1 MQTT message and takes 0.1 ms. This implies
  a maximum of 10000 calls per second.

- Sending a notification with C<send_notification> involves 1 MQTT message and takes 0.1 ms. A worker
  can emit more than 10000 notifications per second, up to 15000 if these are smaller than 1 KiB.

- A worker processing remote calls can handle a maximum of 4000 requests per second. It will be I/O
  bound, the CPU load will be low for simple tasks, as the worker will spend a significant chunk of

lib/Beekeeper.pm


=head1 Examples

This distribution includes some examples that can be run out of the box using an internal C<ToyBroker>
(so no install of a proper broker is needed):

C<examples/basic> is a barebones example of the usage of Beekeeper.

C<examples/flood> allows estimating the performance of a Beekeeper setup.

C<examples/scraper> demonstrates asynchronous workers and clients.

C<examples/websocket> uses a service from a browser using WebSockets.

C<examples/chat> implements a real world setup with isolated buses and redundancy.

C<examples/dashboard> is an HTML dashboard for Beekeeper projects.


=head1 SEE ALSO

lib/Beekeeper/Client.pm

Please note that callbacks will not be executed in a timely manner if the AnyEvent loop is not running.

=head3 stop_accepting_notifications ( $method, ... )

Makes a client stop accepting the specified notifications from the message bus.

C<$method> must be one of the strings used previously in C<accept_notifications>.

=head3 call_remote ( %args )

Makes a synchronous RPC call to a service worker through the message bus.

It will wait (in the event loop) until a response is received, which will be either
a L<Beekeeper::JSONRPC::Response> object or a L<Beekeeper::JSONRPC::Error>.

On error it will die unless C<raise_error> option is set to false.
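
As a minimal sketch of handling errors without dying, the following wraps a call with C<raise_error> disabled. The function name C<fetch_title> is hypothetical, the method name is the one used by the scraper example, and C<$client> is assumed to be a C<Beekeeper::Client> instance.

```perl
use strict;
use warnings;

# Hypothetical wrapper: returns the result of the remote call, or nothing
# if the call failed. $client is assumed to be a Beekeeper::Client instance
sub fetch_title {
    my ($client, $url) = @_;

    my $response = $client->call_remote(
        method      => 'myapp.scraper.get_title',
        params      => { url => $url },
        raise_error => 0,    # return the error object instead of dying
    );

    return $response->result if $response->success;

    # On failure the returned object carries an error code and message
    warn "Call failed: " . $response->code . " " . $response->message . "\n";
    return;
}
```

It could be invoked as C<fetch_title( Beekeeper::Client-E<gt>instance, $url )>. With C<raise_error> left at its default the same failure would raise an exception, so the caller would need an C<eval> block instead of checking C<success>.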

This method accepts the following parameters:

=over

lib/Beekeeper/Client.pm


=item raise_error

If set to true (the default) dies with the received error message when a call returns
an error response. If set to false returns a L<Beekeeper::JSONRPC::Error> instead.

=back

=head3 call_remote_async ( %args )

Makes an asynchronous RPC call to a service worker through the message bus.

It immediately returns a L<Beekeeper::JSONRPC::Request> object which, once completed,
will have a defined C<response>.

This method accepts the parameters C<method>, C<params>, C<address> and C<timeout>,
the same as C<call_remote>. Additionally, two callbacks can be specified:

=over

=item on_success

lib/Beekeeper/MQTT.pm

1;

__END__

=pod

=encoding utf8

=head1 NAME
 
Beekeeper::MQTT - Asynchronous MQTT 5.0 client
 
=head1 VERSION
 
Version 0.09

=head1 SYNOPSIS

  my $mqtt = Beekeeper::MQTT->new(
      host     => 'localhost',
      username => 'guest',

lib/Beekeeper/Service/LogTail.pm

C<pool>: Regex that applies to worker pool.

C<service>: Regex that applies to service name.

C<message>: Regex that applies to error messages.

C<after>: Return only entries generated after given timestamp.

=head3 tail_async ( %filters, on_success => $cb, on_error => $cb  )

Asynchronous version of C<tail> method.

Callbacks C<on_success> and C<on_error> must be coderefs and will receive respectively 
L<Beekeeper::JSONRPC::Response> and L<Beekeeper::JSONRPC::Error> objects as arguments.
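
A possible shape for such a pair of callbacks is sketched below. The helper C<collecting_callbacks> is hypothetical; it only relies on the C<result> and C<message> accessors of the objects passed to the callbacks.

```perl
use strict;
use warnings;

# Hypothetical helper: builds the on_success / on_error pair expected by the
# *_async methods, collecting outcomes into the array referenced by $results
sub collecting_callbacks {
    my ($results) = @_;

    return (
        on_success => sub {
            my ($response) = @_;    # expected: Beekeeper::JSONRPC::Response
            push @$results, { ok => 1, result => $response->result };
        },
        on_error => sub {
            my ($error) = @_;       # expected: Beekeeper::JSONRPC::Error
            push @$results, { ok => 0, message => $error->message };
        },
    );
}
```

The returned list could then be appended to the filters, as in C<tail_async( service =E<gt> 'myapp', collecting_callbacks(\@entries) )>, where C<'myapp'> is a placeholder service regex. The callbacks will only be executed while the AnyEvent loop is running.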

=head1 SEE ALSO

L<bkpr-log>, L<Beekeeper::Service::LogTail::Worker>.

=head1 AUTHOR

lib/Beekeeper/Service/Supervisor.pm


=item cpu

Percentage of CPU load (100 indicates a full utilization of one core thread).

=item load

Percentage of busy time (100 indicates no idle time).

Note that workers can have a high load with very little CPU usage when being
blocked by synchronous operations (like slow SQL queries, for example).

Due to inaccuracies of measurement the actual maximum may be slightly below 100.

=back

=head1 METHODS

=head3 get_services_status ( %filters )

Returns the aggregate performance metrics of all active services.

lib/Beekeeper/Service/Supervisor.pm

Services can be filtered by C<host>, C<pool> and C<class>.

=head3 get_workers_status ( %filters )

Returns the individual performance metrics of every worker of all active services.

Services can be filtered by C<host>, C<pool> and C<class>.

=head3 get_services_status_async ( %filters, on_success => $cb, on_error => $cb )

Asynchronous version of C<get_services_status> method.

Callbacks C<on_success> and C<on_error> must be coderefs and will receive respectively 
L<Beekeeper::JSONRPC::Response> and L<Beekeeper::JSONRPC::Error> objects as arguments.

=head3 get_workers_status_async ( %filters, on_success => $cb, on_error => $cb )

Asynchronous version of C<get_workers_status> method.

=head1 SEE ALSO
 
L<bkpr-top>, L<bkpr-restart>, L<Beekeeper::Service::Supervisor::Worker>.

=head1 AUTHOR

José Micó, C<jose.mico@gmail.com>

=head1 COPYRIGHT AND LICENSE

lib/Beekeeper/Service/Supervisor/Worker.pm


=item cpu

Percentage of CPU load (100 indicates a full utilization of one core thread).

=item load

Percentage of busy time (100 indicates no idle time).

Note that workers can have a high load with very little CPU usage when being
blocked by synchronous operations (like slow SQL queries, for example).

Due to inaccuracies of measurement the actual maximum may be slightly below 100.

=back

=head1 SEE ALSO

L<Beekeeper::Service::Supervisor>, which is the interface to the RPC methods
exposed by this worker class.

lib/Beekeeper/Worker.pm

                else {
                    # Should not happen (clients must publish with QoS 1)
                    log_warn "Request published with QoS 0 to topic " . $mqtt_properties->{'topic'};
                }

                $self->{_BUS}->flush_buffer( buffer_id => 'response' );
            }
            else {

                # Acknowledge requests that don't send a response right now (fire & forget calls
                # and requests handled asynchronously), signaling the broker to send more requests

                $self->{_BUS}->puback(
                    packet_id => $mqtt_properties->{'packet_id'},
                );
            }
        }

        redo DRAIN if (@{$worker->{task_queue_high}} || @{$worker->{task_queue_low}});

        # Execute tasks postponed until task queue is empty

lib/Beekeeper/Worker.pm

                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{$_} foreach @cb_keys;
                    delete $worker->{subscriptions}->{$service};
                    undef $unsub_tmr;

                    return unless $worker->{shutting_down};

                    if ($worker->{in_progress} > 0) {

                        # The task queue is empty now, but an asynchronous method handler is
                        # still busy processing some requests received previously. Wait for
                        # these requests to be completed before telling _work_forever to stop

                        my $wait_time = 60;
                        $worker->{stop_cv}->begin;

                        my $busy_tmr; $busy_tmr = AnyEvent->timer( after => 1, interval => 1, cb => sub {
                            unless ($worker->{in_progress} > 0 && --$wait_time > 0) {
                                undef $busy_tmr;
                                $worker->{stop_cv}->end;

lib/Beekeeper/Worker.pm

  
      my $t; $t = AnyEvent->timer( after => 1, cb => sub {
          undef $t;
          $req->send_response( $number + 1 );
      });
  }

Note that callback closures will not be executed in Beekeeper scope but in the event loop
one, so uncaught exceptions in these closures will cause the worker to die and be respawned.

Asynchronous method handlers use system resources more efficiently, but are significantly 
harder to write and debug.
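
One way to keep an exception inside a callback closure from reaching the event loop is to wrap the closure in an C<eval>. The helper below is a generic sketch and not part of Beekeeper: the name C<safe_callback> and its error reporting callback are assumptions, and what to do with the caught error (log it, answer the pending request) is left to the handler.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Beekeeper): wraps a callback so that any
# exception it throws is caught and handed to $on_error instead of
# propagating to the event loop
sub safe_callback {
    my ($cb, $on_error) = @_;

    return sub {
        my @args = @_;

        # eval returns 1 only if the callback completed without dying
        my $ok = eval { $cb->(@args); 1 };

        $on_error->( $@ || 'Unknown error' ) unless $ok;

        return;
    };
}
```

The wrapped coderef can be used wherever a plain callback is expected, for example as the C<cb> argument of C<AnyEvent-E<gt>timer>.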

=head3 stop_accepting_notifications ( $method, ... )

Makes a worker stop accepting the specified notifications from the message bus.

C<$method> must be one of the strings used previously in C<accept_notifications>.

=head3 stop_accepting_calls ( $method, ... )

lib/Beekeeper/Worker/Extension/RemoteSession.pm


=head3 unbind_remote_address ( address => $address )

Clear the authorization data and address assignment of all remote clients which were
assigned the given address.

This is intended to implement "logout from all devices" functionality.

=head3 bind_remote_session_async ( address => $address, on_success => $cb, on_error => $cb )

Asynchronous version of C<bind_remote_session> method.

Callbacks C<on_success> and C<on_error> must be coderefs and will receive respectively 
L<Beekeeper::JSONRPC::Response> and L<Beekeeper::JSONRPC::Error> objects as arguments.

=head3 unbind_remote_session_async ( on_success => $cb, on_error => $cb )

Asynchronous version of C<unbind_remote_session> method.

=head3 unbind_remote_address_async ( address => $address, on_success => $cb, on_error => $cb )

Asynchronous version of C<unbind_remote_address> method.

=head1 SEE ALSO
 
L<Beekeeper::Service::Router::Worker>

=head1 AUTHOR

José Micó, C<jose.mico@gmail.com>

=head1 COPYRIGHT AND LICENSE

t/lib/Tests/Recursion.pm

    for (my $i = 3; $i <= 4; $i++) {

        my $resp = $cli->call_remote(
            method  => 'test.fib2',
            params  => $i,
        );

        is( $resp->result, $fib[$i], "fib($i)");
    }

    # Asynchronous workers are only limited by available memory
    for (my $i = 3; $i <= 10; $i++) {

        my $resp = $cli->call_remote(
            method  => 'test.fib3',
            params  => $i,
        );

        is( $resp->result, $fib[$i], "fib($i)");
    }

    # Asynchronous workers are only limited by available memory
    for (my $i = 3; $i <= 10; $i++) {

        my $resp = $cli->call_remote(
            method  => 'test.fib4',
            params  => $i,
        );

        is( $resp->result, $fib[$i], "fib($i)");
    }
}


