Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Worker.pm view on Meta::CPAN
# Default handler for the TERM signal: start shutting this worker down.
#
# Only the first invocation has any effect. If the worker has not finished
# initializing (no stop_cv yet) the process exits at once. Otherwise the
# worker either stops accepting calls for every service it has request
# callbacks for, or, when it serves none, signals stop_cv directly.
#
# Takes the worker object and an optional list of key/value arguments,
# which are accepted but not used here.
sub stop_working {
    my ($self, %args) = @_;
    my $worker = $self->{_WORKER};

    # Shutdown was already requested: nothing more to do
    if ($worker->{shutting_down}) {
        return;
    }
    $worker->{shutting_down} = 1;

    # Without a stop_cv the worker has not completed its initialization yet,
    # so there is nothing to wind down
    CORE::exit(0) if !defined $worker->{stop_cv};

    # Collect the distinct service names out of the 'req.<service>.<method>'
    # callback keys, leaving out the ones under 'req._sync'
    my %services = map {
        m/^req\.(?!_sync)(.*)\./ ? ($1 => 1) : ()
    } keys %{$worker->{callbacks}};

    if (!%services) {
        # Not serving any request: tell _work_forever to stop
        $worker->{stop_cv}->send;
    }
    else {
        # Exiting right now is not possible, because requests may be in flight
        # or already queued. Ask the broker to stop sending new ones instead;
        # the worker exits after the task queue is empty
        foreach my $service (keys %services) {
            $self->stop_accepting_calls( $service . '.*' );
        }
    }
}
# Send this worker's activity statistics to '_bkpr.supervisor.worker_status'.
#
# Rates are averaged over the time elapsed since the previous report, and
# the per-period counters (call_count, notif_count, error_count) are reset
# to zero afterwards. The first report, having no previous timestamp,
# assumes a period of one second.
#
# The notification is fired with the BKPR_SYSTEM auth token and without a
# caller id; both client fields are restored when this sub returns.
sub __report_status {
    my $self = shift;
    my $worker = $self->{_WORKER};
    my $client = $self->{_CLIENT};

    my $now = Time::HiRes::time();
    my $period = $now - ($worker->{last_report} || ($now - 1));

    # Time::HiRes::time follows the system clock, which may be stepped
    # backwards or read twice with the same value. A zero period would die
    # with "Illegal division by zero" and a negative one would report
    # negative rates, so fall back to the same one second period that is
    # assumed for the first report
    $period = 1 if $period <= 0;

    $worker->{last_report} = $now;

    # Average calls per second
    my $cps = sprintf("%.2f", $worker->{call_count} / $period);
    $worker->{call_count} = 0;

    # Average notifications per second
    my $nps = sprintf("%.2f", $worker->{notif_count} / $period);
    $worker->{notif_count} = 0;

    # Average errors per second
    my $err = sprintf("%.2f", $worker->{error_count} / $period);
    $worker->{error_count} = 0;

    # Average load as percentage of wall clock busy time (not cpu usage).
    # $BUSY_TIME is declared outside this sub (presumably a running total of
    # busy seconds); the value seen at the previous report is kept in
    # $worker->{busy_time} so that only the increase is accounted for
    my $load = sprintf("%.2f", ($BUSY_TIME - $worker->{busy_time}) / $period * 100);
    $worker->{busy_time} = $BUSY_TIME;

    # Queues: distinct service names taken from the 'req.<service>.<method>'
    # callback keys, leaving out the ones under 'req._sync'
    my %queues;
    foreach my $queue (keys %{$worker->{callbacks}}) {
        next unless $queue =~ m/^req\.(?!_sync)(.*)\./;
        $queues{$1} = 1;
    }

    # Temporarily replace the client credentials for this notification
    local $client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $client->{caller_id};

    # Tell any supervisor our stats
    $self->fire_remote(
        method => '_bkpr.supervisor.worker_status',
        params => {
            class => ref($self),
            host  => $worker->{hostname},
            pool  => $worker->{pool_id},
            pid   => $$,
            cps   => $cps,
            nps   => $nps,
            err   => $err,
            load  => $load,
            queue => [ keys %queues ],
        },
    );
}
# Notify '_bkpr.supervisor.worker_exit' that this worker process is exiting.
#
# Does nothing when the bus is not connected. The notification is fired
# with the BKPR_SYSTEM auth token and without a caller id; both client
# fields are restored when this sub returns.
sub __report_exit {
    my ($self) = @_;

    # Nothing can be sent over a disconnected bus
    if (!$self->{_BUS}->{is_connected}) {
        return;
    }

    my $worker = $self->{_WORKER};
    my $rpc_client = $self->{_CLIENT};

    # Identity of the exiting worker process
    my %exit_info = (
        class => ref($self),
        host  => $worker->{hostname},
        pool  => $worker->{pool_id},
        pid   => $$,
    );

    # Temporarily replace the client credentials for this notification
    local $rpc_client->{auth_data} = $AUTH_TOKENS{'BKPR_SYSTEM'};
    local $rpc_client->{caller_id};

    $self->fire_remote(
        method => '_bkpr.supervisor.worker_exit',
        params => \%exit_info,
    );
}
1;
__END__
=pod
=encoding utf8
( run in 3.069 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )