Daemonise
view release on metacpan or search on metacpan
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
package Daemonise::Plugin::JobQueue;
use Mouse::Role;
# ABSTRACT: Daemonise JobQueue plugin
use Data::Dumper;
use Digest::MD5 'md5_hex';
use DateTime;
use Carp;
use Basket::Calc;
use Scalar::Util qw(looks_like_number);
# Compose the plugin roles this role builds on. Doing this inside BEGIN makes
# the composition happen at compile time, before the attribute declarations
# and method modifiers below are evaluated.
# NOTE(review): the modifiers further down wrap 'queue', 'dequeue' and 'ack'
# and the code calls $self->couchdb, $self->lock and $self->unlock; presumably
# these come from the roles composed here -- confirm against those plugins.
BEGIN {
with("Daemonise::Plugin::CouchDB");
with("Daemonise::Plugin::RabbitMQ");
with("Daemonise::Plugin::KyotoTycoon");
}
# Name of the CouchDB database get_job() switches to when fetching a job
# document. Can be overridden via the "db" key of the "jobqueue" config hash.
has jobqueue_db => (
    is      => 'rw',
    isa     => 'Str',
    lazy    => 1,
    default => 'jobqueue',
);

# The job currently being worked on. dequeue() stores { message => $msg }
# here, get_job() stores the fetched job document and ack() resets it to an
# empty hash.
has job => (
    is      => 'rw',
    isa     => 'HashRef',
    lazy    => 1,
    default => sub { return {} },
);

# NOTE(review): not referenced in the visible part of this file; presumably
# the message key under which a job's list of items lives -- confirm.
has items_key => (
    is      => 'rw',
    isa     => 'Str',
    lazy    => 1,
    default => 'domains',
);

# NOTE(review): not referenced in the visible part of this file; presumably
# the message key for a single item of a job -- confirm.
has item_key => (
    is      => 'rw',
    isa     => 'Str',
    lazy    => 1,
    default => 'domain',
);

# When true (the default), _finish_processing() calls log_worker() and
# update_job() for locked jobs. dont_log_worker() switches this off.
has log_worker_enabled => (
    is      => 'rw',
    isa     => 'Bool',
    lazy    => 1,
    default => 1,
);

# Tracks whether lock_job() currently holds the lock for the job at hand.
has job_locked => (
    is      => 'rw',
    isa     => 'Bool',
    default => 0,
);

# Number of seconds lock_job() sleeps before retrying a failed lock once.
# Can be overridden via the "sync_delay" key of the "jobqueue" config hash.
has jobqueue_sync_delay => (
    is      => 'rw',
    isa     => 'Int',
    default => 1,
);
# internal attribute to store all command hooks (command name => CODEREF).
#
# It defaults to an empty hash so that hooks() always hands back a hash
# reference. Without a default, hooks() returns undef until hooks have been
# registered, and a lookup such as `exists $self->hooks->{$command}` dies with
# "Can't use an undefined value as a HASH reference" instead of falling
# through to the "no default/fallback found" error.
has '_hooks' => (
    is      => 'rw',
    isa     => 'HashRef[CodeRef]',
    default => sub { {} },
);
# Runs after configure(): copies the optional "db" and "sync_delay" entries of
# the "jobqueue" config hash into the jobqueue_db / jobqueue_sync_delay
# attributes. Entries that are missing or undefined leave the attribute as is.
after 'configure' => sub {
    my ($self) = @_;

    $self->log("configuring JobQueue plugin") if $self->debug;

    # nothing to apply unless the config carries a "jobqueue" hash
    return unless ref($self->config->{jobqueue}) eq 'HASH';

    for my $option (qw(db sync_delay)) {
        my $value = $self->config->{jobqueue}->{$option};
        next unless defined $value;

        # config key "db" maps to attribute "jobqueue_db", and so on
        my $accessor = "jobqueue_" . $option;
        $self->$accessor($value);
    }

    return;
};
# Wraps log(): when the current job message carries a meta hash, every defined
# value among account/user/session/id is prepended to the log line as
# "name=value " (the "id" entry is labelled "job"). Each entry is put in front
# of what has been built so far, so the final line starts with the job ID.
around 'log' => sub {
    my ($orig, $self, $msg) = @_;

    my $message = $self->job->{message};

    if (ref $message eq 'HASH' and exists $message->{meta}) {
        my %label_for = (id => 'job');

        for my $field (qw/account user session id/) {
            next
                unless (exists $message->{meta}->{$field}
                and defined $message->{meta}->{$field});

            my $label = exists $label_for{$field} ? $label_for{$field} : $field;
            $msg = "$label=" . $message->{meta}->{$field} . ' ' . $msg;
        }
    }

    $self->$orig($msg);

    return;
};
# Wraps start(): instead of handing the caller's code straight to the original
# start(), a wrapper is passed on that
#   1. dequeues the next message (which also tries to lock the job),
#   2. validates it (must be a hash with a non-empty data->command),
#   3. dispatches it to the custom CODEREF given to start(), or otherwise to
#      the hook registered for the command (falling back to the 'default'
#      hook),
#   4. hands the result to _finish_processing().
#
# Parameters:
#   $code - optional CODEREF called with the message; takes preference over
#           any registered hooks.
# Returns whatever the original start() returns.
around 'start' => sub {
    my ($orig, $self, $code) = @_;

    # check whether we have an optional custom CODEREF
    if (defined $code and ref $code ne 'CODE') {
        $self->log("first argument of start() must be a CODEREF! exiting...");
        $self->stop;
    }

    my $wrapper = sub {
        # reset job_locked attribute to unlocked to start with
        $self->job_locked(0);

        # get the next message from the queue (includes locking)
        my $msg = $self->dequeue;

        # error if message was empty or not a hashref
        $msg = { error => "message must not be empty!" }
            unless ref $msg eq 'HASH';

        # don't process anything if we already have an error
        # e.g. locking failed, empty message, ...
        if (exists $msg->{error}) {
            $self->_finish_processing($msg);
            return;
        }

        unless (exists $msg->{data}
            and ref $msg->{data} eq 'HASH'
            and exists $msg->{data}->{command}
            and $msg->{data}->{command})
        {
            $msg->{error} = "msg->data->command missing or empty!";
            $self->_finish_processing($msg);
            return;
        }

        my $command = $msg->{data}->{command};
        $self->log("[$command]") unless $command eq 'graph';

        # give custom code ref preference over hooks
        if ($code) {
            $msg = $code->($msg);
        }
        else {
            # hooks() may return undef when nothing has been registered;
            # treat that as "no hooks" so we reach the error branch below
            # instead of dying on an undefined hash reference
            my $hooks = $self->hooks;
            $hooks = {} unless ref $hooks eq 'HASH';

            # fallback to 'default' command hook
            $command = 'default' unless exists $hooks->{$command};

            if (exists $hooks->{$command}) {
                $msg = $hooks->{$command}->($msg);
            }
            else {
                $msg->{error} =
                      'Command "'
                    . $msg->{data}->{command}
                    . '" not defined and no default/fallback found!';
            }
        }

        $self->_finish_processing($msg);

        return;
    };

    return $self->$orig($wrapper);
};
# Runs before stop(): releases the lock of the job currently held (if any) so
# it does not stay locked after this process is gone.
before 'stop' => sub {
    my ($self) = @_;

    my $current = $self->job;

    $self->unlock_job($current->{message})
        if (exists $current->{message} and $self->job_locked);

    return;
};
# Wraps queue(): before a message is sent, the user/account/session meta
# entries of the current job message are copied into the outgoing message
# unless it already has them, and the job lock is released if we hold it.
# All arguments are then passed through to the original queue().
around 'queue' => sub {
    my ($orig, $self, $queue, $msg, $reply_queue, $exchange) = @_;

    my $current = $self->job->{message};

    if (ref $current eq 'HASH' and exists $current->{meta}) {
        foreach my $field (qw(user account session)) {
            # never overwrite meta info the outgoing message already carries
            next if exists $msg->{meta}->{$field};
            next unless exists $current->{meta}->{$field};

            $msg->{meta}->{$field} = $current->{meta}->{$field};
        }
    }

    if ($self->job_locked) {
        $self->unlock_job($msg);
    }

    return $self->$orig($queue, $msg, $reply_queue, $exchange);
};
# Wraps dequeue(): a message fetched without a tag is remembered in the job
# attribute and lock_job() is attempted on it. When a tag is given (an RPC
# response) the message is returned untouched.
around 'dequeue' => sub {
    my ($orig, $self, $tag) = @_;

    my $msg = $self->$orig($tag);

    # don't lock and store job if it's an RPC response
    return $msg if $tag;

    $self->job({ message => $msg });
    $self->lock_job($msg);

    return $msg;
};
# Runs before ack(): forget the current job by resetting the job attribute to
# an empty hash.
before 'ack' => sub {
    my $self = shift;

    $self->job({});

    return;
};
# Convenience accessor for the _hooks attribute that takes a plain hash
# (command => CODEREF, ...) instead of a hash reference.
# Called with arguments it replaces the stored hooks; in every case it returns
# whatever _hooks currently holds.
sub hooks {
    my $self  = shift;
    my %hooks = @_;

    if (%hooks) {
        $self->_hooks(\%hooks);
    }

    return $self->_hooks;
}
# Common tail of message processing. For a hash message it, in this order,
#   - logs $msg->{error} if present,
#   - logs the meta->wait_for key when nobody is waiting for a reply,
#   - removes $msg->{status} and, for locked jobs with worker logging enabled,
#     calls log_worker() and update_job() with that status,
#   - unlocks the job if it is locked,
#   - sends the message to the reply queue if a reply is wanted.
# The AMQP message is acknowledged in every case, including when $msg is not
# a hash (methods may return void to prevent further processing).
sub _finish_processing {
    my ($self, $msg) = @_;

    if (ref $msg eq 'HASH') {
        if (exists $msg->{error}) {
            $self->log("ERROR: " . $msg->{error});
        }

        # log if workflow stops and will be started by event or cron
        if (    exists $msg->{meta}
            and exists $msg->{meta}->{wait_for}
            and not $self->wants_reply)
        {
            $self->log(
                "waiting for " . $msg->{meta}->{wait_for} . " event/cron");
        }

        # the status is handed to update_job() separately and must not stay
        # in the message itself
        my $status = delete $msg->{status};

        # log worker and update job if we have to
        if ($self->log_worker_enabled) {
            if ($self->job_locked) {
                $self->log_worker($msg);
                $self->update_job($msg, $status);
            }
        }

        if ($self->job_locked) {
            $self->unlock_job($msg);
        }

        # reply if needed and wanted
        if ($self->wants_reply) {
            $self->queue($self->reply_queue, $msg);
        }
    }

    # at last, ack message
    $self->ack;

    return;
}
# (Un)lock the job a message belongs to.
#
# Parameters:
#   $msg  - message hash; only acted upon when it has a meta hash with an id
#   $mode - 'unlock' to release the lock, anything else (or nothing) to lock
#
# The lock key is "activejob:<meta id>", the lock value "<name>:<PID>".
# Locking is retried once after jobqueue_sync_delay seconds. If it still
# fails, $msg->{error} is set, job_locked is cleared and nothing is returned,
# which makes the 'start' wrapper skip processing. In all other cases
# (including messages that are not jobs) a true value is returned.
# NOTE(review): $self->lock / $self->unlock are not defined in this file;
# presumably they are provided by the KyotoTycoon plugin -- confirm.
sub lock_job {
    my ($self, $msg, $mode) = @_;

    # default to locking
    my $action = (defined $mode and $mode eq 'unlock') ? 'unlock' : 'lock';

    # messages without a job ID have nothing to lock
    return 1
        unless (ref $msg eq 'HASH'
        and exists $msg->{meta}
        and ref $msg->{meta} eq 'HASH'
        and exists $msg->{meta}->{id});

    my $lock_key   = 'activejob:' . $msg->{meta}->{id};
    my $lock_value = $self->name . ':' . $$;

    my $ok = $self->$action($lock_key, $lock_value);

    if ($action eq 'unlock') {
        $self->job_locked(0);
        return 1;
    }

    unless ($ok) {
        # we need to take into account that KT may be slow
        # replicating so we need a way to re-try here once after
        # a set timeout.
        sleep($self->jobqueue_sync_delay);
        $ok = $self->$action($lock_key, $lock_value);
    }

    if ($ok) {
        $self->job_locked(1);
        return 1;
    }

    # return error to prevent any further processing in around 'start' wrapper
    $msg->{error} = "job locked by different worker process";
    $self->job_locked(0);

    return;
}
# Release the lock of the job $msg belongs to; shorthand for calling
# lock_job() in 'unlock' mode. Returns whatever lock_job() returns.
sub unlock_job {
    my ($self, $msg) = @_;

    return $self->lock_job($msg, 'unlock');
}
# Switch off worker logging by clearing log_worker_enabled, so that
# _finish_processing() skips log_worker() and update_job(). Returns nothing.
sub dont_log_worker {
    my ($self) = @_;

    $self->log_worker_enabled(0);

    return;
}
# Fetch a job document by ID from the jobqueue database and remember it in the
# job attribute.
#
# Parameters:
#   $id - ID of the job document
#
# Returns the job document, or nothing (after a carp) when the ID is missing
# or no document was found. Exceptions raised by get_doc() are propagated.
#
# The CouchDB handle is temporarily pointed at jobqueue_db. The previously
# selected database is restored in every case, including when get_doc() dies,
# so a failed lookup cannot leave the shared handle on the wrong database.
sub get_job {
    my ($self, $id) = @_;

    unless ($id) {
        carp "Job ID missing";
        return;
    }

    my $old_db = $self->couchdb->db;
    $self->couchdb->db($self->jobqueue_db);

    # trap exceptions so the original database can be restored first
    my $job;
    my $ok = eval {
        $job = $self->couchdb->get_doc({ id => $id });
        1;
    };
    my $err = $@;

    $self->couchdb->db($old_db);

    # re-raise the original exception now that the handle is restored
    die $err unless $ok;

    unless ($job) {
        carp "job ID not existing: $id";
        return;
    }

    # store job in attribute
    $self->job($job);

    return $job;
}
sub create_job {
my ($self, $msg) = @_;
unless (ref $msg eq 'HASH') {
carp "msg must be a HASH";
return;
}
# set testmode when running in debug mode
$msg->{data}->{options}->{testmode} = 1 if $self->debug;
# kill duplicate identical jobs with a hashsum over the input data
# and a 2 min caching time
my $created = time;
my $cached = DateTime->from_epoch(epoch => $created);
$cached->truncate(to => 'minute');
$cached->set_minute($cached->minute - ($cached->minute % 2));
my $dumper = Data::Dumper->new([ $msg->{data} ]);
$dumper->Terse(1);
$dumper->Sortkeys(1);
$dumper->Indent(0);
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
Daemonise::Plugin::JobQueue - Daemonise JobQueue plugin
=head1 VERSION
version 2.13
=head1 SYNOPSIS
use Daemonise;
my $d = Daemonise->new();
$d->debug(1);
$d->foreground(1) if $d->debug;
$d->config_file('/path/to/some.conf');
$d->load_plugin('JobQueue');
$d->configure;
# fetch job from "jobqueue_db" and put it in $d->job
my $job_id = '585675aab87f878c9e98779e9e9c9ccadff';
my $job = $d->get_job($job_id);
# creates a new job in "jobqueue_db"
$job = $d->create_job({ some => { structured => 'hash' } });
# starts new job by sending a new job message to workflow worker
$d->start_job('workflow_name', { user => "kevin", command => "play" });
# searches for job in couchdb using view 'find/by_something' and key provided
$job = $d->find_job('by_bottles', "bottle_id");
# if you REALLY have to persist something in jobqueue right now, rather don't
$d->update_job($d->job->{message} || $job->{message});
# mark job as done and persist
$d->job_done($d->job->{message});
# stops workflow here if it is a job (if it has a message->meta->job_id)
$d->stop_here;
# recalculate totals
$d->recalculate;
# remove items from a job
$d->remove_items($d->job->{message}->{data}->{options}, qw(item1 item2 item3));
=head1 ATTRIBUTES
=head2 jobqueue_db
=head2 job
=head2 items_key
=head2 item_key
=head2 log_worker_enabled
=head2 job_locked
=head2 jobqueue_sync_delay
=head1 SUBROUTINES/METHODS provided
=head2 configure
=head2 log
log additional meta info of a job if present. this adds C<job> (from C<< meta->id >>),
C<session>, C<user> and C<account> from the C<meta> hash in front of each log message
for easy tracking in any kind of log analyzer later.
=head2 start
=head2 stop
unlock job before we get terminated
=head2 queue
pass on some meta information if needed (user, account, session).
unlock job ID before sending it off unless it's already unlocked or locking failed.
=head2 dequeue
store rabbitMQ message in job attribute after receiving.
try to lock job ID if applicable
=head2 ack
empty job attribute before acknowledging a rabbitMQ message
=head2 hooks
method wrapper around _hooks attribute to accept hashes instead of a hash
reference for convenience.
=head2 _finish_processing
this method exists to collect all common tasks needed to finish up a message
1. log error if exists
2. log wait_for key if job stops here
3. log worker & update job unless locking failed (job only)
4. unlock job (job only)
5. reply to calling worker/rabbit if needed
6. acknowledge AMQP message
=head2 lock_job
if message is a job, (un)lock rabbit on it using "activejob:job_id" as key and
"some.rabbit.name:PID" as lock value.
if locking fails, sets the C<error> key in the message and returns undef, otherwise returns true.
=head2 unlock_job
call lock_job in 'unlock' mode and set boolean attribute
=head2 dont_log_worker
disable worker logging in msg->meta->log array
=head2 get_job
=head2 create_job
=head2 start_job
=head2 update_job
=head2 job_done
=head2 job_failed
=head2 job_pending
=head2 log_worker
=head2 find_job
=head2 find_all_jobs
( run in 1.219 second using v1.01-cache-2.11-cpan-39bf76dae61 )