Beekeeper
view release on metacpan or search on metacpan
lib/Beekeeper/Worker.pm view on Meta::CPAN
stop_cv => undef,
callbacks => {},
task_queue_high => [],
task_queue_low => [],
queued_tasks => 0,
in_progress => 0,
last_report => 0,
call_count => 0,
notif_count => 0,
error_count => 0,
busy_time => 0,
};
$JSON = JSON::XS->new;
$JSON->utf8; # encode result as utf8
$JSON->allow_blessed; # encode blessed references as null
$JSON->convert_blessed; # use TO_JSON methods to serialize objects
$DEFLATE = Compress::Raw::Zlib::Deflate->new( -AppendOutput => 1 );
if (defined $SIG{TERM} && $SIG{TERM} eq 'DEFAULT') {
# Stop working gracefully when TERM signal is received
$SIG{TERM} = sub { $self->stop_working };
}
if (defined $SIG{INT} && $SIG{INT} eq 'DEFAULT' && $args{'foreground'}) {
# In foreground mode also stop working gracefully when INT signal is received
$SIG{INT} = sub { $self->stop_working };
}
eval {
# Init logger as soon as possible
$self->__init_logger;
# Connect to broker
$self->__init_client;
# Pass broker connection to logger
$self->{_LOGGER}->{_BUS} = $self->{_BUS} if (exists $self->{_LOGGER}->{_BUS});
$self->__init_auth_tokens;
$self->__init_worker;
};
if ($@) {
log_fatal "Worker died while initialization: $@";
log_fatal "$class could not be started";
CORE::exit( COMPILE_ERROR_EXIT_CODE );
}
return $self;
}
sub __init_auth_tokens {
my ($self) = @_;
# Using a hashing function makes harder to access the wrong worker pool by mistake,
# but it is not an effective access restriction: anyone with access to the backend
# bus credentials can easily inspect and clone auth data tokens
my $salt = $self->{_CLIENT}->{auth_salt};
$AUTH_TOKENS{'BKPR_SYSTEM'} = md5_base64('BKPR_SYSTEM'. $salt);
$AUTH_TOKENS{'BKPR_ADMIN'} = md5_base64('BKPR_ADMIN' . $salt);
$AUTH_TOKENS{'BKPR_ROUTER'} = md5_base64('BKPR_ROUTER'. $salt);
}
sub __has_authorization_token {
my ($self, $auth_level) = @_;
my $auth_data = $self->{_CLIENT}->{auth_data};
return 0 unless $auth_data && $auth_level;
return 0 unless exists $AUTH_TOKENS{$auth_level};
return 0 unless $AUTH_TOKENS{$auth_level} eq $auth_data;
return 1;
}
sub __init_logger {
my $self = shift;
# Honor --debug command line option and 'debug' config option from pool.config.json
$LogLevel = LOG_DEBUG if $self->{_WORKER}->{debug} || $self->{_WORKER}->{config}->{debug};
my $log_handler = $self->log_handler;
$self->{_LOGGER} = $log_handler;
$Logger = sub {
# ($level, @messages) = @_
$log_handler->log(@_);
};
$SIG{__WARN__} = sub { $Logger->( LOG_WARN, @_ ) };
}
sub log_handler {
my $self = shift;
Beekeeper::Logger->new(
worker_class => ref $self,
foreground => $self->{_WORKER}->{foreground},
log_file => $self->{_WORKER}->{config}->{log_file},
host => $self->{_WORKER}->{hostname},
pool => $self->{_WORKER}->{pool_id},
_BUS => $self->{_BUS},
@_
);
}
sub __init_client {
my $self = shift;
my $bus_id = $self->{_WORKER}->{bus_id};
my $config = $self->{_WORKER}->{bus_config}->{$bus_id};
my $client = Beekeeper::Client->new(
%$config,
timeout => 0, # retry forever
( run in 1.140 second using v1.01-cache-2.11-cpan-39bf76dae61 )