Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Worker.pm view on Meta::CPAN
# Default log sink, redefined later by __init_logger
our $Logger = sub { warn(@_) };

# Verbosity threshold: a message is passed to $Logger only when $LogLevel
# is greater than or equal to the level of the log_* function being called
our $LogLevel = LOG_INFO;

# Every log_* function below forwards its arguments to $Logger, prefixed with
# its own level constant, provided the threshold allows it. The value of the
# threshold test (false) or of the $Logger call is returned.

sub log_fatal (@) {
    return $LogLevel >= LOG_FATAL && $Logger->( LOG_FATAL, @_ );
}

sub log_alert (@) {
    return $LogLevel >= LOG_ALERT && $Logger->( LOG_ALERT, @_ );
}

sub log_critical (@) {
    return $LogLevel >= LOG_CRIT && $Logger->( LOG_CRIT, @_ );
}

sub log_error (@) {
    return $LogLevel >= LOG_ERROR && $Logger->( LOG_ERROR, @_ );
}

sub log_warn (@) {
    return $LogLevel >= LOG_WARN && $Logger->( LOG_WARN, @_ );
}

# Same level as log_warn, available under a longer name
sub log_warning (@) {
    return $LogLevel >= LOG_WARN && $Logger->( LOG_WARN, @_ );
}

sub log_notice (@) {
    return $LogLevel >= LOG_NOTICE && $Logger->( LOG_NOTICE, @_ );
}

sub log_info (@) {
    return $LogLevel >= LOG_INFO && $Logger->( LOG_INFO, @_ );
}

sub log_debug (@) {
    return $LogLevel >= LOG_DEBUG && $Logger->( LOG_DEBUG, @_ );
}

sub log_trace (@) {
    return $LogLevel >= LOG_TRACE && $Logger->( LOG_TRACE, @_ );
}

# Getter/setter for the threshold: when called with an argument it becomes the
# new $LogLevel; the (possibly updated) level is always returned
sub log_level (;$) {
    if (scalar @_) {
        $LogLevel = shift;
    }
    return $LogLevel;
}
# Aliases to the variables of the same name in Beekeeper::MQTT.
# $BUSY_TIME is read by __report_status to compute the load figure.
our $BUSY_SINCE; *BUSY_SINCE = \$Beekeeper::MQTT::BUSY_SINCE;
our $BUSY_TIME; *BUSY_TIME = \$Beekeeper::MQTT::BUSY_TIME;
# NOTE(review): presumably the interval, in seconds, between status reports;
# it is not used in the visible part of the file - confirm
our $REPORT_STATUS_PERIOD = 5;
# Delay given as 'after' to the AnyEvent timer in stop_accepting_calls: how long
# to wait, once unsubscribed, before the callbacks of a service are removed
our $UNSUBSCRIBE_LINGER = 2;
# Auth tokens keyed by level name, filled in by __init_auth_tokens
my %AUTH_TOKENS;
# Compress::Raw::Zlib::Deflate instance, created in new()
my $DEFLATE;
# JSON::XS encoder, created in new()
my $JSON;
# Constructor. Builds the worker state, installs the signal handlers and runs
# the initialization sequence (logger, broker client, auth tokens, worker).
#
# Arguments (named, passed by WorkerPool->spawn_worker):
#   parent_pid, foreground, debug, bus_config, pool_config, pool_id, bus_id, config
#
# Returns the new object. If any initialization step dies the error is logged
# and the process exits with COMPILE_ERROR_EXIT_CODE.
sub new {
    my ($class, %args) = @_;

    my $self = bless {
        _WORKER => {
            parent_pid      => $args{'parent_pid'},
            foreground      => $args{'foreground'},   # --foreground option
            debug           => $args{'debug'},        # --debug option
            bus_config      => $args{'bus_config'},   # content of bus.config.json
            pool_config     => $args{'pool_config'},  # content of pool.config.json
            pool_id         => $args{'pool_id'},
            bus_id          => $args{'bus_id'},
            config          => $args{'config'},
            hostname        => Sys::Hostname::hostname(),
            stop_cv         => undef,
            callbacks       => {},
            task_queue_high => [],
            task_queue_low  => [],
            queued_tasks    => 0,
            in_progress     => 0,
            last_report     => 0,
            call_count      => 0,
            notif_count     => 0,
            error_count     => 0,
            busy_time       => 0,
        },
        _CLIENT => undef,
        _BUS    => undef,
        _LOGGER => undef,
    }, $class;

    # Shared encoder: utf8 output, blessed references encoded as null unless
    # they provide a TO_JSON method
    $JSON = JSON::XS->new->utf8->allow_blessed->convert_blessed;

    $DEFLATE = Compress::Raw::Zlib::Deflate->new( -AppendOutput => 1 );

    # Stop working gracefully on TERM, and also on INT when running in
    # foreground mode. Only handlers explicitly set to 'DEFAULT' are replaced.
    my %install = (
        TERM => 1,
        INT  => $args{'foreground'},
    );

    foreach my $signal (qw(TERM INT)) {
        next unless defined $SIG{$signal} && $SIG{$signal} eq 'DEFAULT';
        next unless $install{$signal};
        $SIG{$signal} = sub { $self->stop_working };
    }

    eval {

        # The logger goes first so that later failures can be reported
        $self->__init_logger;

        # Connect to broker
        $self->__init_client;

        # Hand the broker connection to the logger, if it has a slot for it
        $self->{_LOGGER}->{_BUS} = $self->{_BUS} if (exists $self->{_LOGGER}->{_BUS});

        $self->__init_auth_tokens;

        $self->__init_worker;
    };

    return $self unless $@;

    log_fatal("Worker died while initialization: $@");
    log_fatal("$class could not be started");
    CORE::exit( COMPILE_ERROR_EXIT_CODE );
}
# Fill %AUTH_TOKENS with one token per auth level, each being the base64 MD5
# digest of the level name concatenated with the client's auth_salt.
sub __init_auth_tokens {
    my ($self) = @_;

    # Hashing only makes it harder to reach the wrong worker pool by mistake.
    # It is not an effective access restriction: anyone holding the backend
    # bus credentials can easily inspect and clone auth data tokens

    my $salt = $self->{_CLIENT}->{auth_salt};

    foreach my $level (qw( BKPR_SYSTEM BKPR_ADMIN BKPR_ROUTER )) {
        $AUTH_TOKENS{$level} = md5_base64( $level . $salt );
    }
}
sub __has_authorization_token {
my ($self, $auth_level) = @_;
lib/Beekeeper/Worker.pm view on Meta::CPAN
# Stop serving the remote calls of one or more services.
#
# Parameters:
#   @methods - fully qualified method specs. Only the wildcard form
#              'service.*' is accepted (see the limitation below).
#
# Dies when no spec is given, when a spec is malformed, or when a spec names an
# individual method. Emits a warning and skips the spec when no callbacks are
# registered for the service.
#
# The operation is asynchronous: for each service the shared request topic is
# unsubscribed, and its callbacks are removed only after the unsubscription
# was acknowledged, the deferred closure queued in $worker->{postponed} has
# been run, and $UNSUBSCRIBE_LINGER has elapsed.
sub stop_accepting_calls {
    my ($self, @methods) = @_;

    # Report errors from the point of view of the caller
    my ($file, $line) = (caller)[1,2];
    my $at = "at $file line $line\n";

    die "No method specified $at" unless @methods;

    foreach my $fq_meth (@methods) {

        $fq_meth =~ m/^  ( [\w-]+ (?: \.[\w-]+ )* )
                      \. ( [\w-]+ | \* ) $/x or die "Invalid remote call method '$fq_meth' $at";

        my ($service, $method) = ($1, $2);

        unless ($method eq '*') {
            # Known limitation. As all calls for an entire service class are received
            # through a single MQTT subscription (in order to load balance them), it is
            # not possible to reject a single method. A workaround is to use a different
            # class for each method that need to be individually rejected.
            die "Cannot stop accepting individual methods, only '$service.*' is allowed $at";
        }

        my $worker = $self->{_WORKER};
        my $callbacks = $worker->{callbacks};

        # Select exactly the keys of the form "req.<service>.<method>". A prefix
        # match would also pick the callbacks of services whose name merely
        # starts with $service (like "$service.sub" or "$service-x"), and these
        # would be deleted below while their own subscription is still active.
        my @cb_keys = grep { m/^req\.\Q$service\E\.[^.]+\z/ } keys %$callbacks;

        unless (@cb_keys) {
            log_warn "Not previously accepting remote calls '$fq_meth' $at";
            next;
        }

        my $local_bus = $self->{_BUS}->{bus_role};

        # Dots of the service name become topic level separators
        my $topic = "\$share/BKPR/req/$local_bus/$service";
        $topic =~ tr|.*|/#|;

        # Cannot remove callbacks right now, as new requests could be in flight or be already
        # queued. We must wait for unsubscription completion, and then until the task queue
        # is empty to ensure that all received requests were processed. And even then wait a
        # bit more, as some brokers may send requests *after* unsubscription.

        my $postpone = sub {

            # NOTE(review): this begin is matched by an end only when the worker is
            # shutting down by the time the linger timer fires - confirm intended
            $worker->{stop_cv}->begin;

            my $unsub_tmr; $unsub_tmr = AnyEvent->timer(
                after => $UNSUBSCRIBE_LINGER, cb => sub {

                    delete $worker->{callbacks}->{$_} foreach @cb_keys;
                    delete $worker->{subscriptions}->{$service};

                    # Release the timer, it kept itself alive through this closure
                    undef $unsub_tmr;

                    return unless $worker->{shutting_down};

                    if ($worker->{in_progress} > 0) {

                        # The task queue is empty now, but an asynchronous method handler is
                        # still busy processing some requests received previously. Wait for
                        # these requests to be completed before telling _work_forever to stop

                        # Number of one second ticks to wait at most
                        my $wait_time = 60;
                        $worker->{stop_cv}->begin;

                        my $busy_tmr; $busy_tmr = AnyEvent->timer( after => 1, interval => 1, cb => sub {
                            unless ($worker->{in_progress} > 0 && --$wait_time > 0) {
                                undef $busy_tmr;
                                $worker->{stop_cv}->end;
                            }
                        });
                    }

                    # Tell _work_forever to stop
                    $worker->{stop_cv}->end;
                }
            );
        };

        $self->{_BUS}->unsubscribe(
            topic       => $topic,
            on_unsuback => sub {
                my ($success, $prop) = @_;

                log_error "Could not unsubscribe from topic '$topic' $at" unless $success;

                # Queue the deferred cleanup and schedule a drain of the task queue.
                # NOTE(review): the queued closures are presumably run by
                # __drain_task_queue once the queue is empty - not visible here
                my $postponed = $worker->{postponed} ||= [];
                push @$postponed, $postpone;

                AnyEvent::postpone { $self->__drain_task_queue };
            }
        );
    }
}
# Main loop of the worker, invoked by WorkerPool->spawn_worker.
#
# Creates the stop condvar and waits on it; once released it runs on_shutdown
# and __report_exit, then disconnects from the bus if still connected. If
# anything dies along the way the error is logged and the process exits
# with code 255.
sub __work_forever {
    my ($self) = @_;

    eval {

        my $stop_cv = AnyEvent->condvar;
        $self->{_WORKER}->{stop_cv} = $stop_cv;

        # Execution stays here until stop_working releases the condvar
        $stop_cv->recv;

        $self->on_shutdown;

        $self->__report_exit;
    };

    if ($@) {
        log_fatal("Worker died: $@");
        CORE::exit(255);
    }

    my $bus = $self->{_BUS};

    $bus->disconnect if $bus->{is_connected};
}
# Begin a graceful shutdown. This is the default handler for the TERM signal.
#
# Does nothing when a shutdown is already in progress. Exits the process at
# once (code 0) when the stop condvar does not exist yet. Otherwise it stops
# accepting calls for every served service, or releases the stop condvar
# directly when no service is being served.
sub stop_working {
    my ($self, %args) = @_;

    my $worker = $self->{_WORKER};

    return if $worker->{shutting_down};
    $worker->{shutting_down} = 1;

    # Without a condvar the worker did not complete its initialization yet
    CORE::exit(0) unless defined $worker->{stop_cv};

    # Services with registered request callbacks, ignoring the '_sync' ones
    my %services = map  { $_ => 1 }
                   map  { m/^req\.(?!_sync)(.*)\./ ? $1 : () }
                   keys %{ $worker->{callbacks} };

    if (!%services) {
        # Nothing is being served: tell _work_forever to stop
        $worker->{stop_cv}->send;
    }
    else {
        # Cannot exit right now, as some requests could be in flight or already queued.
        # So tell the broker to stop sending requests, and exit after the task queue is empty
        foreach my $service (keys %services) {
            $self->stop_accepting_calls( $service . '.*' );
        }
    }
}
# Send the worker statistics gathered since the previous report to
# '_bkpr.supervisor.worker_status' and reset the counters.
#
# Reported values: calls, notifications and errors per second, load as a
# percentage of wall clock busy time, and the list of served services.
sub __report_status {
    my ($self) = @_;

    my ($worker, $client) = @{$self}{qw( _WORKER _CLIENT )};

    # Time elapsed since the last report; assumed to be one second on the first one
    my $now      = Time::HiRes::time();
    my $previous = $worker->{last_report} || ($now - 1);
    my $period   = $now - $previous;
    $worker->{last_report} = $now;

    # Average calls, notifications and errors per second. Each counter is
    # reset right after being read
    my %per_second;
    foreach my $counter (qw( call_count notif_count error_count )) {
        $per_second{$counter} = sprintf("%.2f", $worker->{$counter} / $period);
        $worker->{$counter} = 0;
    }

    # Average load as percentage of wall clock busy time (not cpu usage)
    my $busy_delta = $BUSY_TIME - $worker->{busy_time};
    my $load = sprintf("%.2f", $busy_delta / $period * 100);
    $worker->{busy_time} = $BUSY_TIME;

    # Names of the served services, ignoring the '_sync' ones
    my %queues = map  { $_ => 1 }
                 map  { m/^req\.(?!_sync)(.*)\./ ? $1 : () }
                 keys %{ $worker->{callbacks} };

    # Send as system, and without caller id, for the duration of this call
    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    my $stats = {
        class => ref($self),
        host  => $worker->{hostname},
        pool  => $worker->{pool_id},
        pid   => $$,
        cps   => $per_second{call_count},
        nps   => $per_second{notif_count},
        err   => $per_second{error_count},
        load  => $load,
        queue => [ keys %queues ],
    };

    # Tell any supervisor our stats
    $self->fire_remote(
        method => '_bkpr.supervisor.worker_status',
        params => $stats,
    );
}
# Notify '_bkpr.supervisor.worker_exit' that this worker is exiting.
# Does nothing when the bus is not connected.
sub __report_exit {
    my ($self) = @_;

    my $bus = $self->{_BUS};
    return unless $bus->{is_connected};

    my ($worker, $client) = @{$self}{qw( _WORKER _CLIENT )};

    # Send as system, and without caller id, for the duration of this call
    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    my $whoami = {
        class => ref($self),
        host  => $worker->{hostname},
        pool  => $worker->{pool_id},
        pid   => $$,
    };

    $self->fire_remote(
        method => '_bkpr.supervisor.worker_exit',
        params => $whoami,
    );
}
1;
__END__
=pod
=encoding utf8
=head1 NAME
( run in 3.030 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )