Beekeeper

 view release on metacpan or  search on metacpan

lib/Beekeeper/AnyEvent.pm  view on Meta::CPAN


our $VERSION = '0.10';

use AnyEvent;
use AnyEvent::Socket;


USE_PERL_BACKEND: {

    # Select the pure perl AnyEvent backend instead of the default EV one:
    # it is fast enough, and it does not ignore exceptions thrown from
    # within callbacks. This is done only while $AnyEvent::MODEL is still
    # unset, and a value already present in the environment is kept.

    if (!$AnyEvent::MODEL) {
        $ENV{'PERL_ANYEVENT_MODEL'} ||= 'Perl';
    }
}

UNTAINT_IP_ADDR: {

    no strict 'refs';
    no warnings 'redefine';

    # Addresses resolved by AnyEvent::DNS are tainted, causing an "Insecure

lib/Beekeeper/AnyEvent.pm  view on Meta::CPAN


=head1 DESCRIPTION

This module alters the default behavior of AnyEvent as follows:

=over

=item *

Prefer the pure perl backend over default EV, as it is fast enough and
it does not ignore exceptions thrown from within callbacks.

=item *

Addresses resolved by AnyEvent::DNS are tainted, causing an "Insecure
dependency in connect" error as Beekeeper runs with taint mode enabled.
This module untaints resolved addresses, which can be done safely because
AnyEvent validates these addresses properly before using them.

=back

lib/Beekeeper/Client.pm  view on Meta::CPAN

        forward_to     => undef,
        response_topic => undef,
        in_progress    => undef,
        curr_request   => undef,
        caller_id      => undef,
        caller_addr    => undef,
        auth_data      => undef,
        auth_salt      => undef,
        async_cv       => undef,
        correlation_id => 1,
        callbacks      => {},
    };

    unless (exists $args{'username'} && exists $args{'password'}) {

        # Get broker connection parameters from config file

        my $bus_id = $args{'bus_id'};

        if (defined $bus_id) {
            # Use parameters for specific bus

lib/Beekeeper/Client.pm  view on Meta::CPAN

    $self->{_BUS}->publish( payload => \$json, %send_args );
}


sub accept_notifications {
    my ($self, %args) = @_;

    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    my $callbacks = $self->{_CLIENT}->{callbacks};

    foreach my $fq_meth (keys %args) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or croak "Invalid notification method '$fq_meth'";

        my ($service, $method) = ($1, $2);

        my $callback = $args{$fq_meth};

        unless (ref $callback eq 'CODE') {
            croak "Invalid callback for '$fq_meth'";
        }

        croak "Already accepting notifications '$fq_meth'" if exists $callbacks->{"msg.$fq_meth"};
        $callbacks->{"msg.$fq_meth"} = $callback;

        #TODO: Allow to accept private notifications without subscribing

        my $local_bus = $self->{_BUS}->{bus_role};

        my $topic = "msg/$local_bus/$service/$method";
        $topic =~ tr|.*|/#|;

        $self->{_BUS}->subscribe(
            topic      => $topic,

lib/Beekeeper/Client.pm  view on Meta::CPAN

                bless $request, 'Beekeeper::JSONRPC::Notification';
                $request->{_mqtt_properties} = $mqtt_properties;

                my $method = $request->{method};

                unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
                    warn "Received notification with invalid method '$method' $at";
                    return;
                }

                my $cb = $callbacks->{"msg.$1.$2"} || 
                         $callbacks->{"msg.$1.*"};

                unless ($cb) {
                    warn "No callback found for received notification '$method' $at";
                    return;
                }

                $cb->($request->{params}, $request);
            },
            on_suback => sub {
                my ($success, $prop) = @_;

lib/Beekeeper/Client.pm  view on Meta::CPAN


    croak "No method specified" unless @methods;

    foreach my $fq_meth (@methods) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or croak "Invalid method '$fq_meth'";

        my ($service, $method) = ($1, $2);

        unless (defined $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"}) {
            carp "Not previously accepting notifications '$fq_meth'";
            next;
        }

        my $local_bus = $self->{_BUS}->{bus_role};

        my $topic = "msg/$local_bus/$service/$method";
        $topic =~ tr|.*|/#|;

        $self->{_BUS}->unsubscribe(
            topic       => $topic,
            on_unsuback => sub {
                my ($success, $prop) = @_;

                die "Could not unsubscribe from topic '$topic' $at" unless $success; 

                delete $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"};
            },
        );
    }
}


our $AE_WAITING;

sub call_remote {
    my $self = shift;

lib/Beekeeper/Client.pm  view on Meta::CPAN

                bless $resp, 'Beekeeper::JSONRPC::Notification';
                $resp->{_headers} = $mqtt_properties;

                my $method = $resp->{method};

                unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
                    warn "Received notification with invalid method '$method' $at";
                    return;
                }

                my $cb = $client->{callbacks}->{"msg.$1.$2"} || 
                         $client->{callbacks}->{"msg.$1.*"};

                unless ($cb) {
                    warn "No callback found for received notification '$method' $at";
                    return;
                }

                $cb->($resp->{params}, $resp);
            }
        },
        on_suback => sub {

lib/Beekeeper/Client.pm  view on Meta::CPAN


Makes a client start accepting the specified notifications from the message bus.

C<$method> is a string with the format C<{service_class}.{method}>. A default
or fallback handler can be specified using a wildcard like C<{service_class}.*>.

C<$callback> is a coderef that will be called when a notification is received.
When executed, the callback will receive a parameter C<$params> which contains
the notification value or data structure sent.

Please note that callbacks will not be executed in a timely manner if the AnyEvent loop is not running.

=head3 stop_accepting_notifications ( $method, ... )

Makes a client stop accepting the specified notifications from the message bus.

C<$method> must be one of the strings used previously in C<accept_notifications>.

=head3 call_remote ( %args )

Makes a synchronous RPC call to a service worker through the message bus.

lib/Beekeeper/Client.pm  view on Meta::CPAN

=back

=head3 call_remote_async ( %args )

Makes an asynchronous RPC call to a service worker through the message bus.

It immediately returns a L<Beekeeper::JSONRPC::Request> object which, once completed,
will have a defined C<response>.

This method accepts parameters C<method>, C<params>, C<address> and C<timeout>,
the same as C<call_remote>. Additionally two callbacks can be specified:

=over

=item on_success

Callback which will be executed after receiving a successful response with a
L<Beekeeper::JSONRPC::Response> object as parameter. Must be a coderef.

=item on_error

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

        connect_err     => undef,    # count of connection errors
        timeout_tmr     => undef,    # timer used for connection timeout
        reconnect_tmr   => undef,    # timer used for connection retry
        connack_cb      => undef,    # connack callback
        error_cb        => undef,    # error callback
        client_id       => undef,    # client id
        server_prop     => {},       # server properties
        server_alias    => {},       # server topic aliases
        client_alias    => {},       # client topic aliases
        subscriptions   => {},       # topic subscriptions
        subscr_cb       => {},       # subscription callbacks
        packet_cb       => {},       # packet callbacks 
        buffers         => {},       # raw mqtt buffers
        packet_seq      => 1,        # sequence used for packet ids
        subscr_seq      => 1,        # sequence used for subscription ids
        alias_seq       => 1,        # sequence used for topic alias ids
        use_alias       => 0,        # topic alias enabled
        config          => \%args,
    };

    $self->{bus_id}   = delete $args{'bus_id'};
    $self->{bus_role} = delete $args{'bus_role'} || $self->{bus_id};

lib/Beekeeper/MQTT.pm  view on Meta::CPAN


    # Trim variable header from packet, the remaining is the payload
    substr($$packet, 0, $prop_end, '');

    if ($prop{'payload_format'}) {
        # Payload is UTF-8 Encoded Character Data
        utf8::decode( $$packet );
    }

    foreach (@subscr_ids) {
        # Execute subscription callbacks

        $self->{subscr_cb}->{$_}->($packet, \%prop);
    }
}


sub puback {
    my ($self, %args) = @_;

    croak "Missing packet_id" unless $args{'packet_id'};

lib/Beekeeper/MQTT.pm  view on Meta::CPAN

}

# discard_buffer( buffer_id => $id )
#
# Drops the buffer stored under the given 'buffer_id' together with the
# puback callbacks registered for the packet ids it holds.
#
# Returns 1 when a buffer was discarded, or nothing (empty list / undef)
# when no buffer exists under that id.
sub discard_buffer {
    my ($self, %args) = @_;

    # Take the buffer out of the object in a single step
    my $buffer_id = $args{'buffer_id'};
    my $discarded = delete $self->{buffers}->{$buffer_id};

    # Nothing was buffered under that id, so there is nothing to clean up
    if (!$discarded) {
        return;
    }

    # Forget every pending puback callback of the buffered packets, as
    # those callbacks will never be executed
    delete $self->{packet_cb}->{$_} for keys %{$discarded->{packet_ids}};

    return 1;
}


sub DESTROY {
    my $self = shift;

lib/Beekeeper/Worker.pm  view on Meta::CPAN

        parent_pid      => $args{'parent_pid'},
        foreground      => $args{'foreground'},   # --foreground option
        debug           => $args{'debug'},        # --debug option
        bus_config      => $args{'bus_config'},   # content of bus.config.json
        pool_config     => $args{'pool_config'},  # content of pool.config.json
        pool_id         => $args{'pool_id'},
        bus_id          => $args{'bus_id'},
        config          => $args{'config'},
        hostname        => Sys::Hostname::hostname(),
        stop_cv         => undef,
        callbacks       => {},
        task_queue_high => [],
        task_queue_low  => [],
        queued_tasks    => 0,
        in_progress     => 0,
        last_report     => 0,
        call_count      => 0,
        notif_count     => 0,
        error_count     => 0,
        busy_time       => 0,
    };

lib/Beekeeper/Worker.pm  view on Meta::CPAN

    my $class = ref $_[0];
    log_fatal "Worker class $class doesn't define authorize_request() method";
    return undef; # do not authorize
}


sub accept_notifications {
    my ($self, %args) = @_;

    my $worker    = $self->{_WORKER};
    my $callbacks = $worker->{callbacks};

    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    foreach my $fq_meth (keys %args) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or die "Invalid notification method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        my $callback = $self->__get_cb_coderef($fq_meth, $args{$fq_meth});

        die "Already accepting notifications '$fq_meth' $at" if exists $callbacks->{"msg.$fq_meth"};
        $callbacks->{"msg.$fq_meth"} = $callback;

        my $local_bus = $self->{_BUS}->{bus_role};

        my $topic = "msg/$local_bus/$service/$method";
        $topic =~ tr|.*|/#|;

        $self->{_BUS}->subscribe(
            topic      => $topic,
            on_publish => sub {
                # ($payload_ref, $properties) = @_;

lib/Beekeeper/Worker.pm  view on Meta::CPAN

        my $at = "at $file line $line\n";
        die "Invalid handler '$callback' for '$method' $at";
    }
}


sub accept_remote_calls {
    my ($self, %args) = @_;

    my $worker = $self->{_WORKER};
    my $callbacks = $worker->{callbacks};
    my %subscribed_to;

    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    foreach my $fq_meth (keys %args) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        my $callback = $self->__get_cb_coderef($fq_meth, $args{$fq_meth});

        die "Already accepting remote calls '$fq_meth' $at" if exists $callbacks->{"req.$fq_meth"};
        $callbacks->{"req.$fq_meth"} = $callback;

        next if $subscribed_to{$service};
        $subscribed_to{$service} = 1;

        if (keys %subscribed_to == 2) {
            log_warn "Running multiple services within a single worker hurts load balancing $at";
        }

        my $local_bus = $self->{_BUS}->{bus_role};

lib/Beekeeper/Worker.pm  view on Meta::CPAN

                bless $request, 'Beekeeper::JSONRPC::Notification';
                $request->{_mqtt_properties} = $mqtt_properties;

                my $method = $request->{method};

                unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
                    log_error "Received notification with invalid method '$method'";
                    return;
                }

                my $cb = $worker->{callbacks}->{"msg.$1.$2"} || 
                         $worker->{callbacks}->{"msg.$1.*"};

                local $client->{caller_id}   = $mqtt_properties->{'clid'};
                local $client->{caller_addr} = $mqtt_properties->{'addr'};
                local $client->{auth_data}   = $mqtt_properties->{'auth'};

                unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
                    log_error "Notification '$method' was not authorized";
                    return;
                }

lib/Beekeeper/Worker.pm  view on Meta::CPAN

                my $method  = $request->{method};

                bless $request, 'Beekeeper::JSONRPC::Request';
                $request->{_mqtt_properties} = $mqtt_properties;

                unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
                    log_error "Received request with invalid method '$method'";
                    die Beekeeper::JSONRPC::Error->method_not_found;
                }

                my $cb = $worker->{callbacks}->{"req.$1.$2"} || 
                         $worker->{callbacks}->{"req.$1.*"};

                local $client->{caller_id}   = $mqtt_properties->{'clid'};
                local $client->{caller_addr} = $mqtt_properties->{'addr'};
                local $client->{auth_data}   = $mqtt_properties->{'auth'};

                unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
                    log_error "Request '$method' was not authorized";
                    die Beekeeper::JSONRPC::Error->request_not_authorized;
                }

lib/Beekeeper/Worker.pm  view on Meta::CPAN


    foreach my $fq_meth (@methods) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* ) 
                      \. ( [\w-]+ | \* ) $/x or die "Invalid method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        my $worker = $self->{_WORKER};

        unless (defined $worker->{callbacks}->{"msg.$fq_meth"}) {
            log_warn "Not previously accepting notifications '$fq_meth' $at";
            next;
        }

        my $local_bus = $self->{_BUS}->{bus_role};

        my $topic = "msg/$local_bus/$service/$method";
        $topic =~ tr|.*|/#|;

        # Cannot remove callbacks right now, as new notifications could be in flight or be 
        # already queued. We must wait for unsubscription completion, and then until the 
        # notification queue is empty to ensure that all received ones were processed. And 
        # even then wait a bit more, as some brokers may send messages *after* unsubscription.
        my $postpone = sub {

           my $unsub_tmr; $unsub_tmr = AnyEvent->timer( 
                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{"msg.$fq_meth"};
                    undef $unsub_tmr;
                }
            );
        };

        $self->{_BUS}->unsubscribe(
            topic       => $topic,
            on_unsuback => sub {
                my ($success, $prop) = @_;

lib/Beekeeper/Worker.pm  view on Meta::CPAN


        unless ($method eq '*') {
            # Known limitation. As all calls for an entire service class are received
            # through a single MQTT subscription (in order to load balance them), it is 
            # not possible to reject a single method. A workaround is to use a different
            # class for each method that need to be individually rejected.
            die "Cannot stop accepting individual methods, only '$service.*' is allowed $at";
        }

        my $worker    = $self->{_WORKER};
        my $callbacks = $worker->{callbacks};

        my @cb_keys = grep { $_ =~ m/^req.\Q$service\E\b/ } keys %$callbacks;

        unless (@cb_keys) {
            log_warn "Not previously accepting remote calls '$fq_meth' $at";
            next;
        }

        my $local_bus = $self->{_BUS}->{bus_role};

        my $topic = "\$share/BKPR/req/$local_bus/$service";
        $topic =~ tr|.*|/#|;

        # Cannot remove callbacks right now, as new requests could be in flight or be already 
        # queued. We must wait for unsubscription completion, and then until the task queue 
        # is empty to ensure that all received requests were processed. And even then wait a
        # bit more, as some brokers may send requests *after* unsubscription.
        my $postpone = sub {

            $worker->{stop_cv}->begin;

            my $unsub_tmr; $unsub_tmr = AnyEvent->timer( 
                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{$_} foreach @cb_keys;
                    delete $worker->{subscriptions}->{$service};
                    undef $unsub_tmr;

                    return unless $worker->{shutting_down};

                    if ($worker->{in_progress} > 0) {

                        # The task queue is empty now, but an asynchronous method handler is
                        # still busy processing some requests received previously. Wait for
                        # these requests to be completed before telling _work_forever to stop

lib/Beekeeper/Worker.pm  view on Meta::CPAN


    return if $worker->{shutting_down};
    $worker->{shutting_down} = 1;

    unless (defined $worker->{stop_cv}) {
        # Worker has not completed initialization yet
        CORE::exit(0);
    }

    my %services;
    foreach my $fq_meth (keys %{$worker->{callbacks}}) {
        next unless $fq_meth =~ m/^req\.(?!_sync)(.*)\./;
        $services{$1} = 1;
    }

    if (keys %services) {

        # Cannot exit right now, as some requests could be in flight or already queued.
        # So tell the broker to stop sending requests, and exit after the task queue is empty
        foreach my $service (keys %services) {

lib/Beekeeper/Worker.pm  view on Meta::CPAN

    # Average errors per second
    my $err = sprintf("%.2f", $worker->{error_count} / $period);
    $worker->{error_count} = 0;

    # Average load as percentage of wall clock busy time (not cpu usage)
    my $load = sprintf("%.2f", ($BUSY_TIME - $worker->{busy_time}) / $period * 100);
    $worker->{busy_time} = $BUSY_TIME;

    # Queues
    my %queues;
    foreach my $queue (keys %{$worker->{callbacks}}) {
        next unless $queue =~ m/^req\.(?!_sync)(.*)\./;
        $queues{$1} = 1;
    }

    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    # Tell any supervisor our stats
    $self->fire_remote(
        method => '_bkpr.supervisor.worker_status',



( run in 0.696 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )