Daemonise
view release on metacpan or search on metacpan
lib/Daemonise/Plugin/JobQueue.pm view on Meta::CPAN
$job = {
_id => $id,
created => $created,
updated => $created,
message => $msg,
status => 'new',
};
$id = $self->couchdb->put_doc({ doc => $job });
$self->couchdb->db($old_db);
unless ($id) {
carp 'creating job failed: ' . $self->couchdb->error;
return;
}
$self->job($job);
$self->lock_job($msg);
return $job;
}
# Queue a new workflow job.
#
# Arguments:
#   $workflow - name of the workflow to start (required)
#   $options  - optional hashref; passed through as the frame's
#               data.options, and its user_id (when true) becomes meta.user
#   $priority - optional; only the values 'high' and 'low' are recorded
#
# Builds a message frame and hands it to $self->queue('workflow', $frame).
# Always returns nothing. Carps and returns early if $workflow is missing.
sub start_job {
    my ($self, $workflow, $options, $priority) = @_;

    unless ($workflow) {
        carp 'workflow not defined';
        return;
    }

    $options ||= {};

    my $frame = {
        meta => {
            lang => 'en',
            user => $options->{user_id} || undef,
        },
        data => {
            command => $workflow,
            options => $options,
        },
    };

    # add priority, default normal (any other value leaves the key unset)
    $frame->{meta}->{priority} = $priority
        if defined $priority and $priority =~ m/^(high|low)$/;

    # tell the new job who created it
    #
    # Walk the current job one level at a time. A chained
    # "exists $self->job->{message}->{meta}->{id}" would autovivify
    # message/meta inside the current job as a side effect of just looking.
    my $parent      = $self->job;
    my $parent_msg  = ref($parent) eq 'HASH'     ? $parent->{message}  : undef;
    my $parent_meta = ref($parent_msg) eq 'HASH' ? $parent_msg->{meta} : undef;
    $frame->{meta}->{created_by} = $parent_meta->{id}
        if ref($parent_meta) eq 'HASH' and exists $parent_meta->{id};

    $self->log("starting '$workflow' workflow with:\n" . $self->dump($frame))
        if $self->debug;

    $self->queue('workflow', $frame);

    return;
}
# Store a new message (and optionally a new status) on an existing job.
#
# Arguments:
#   $msg    - message hashref; must carry the job id in $msg->{meta}->{id}
#   $status - optional new status; the stored status is kept when false
#
# Returns the updated job hashref on success. Returns nothing when $msg is
# not a job message, when the job cannot be fetched, or when saving fails
# (the save failure is also reported via carp).
sub update_job {
    my ($self, $msg, $status) = @_;

    # Check meta is a hash before looking inside it, so a present-but-undef
    # meta is not autovivified by the id lookup.
    unless (ref($msg) eq 'HASH'
        and ref($msg->{meta}) eq 'HASH'
        and exists $msg->{meta}->{id})
    {
        $self->log("not a JOB, just a message, nothing to see here")
            if $self->debug;
        return;
    }

    # fetch the job from the jobqueue database, then switch back
    my $old_db = $self->couchdb->db;
    $self->couchdb->db($self->jobqueue_db);
    my $job = $self->get_job($msg->{meta}->{id});
    $self->couchdb->db($old_db);

    return unless $job;

    $self->job($job);

    $job->{updated} = time;
    $job->{status}  = $status || $job->{status};
    $job->{message} = $msg;

    # save into the jobqueue database, then switch back
    $old_db = $self->couchdb->db;
    $self->couchdb->db($self->jobqueue_db);

    # Capture the result in temporaries: $job is already registered as the
    # current job, so assigning straight into it would wipe _id/_rev
    # whenever put_doc fails and returns nothing.
    my ($new_id, $new_rev) = $self->couchdb->put_doc({ doc => $job });
    $self->couchdb->db($old_db);

    if ($self->couchdb->has_error) {
        carp 'updating job failed! id: '
            . $msg->{meta}->{id}
            . ', error: '
            . $self->couchdb->error;
        return;
    }

    ($job->{_id}, $job->{_rev}) = ($new_id, $new_rev);
    $self->job($job);

    return $job;
}
# Mark the job referenced by $msg as done.
# Delegates to update_job with status 'done' and passes its result back.
sub job_done {
    my $self    = shift;
    my $message = shift;

    return $self->update_job($message, 'done');
}
# Mark the job referenced by $msg as failed.
# Delegates to update_job with status 'failed' and passes its result back.
sub job_failed {
    my $self    = shift;
    my $message = shift;

    return $self->update_job($message, 'failed');
}
sub job_pending {
my ($self, $msg) = @_;
return $self->update_job($msg, 'pending');
( run in 2.788 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )