Daemonise
view release on metacpan or search on metacpan
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
has 'log_worker_enabled' => (
is => 'rw',
isa => 'Bool',
lazy => 1,
default => sub { 1 },
);
has 'job_locked' => (
is => 'rw',
isa => 'Bool',
default => sub { 0 },
);
has 'jobqueue_sync_delay' => (
is => 'rw',
isa => 'Int',
default => sub { 1 },
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
if (defined $code) {
unless (ref $code eq 'CODE') {
$self->log(
"first argument of start() must be a CODEREF! existing...");
$self->stop;
}
}
my $wrapper = sub {
# reset job_locked attribute to unlocked to start with
$self->job_locked(0);
# get the next message from the queue (includes locking)
my $msg = $self->dequeue;
# error if message was empty or not a hashref
$msg = { error => "message must not be empty!" }
unless ref $msg eq 'HASH';
# don't process anything if we already have an error
# e.g. locking failed, empty message, ...
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
return;
};
return $self->$orig($wrapper);
};
before 'stop' => sub {
my ($self) = @_;
if (exists $self->job->{message} and $self->job_locked) {
$self->unlock_job($self->job->{message});
}
return;
};
around 'queue' => sub {
my ($orig, $self, $queue, $msg, $reply_queue, $exchange) = @_;
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
and exists $self->job->{message}->{meta})
{
for my $key ('user', 'account', 'session') {
next if exists $msg->{meta}->{$key};
$msg->{meta}->{$key} = $self->job->{message}->{meta}->{$key}
if exists $self->job->{message}->{meta}->{$key};
}
}
$self->unlock_job($msg) if $self->job_locked;
return $self->$orig($queue, $msg, $reply_queue, $exchange);
};
around 'dequeue' => sub {
my ($orig, $self, $tag) = @_;
my $msg = $self->$orig($tag);
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
$self->log("ERROR: " . $msg->{error}) if exists $msg->{error};
# log if workflow stops and will be started by event or cron
$self->log("waiting for " . $msg->{meta}->{wait_for} . " event/cron")
if (exists $msg->{meta}
and exists $msg->{meta}->{wait_for}
and not $self->wants_reply);
# log worker and update job if we have to
my $status = delete $msg->{status};
if ($self->log_worker_enabled and $self->job_locked) {
$self->log_worker($msg);
$self->update_job($msg, $status);
}
$self->unlock_job($msg) if $self->job_locked;
# reply if needed and wanted
$self->queue($self->reply_queue, $msg) if $self->wants_reply;
}
# at last, ack message
$self->ack;
return;
}
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
and ref $msg->{meta} eq 'HASH'
and exists $msg->{meta}->{id})
{
my $key = 'activejob:' . $msg->{meta}->{id};
my $value = $self->name . ':' . $$;
my $success = $self->$mode($key, $value);
# return error to prevent any further processing in around 'start' wrapper
if ($mode eq 'lock') {
if ($success) {
$self->job_locked(1);
return 1;
}
else {
# we need to take into account that KT may be slow
# replicating so we need a way to re-try here once after
# a set timeout.
sleep($self->jobqueue_sync_delay);
if ($self->$mode($key, $value)) {
$self->job_locked(1);
return 1;
}
$msg->{error} = "job locked by different worker process";
$self->job_locked(0);
return;
}
}
else {
$self->job_locked(0);
return 1;
}
}
return 1;
}
sub unlock_job { return $_[0]->lock_job($_[1], 'unlock'); } ## no critic
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
=head2 jobqueue_db
=head2 job
=head2 items_key
=head2 item_key
=head2 log_worker_enabled
=head2 job_locked
=head2 jobqueue_sync_delay
=head1 SUBROUTINES/METHODS provided
=head2 configure
=head2 log
log additional meta info of a job if present. this adds C<job> (from C<<meta->id>>,
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
=head2 start
=head2 stop
unlock job before we get terminated
=head2 queue
pass on some meta information if needed (user, account, session).
unlock job ID before sending it off unless it's already unlocked or locking failed.
=head2 dequeue
store rabbitMQ message in job attribute after receiving.
try to lock job ID if applicable
=head2 ack
empty job attribute before acknowledging a rabbitMQ message
( run in 1.167 second using v1.01-cache-2.11-cpan-49f99fa48dc )