Gearman-JobScheduler
view release on metacpan or search on metacpan
lib/Gearman/JobScheduler/Worker.pm view on Meta::CPAN
package Gearman::JobScheduler::Worker;
#
# GJS worker helpers
#
use strict;
use warnings;
use Modern::Perl "2012";
use Gearman::JobScheduler;
use Gearman::JobScheduler::Configuration;
use Gearman::XS qw(:constants);
use Gearman::XS::Worker;
use Log::Log4perl qw(:easy);
Log::Log4perl->easy_init({ level => $DEBUG, utf8=>1, layout => "%d{ISO8601} [%P]: %m%n" });
# Import Gearman function Perl module by path or name
sub import_gearman_function($)
{
my ($path_or_name) = shift;
eval {
if ($path_or_name =~ /\.pm$/) {
# /somewhere/Foo/Bar.pm
# Expect the package to return its name so that we'll know how to call it:
# http://stackoverflow.com/a/9850017/200603
$path_or_name = require $path_or_name;
if ($path_or_name . '' eq '1') {
LOGDIE("The function package should return __PACKAGE__ at the end of the file instead of just 1.");
}
$path_or_name->import();
1;
} else {
# Foo::Bar
( my $file = $path_or_name ) =~ s|::|/|g;
require $file . '.pm';
$path_or_name->import();
1;
}
} or do
{
LOGDIE("Unable to find Gearman function in '$path_or_name': $@");
};
return $path_or_name;
}
# Run a single worker (it should be imported already)
sub _worker($)
{
my ($gearman_function_name) = @_;
my $config = $gearman_function_name->configuration();
INFO("Will use Gearman servers: " . join(' ', @{$config->gearman_servers}));
INFO("Will write logs to: " . $config->worker_log_dir);
if (scalar @{$config->notifications_emails}) {
INFO('Will send notifications about failed jobs to: ' . join(' ', @{$config->notifications_emails}));
INFO('(emails will be sent from "' . $config->notifications_from_address
. '" and prefixed with "' . $config->notifications_subject_prefix . '")');
} else {
INFO('Will not send notifications anywhere about failed jobs.');
}
my $ret;
my $worker = new Gearman::XS::Worker;
$ret = $worker->add_servers(join(',', @{$config->gearman_servers}));
unless ($ret == GEARMAN_SUCCESS) {
LOGDIE("Unable to add Gearman servers: " . $worker->error());
}
INFO("Job priority: " . $gearman_function_name->priority());
$ret = $worker->add_function(
$gearman_function_name,
0,
sub {
my ($gearman_job) = shift;
my $job_handle = $gearman_job->handle();
my $result;
eval {
$result = $gearman_function_name->_run_locally_from_gearman_worker($config, $gearman_job);
};
if ($@) {
( run in 3.267 seconds using v1.01-cache-2.11-cpan-437f7b0c052 )