view release on metacpan or search on metacpan
lib/Beekeeper/AnyEvent.pm view on Meta::CPAN
our $VERSION = '0.10';
use AnyEvent;
use AnyEvent::Socket;
USE_PERL_BACKEND: {
# Prefer AnyEvent perl backend over default EV, as it is fast enough
# and it does not ignore exceptions thrown from within callbacks
$ENV{'PERL_ANYEVENT_MODEL'} ||= 'Perl' unless $AnyEvent::MODEL;
}
UNTAINT_IP_ADDR: {
no strict 'refs';
no warnings 'redefine';
# Addresses resolved by AnyEvent::DNS are tainted, causing an "Insecure
lib/Beekeeper/AnyEvent.pm view on Meta::CPAN
=head1 DESCRIPTION
This module alters the default behavior of AnyEvent as follows:
=over
=item *
Prefer the pure perl backend over default EV, as it is fast enough and
it does not ignore exceptions thrown from within callbacks.
=item *
Addresses resolved by AnyEvent::DNS are tainted, causing an "Insecure
dependency in connect" error as Beekeeper runs with taint mode enabled.
This module untaints resolved addresses, which can be done safely because
AnyEvent validates these addresses properly before using them.
=back
lib/Beekeeper/Client.pm view on Meta::CPAN
forward_to => undef,
response_topic => undef,
in_progress => undef,
curr_request => undef,
caller_id => undef,
caller_addr => undef,
auth_data => undef,
auth_salt => undef,
async_cv => undef,
correlation_id => 1,
callbacks => {},
};
unless (exists $args{'username'} && exists $args{'password'}) {
# Get broker connection parameters from config file
my $bus_id = $args{'bus_id'};
if (defined $bus_id) {
# Use parameters for specific bus
lib/Beekeeper/Client.pm view on Meta::CPAN
$self->{_BUS}->publish( payload => \$json, %send_args );
}
sub accept_notifications {
my ($self, %args) = @_;
my ($file, $line) = (caller)[1,2];
my $at = "at $file line $line\n";
my $callbacks = $self->{_CLIENT}->{callbacks};
foreach my $fq_meth (keys %args) {
$fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
\. ( [\w-]+ | \* ) $/x or croak "Invalid notification method '$fq_meth'";
my ($service, $method) = ($1, $2);
my $callback = $args{$fq_meth};
unless (ref $callback eq 'CODE') {
croak "Invalid callback for '$fq_meth'";
}
croak "Already accepting notifications '$fq_meth'" if exists $callbacks->{"msg.$fq_meth"};
$callbacks->{"msg.$fq_meth"} = $callback;
#TODO: Allow to accept private notifications without subscribing
my $local_bus = $self->{_BUS}->{bus_role};
my $topic = "msg/$local_bus/$service/$method";
$topic =~ tr|.*|/#|;
$self->{_BUS}->subscribe(
topic => $topic,
lib/Beekeeper/Client.pm view on Meta::CPAN
bless $request, 'Beekeeper::JSONRPC::Notification';
$request->{_mqtt_properties} = $mqtt_properties;
my $method = $request->{method};
unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
warn "Received notification with invalid method '$method' $at";
return;
}
my $cb = $callbacks->{"msg.$1.$2"} ||
$callbacks->{"msg.$1.*"};
unless ($cb) {
warn "No callback found for received notification '$method' $at";
return;
}
$cb->($request->{params}, $request);
},
on_suback => sub {
my ($success, $prop) = @_;
lib/Beekeeper/Client.pm view on Meta::CPAN
croak "No method specified" unless @methods;
foreach my $fq_meth (@methods) {
$fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
\. ( [\w-]+ | \* ) $/x or croak "Invalid method '$fq_meth'";
my ($service, $method) = ($1, $2);
unless (defined $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"}) {
carp "Not previously accepting notifications '$fq_meth'";
next;
}
my $local_bus = $self->{_BUS}->{bus_role};
my $topic = "msg/$local_bus/$service/$method";
$topic =~ tr|.*|/#|;
$self->{_BUS}->unsubscribe(
topic => $topic,
on_unsuback => sub {
my ($success, $prop) = @_;
die "Could not unsubscribe from topic '$topic' $at" unless $success;
delete $self->{_CLIENT}->{callbacks}->{"msg.$fq_meth"};
},
);
}
}
our $AE_WAITING;
sub call_remote {
my $self = shift;
lib/Beekeeper/Client.pm view on Meta::CPAN
bless $resp, 'Beekeeper::JSONRPC::Notification';
$resp->{_headers} = $mqtt_properties;
my $method = $resp->{method};
unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
warn "Received notification with invalid method '$method' $at";
return;
}
my $cb = $client->{callbacks}->{"msg.$1.$2"} ||
$client->{callbacks}->{"msg.$1.*"};
unless ($cb) {
warn "No callback found for received notification '$method' $at";
return;
}
$cb->($resp->{params}, $resp);
}
},
on_suback => sub {
lib/Beekeeper/Client.pm view on Meta::CPAN
Makes a client start accepting the specified notifications from the message bus.
C<$method> is a string with the format C<{service_class}.{method}>. A default
or fallback handler can be specified using a wildcard like C<{service_class}.*>.
C<$callback> is a coderef that will be called when a notification is received.
When executed, the callback will receive a parameter C<$params> which contains
the notification value or data structure sent.
Please note that callbacks will not be executed timely if AnyEvent loop is not running.
=head3 stop_accepting_notifications ( $method, ... )
Makes a client stop accepting the specified notifications from the message bus.
C<$method> must be one of the strings used previously in C<accept_notifications>.
=head3 call_remote ( %args )
Makes a synchronous RPC call to a service worker through the message bus.
lib/Beekeeper/Client.pm view on Meta::CPAN
=back
=head3 call_remote_async ( %args )
Makes an asynchronous RPC call to a service worker through the message bus.
It returns immediately a L<Beekeeper::JSONRPC::Request> object which, once completed,
will have a defined C<response>.
This method accepts parameters C<method>, C<params>, C<address> and C<timeout>
the same as C<call_remote>. Additionally two callbacks can be specified:
=over
=item on_success
Callback which will be executed after receiving a successful response with a
L<Beekeeper::JSONRPC::Response> object as parameter. Must be a coderef.
=item on_error
lib/Beekeeper/MQTT.pm view on Meta::CPAN
connect_err => undef, # count of connection errors
timeout_tmr => undef, # timer used for connection timeout
reconnect_tmr => undef, # timer used for connection retry
connack_cb => undef, # connack callback
error_cb => undef, # error callback
client_id => undef, # client id
server_prop => {}, # server properties
server_alias => {}, # server topic aliases
client_alias => {}, # client topic aliases
subscriptions => {}, # topic subscriptions
subscr_cb => {}, # subscription callbacks
packet_cb => {}, # packet callbacks
buffers => {}, # raw mqtt buffers
packet_seq => 1, # sequence used for packet ids
subscr_seq => 1, # sequence used for subscription ids
alias_seq => 1, # sequence used for topic alias ids
use_alias => 0, # topic alias enabled
config => \%args,
};
$self->{bus_id} = delete $args{'bus_id'};
$self->{bus_role} = delete $args{'bus_role'} || $self->{bus_id};
lib/Beekeeper/MQTT.pm view on Meta::CPAN
# Trim variable header from packet, the remaining is the payload
substr($$packet, 0, $prop_end, '');
if ($prop{'payload_format'}) {
# Payload is UTF-8 Encoded Character Data
utf8::decode( $$packet );
}
foreach (@subscr_ids) {
# Execute subscriptions callbacks
$self->{subscr_cb}->{$_}->($packet, \%prop);
}
}
sub puback {
my ($self, %args) = @_;
croak "Missing packet_id" unless $args{'packet_id'};
lib/Beekeeper/MQTT.pm view on Meta::CPAN
}
sub discard_buffer {
my ($self, %args) = @_;
my $buffer = delete $self->{buffers}->{$args{'buffer_id'}};
# Nothing to do if nothing was buffered
return unless $buffer;
# Remove all pending puback callbacks, as those will never be executed
foreach my $packet_id (keys %{$buffer->{packet_ids}}) {
delete $self->{packet_cb}->{$packet_id};
}
1;
}
sub DESTROY {
my $self = shift;
lib/Beekeeper/Worker.pm view on Meta::CPAN
parent_pid => $args{'parent_pid'},
foreground => $args{'foreground'}, # --foreground option
debug => $args{'debug'}, # --debug option
bus_config => $args{'bus_config'}, # content of bus.config.json
pool_config => $args{'pool_config'}, # content of pool.config.json
pool_id => $args{'pool_id'},
bus_id => $args{'bus_id'},
config => $args{'config'},
hostname => Sys::Hostname::hostname(),
stop_cv => undef,
callbacks => {},
task_queue_high => [],
task_queue_low => [],
queued_tasks => 0,
in_progress => 0,
last_report => 0,
call_count => 0,
notif_count => 0,
error_count => 0,
busy_time => 0,
};
lib/Beekeeper/Worker.pm view on Meta::CPAN
my $class = ref $_[0];
log_fatal "Worker class $class doesn't define authorize_request() method";
return undef; # do not authorize
}
sub accept_notifications {
my ($self, %args) = @_;
my $worker = $self->{_WORKER};
my $callbacks = $worker->{callbacks};
my ($file, $line) = (caller)[1,2];
my $at = "at $file line $line\n";
foreach my $fq_meth (keys %args) {
$fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
\. ( [\w-]+ | \* ) $/x or die "Invalid notification method '$fq_meth' $at";
my ($service, $method) = ($1, $2);
my $callback = $self->__get_cb_coderef($fq_meth, $args{$fq_meth});
die "Already accepting notifications '$fq_meth' $at" if exists $callbacks->{"msg.$fq_meth"};
$callbacks->{"msg.$fq_meth"} = $callback;
my $local_bus = $self->{_BUS}->{bus_role};
my $topic = "msg/$local_bus/$service/$method";
$topic =~ tr|.*|/#|;
$self->{_BUS}->subscribe(
topic => $topic,
on_publish => sub {
# ($payload_ref, $properties) = @_;
lib/Beekeeper/Worker.pm view on Meta::CPAN
my $at = "at $file line $line\n";
die "Invalid handler '$callback' for '$method' $at";
}
}
sub accept_remote_calls {
my ($self, %args) = @_;
my $worker = $self->{_WORKER};
my $callbacks = $worker->{callbacks};
my %subscribed_to;
my ($file, $line) = (caller)[1,2];
my $at = "at $file line $line\n";
foreach my $fq_meth (keys %args) {
$fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
\. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";
my ($service, $method) = ($1, $2);
my $callback = $self->__get_cb_coderef($fq_meth, $args{$fq_meth});
die "Already accepting remote calls '$fq_meth' $at" if exists $callbacks->{"req.$fq_meth"};
$callbacks->{"req.$fq_meth"} = $callback;
next if $subscribed_to{$service};
$subscribed_to{$service} = 1;
if (keys %subscribed_to == 2) {
log_warn "Running multiple services within a single worker hurts load balancing $at";
}
my $local_bus = $self->{_BUS}->{bus_role};
lib/Beekeeper/Worker.pm view on Meta::CPAN
bless $request, 'Beekeeper::JSONRPC::Notification';
$request->{_mqtt_properties} = $mqtt_properties;
my $method = $request->{method};
unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
log_error "Received notification with invalid method '$method'";
return;
}
my $cb = $worker->{callbacks}->{"msg.$1.$2"} ||
$worker->{callbacks}->{"msg.$1.*"};
local $client->{caller_id} = $mqtt_properties->{'clid'};
local $client->{caller_addr} = $mqtt_properties->{'addr'};
local $client->{auth_data} = $mqtt_properties->{'auth'};
unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
log_error "Notification '$method' was not authorized";
return;
}
lib/Beekeeper/Worker.pm view on Meta::CPAN
my $method = $request->{method};
bless $request, 'Beekeeper::JSONRPC::Request';
$request->{_mqtt_properties} = $mqtt_properties;
unless (defined $method && $method =~ m/^([\.\w-]+)\.([\w-]+)$/) {
log_error "Received request with invalid method '$method'";
die Beekeeper::JSONRPC::Error->method_not_found;
}
my $cb = $worker->{callbacks}->{"req.$1.$2"} ||
$worker->{callbacks}->{"req.$1.*"};
local $client->{caller_id} = $mqtt_properties->{'clid'};
local $client->{caller_addr} = $mqtt_properties->{'addr'};
local $client->{auth_data} = $mqtt_properties->{'auth'};
unless (($self->authorize_request($request) || "") eq BKPR_REQUEST_AUTHORIZED) {
log_error "Request '$method' was not authorized";
die Beekeeper::JSONRPC::Error->request_not_authorized;
}
lib/Beekeeper/Worker.pm view on Meta::CPAN
foreach my $fq_meth (@methods) {
$fq_meth =~ m/^ ( [\w-]+ (?: \.[\w-]+ )* )
\. ( [\w-]+ | \* ) $/x or die "Invalid method '$fq_meth' $at";
my ($service, $method) = ($1, $2);
my $worker = $self->{_WORKER};
unless (defined $worker->{callbacks}->{"msg.$fq_meth"}) {
log_warn "Not previously accepting notifications '$fq_meth' $at";
next;
}
my $local_bus = $self->{_BUS}->{bus_role};
my $topic = "msg/$local_bus/$service/$method";
$topic =~ tr|.*|/#|;
# Cannot remove callbacks right now, as new notifications could be in flight or be
# already queued. We must wait for unsubscription completion, and then until the
# notification queue is empty to ensure that all received ones were processed. And
# even then wait a bit more, as some brokers may send messages *after* unsubscription.
my $postpone = sub {
my $unsub_tmr; $unsub_tmr = AnyEvent->timer(
after => $UNSUBSCRIBE_LINGER, cb => sub {
delete $worker->{callbacks}->{"msg.$fq_meth"};
undef $unsub_tmr;
}
);
};
$self->{_BUS}->unsubscribe(
topic => $topic,
on_unsuback => sub {
my ($success, $prop) = @_;
lib/Beekeeper/Worker.pm view on Meta::CPAN
unless ($method eq '*') {
# Known limitation. As all calls for an entire service class are received
# through a single MQTT subscription (in order to load balance them), it is
# not possible to reject a single method. A workaround is to use a different
# class for each method that need to be individually rejected.
die "Cannot stop accepting individual methods, only '$service.*' is allowed $at";
}
my $worker = $self->{_WORKER};
my $callbacks = $worker->{callbacks};
my @cb_keys = grep { $_ =~ m/^req.\Q$service\E\b/ } keys %$callbacks;
unless (@cb_keys) {
log_warn "Not previously accepting remote calls '$fq_meth' $at";
next;
}
my $local_bus = $self->{_BUS}->{bus_role};
my $topic = "\$share/BKPR/req/$local_bus/$service";
$topic =~ tr|.*|/#|;
# Cannot remove callbacks right now, as new requests could be in flight or be already
# queued. We must wait for unsubscription completion, and then until the task queue
# is empty to ensure that all received requests were processed. And even then wait a
# bit more, as some brokers may send requests *after* unsubscription.
my $postpone = sub {
$worker->{stop_cv}->begin;
my $unsub_tmr; $unsub_tmr = AnyEvent->timer(
after => $UNSUBSCRIBE_LINGER, cb => sub {
delete $worker->{callbacks}->{$_} foreach @cb_keys;
delete $worker->{subscriptions}->{$service};
undef $unsub_tmr;
return unless $worker->{shutting_down};
if ($worker->{in_progress} > 0) {
# The task queue is empty now, but an asynchronous method handler is
# still busy processing some requests received previously. Wait for
# these requests to be completed before telling _work_forever to stop
lib/Beekeeper/Worker.pm view on Meta::CPAN
return if $worker->{shutting_down};
$worker->{shutting_down} = 1;
unless (defined $worker->{stop_cv}) {
# Worker did not completed initialization yet
CORE::exit(0);
}
my %services;
foreach my $fq_meth (keys %{$worker->{callbacks}}) {
next unless $fq_meth =~ m/^req\.(?!_sync)(.*)\./;
$services{$1} = 1;
}
if (keys %services) {
# Cannot exit right now, as some requests could be in flight or already queued.
# So tell the broker to stop sending requests, and exit after the task queue is empty
foreach my $service (keys %services) {
lib/Beekeeper/Worker.pm view on Meta::CPAN
# Average errors per second
my $err = sprintf("%.2f", $worker->{error_count} / $period);
$worker->{error_count} = 0;
# Average load as percentage of wall clock busy time (not cpu usage)
my $load = sprintf("%.2f", ($BUSY_TIME - $worker->{busy_time}) / $period * 100);
$worker->{busy_time} = $BUSY_TIME;
# Queues
my %queues;
foreach my $queue (keys %{$worker->{callbacks}}) {
next unless $queue =~ m/^req\.(?!_sync)(.*)\./;
$queues{$1} = 1;
}
local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
local $client->{caller_id};
# Tell any supervisor our stats
$self->fire_remote(
method => '_bkpr.supervisor.worker_status',