Daemonise

 view release on metacpan or  search on metacpan

lib/Daemonise/Plugin/JobQueue.pm  view on Meta::CPAN

    $job = {
        _id     => $id,
        created => $created,
        updated => $created,
        message => $msg,
        status  => 'new',
    };

    $id = $self->couchdb->put_doc({ doc => $job });
    $self->couchdb->db($old_db);

    unless ($id) {
        carp 'creating job failed: ' . $self->couchdb->error;
        return;
    }

    $self->job($job);
    $self->lock_job($msg);

    return $job;
}


# Queue a new workflow job.
#
# Arguments:
#   $workflow - name of the workflow to run; required
#   $options  - optional hash ref handed to the workflow as its options;
#               a true user_id entry is also copied into the frame's meta
#   $priority - optional; only 'high' or 'low' are accepted, any other
#               value is ignored
#
# Builds a message frame and passes it to $self->queue('workflow', ...).
# Returns nothing in every case; carps and bails out early when no workflow
# was given.
sub start_job {
    my ($self, $workflow, $options, $priority) = @_;

    unless ($workflow) {
        carp 'workflow not defined';
        return;
    }

    $options = {} unless $options;

    my $frame = {
        meta => {
            lang => 'en',
            user => $options->{user_id} || undef,
        },
        data => {
            command => $workflow,
            options => $options,
        },
    };

    # add priority, default normal
    # anchored with \A..\z so a value with a trailing newline is rejected
    $frame->{meta}->{priority} = $priority
        if defined $priority and $priority =~ m/\A(?:high|low)\z/;

    # tell the new job who created it
    # step through the structure one level at a time: a chained exists()
    # would autovivify 'message' and 'meta' inside the current job, and
    # would die outright when no job is set
    my $job = $self->job;
    my $parent_meta
        = (ref($job) and ref($job->{message}))
        ? $job->{message}->{meta}
        : undef;
    $frame->{meta}->{created_by} = $parent_meta->{id}
        if ref($parent_meta) and exists $parent_meta->{id};

    $self->log("starting '$workflow' workflow with:\n" . $self->dump($frame))
        if $self->debug;
    $self->queue('workflow', $frame);

    return;
}


# Store a new state for the job a message belongs to.
#
# Arguments:
#   $msg    - message hash ref; the job id is read from $msg->{meta}->{id}
#   $status - optional new status; when false the job keeps its current one
#
# The job is fetched via get_job(), gets a fresh 'updated' timestamp, the
# (possibly unchanged) status and $msg as its message, and is written back
# with the jobqueue database selected. The previously selected database is
# restored after each access.
#
# Returns the updated job hash ref. Returns nothing when $msg is not a job
# message, when the job could not be fetched, or when writing it failed
# (the latter also carps).
sub update_job {
    my ($self, $msg, $status) = @_;

    unless ((ref($msg) eq 'HASH')
        and (exists $msg->{meta} and exists $msg->{meta}->{id}))
    {
        $self->log("not a JOB, just a message, nothing to see here")
            if $self->debug;
        return;
    }

    my $old_db = $self->couchdb->db;
    $self->couchdb->db($self->jobqueue_db);
    my $job = $self->get_job($msg->{meta}->{id});
    $self->couchdb->db($old_db);

    return unless $job;

    $self->job($job);

    $job->{updated} = time;
    $job->{status}  = $status || $job->{status};
    $job->{message} = $msg;

    $old_db = $self->couchdb->db;
    $self->couchdb->db($self->jobqueue_db);

    # keep the result in temporaries: $self->job already points at $job, so
    # assigning straight into it would wipe _id/_rev when the write fails
    my ($id, $rev) = $self->couchdb->put_doc({ doc => $job });
    $self->couchdb->db($old_db);

    if ($self->couchdb->has_error) {
        carp 'updating job failed! id: '
            . $msg->{meta}->{id}
            . ', error: '
            . $self->couchdb->error;
        return;
    }

    # write succeeded, take over the id and the new revision
    $job->{_id}  = $id;
    $job->{_rev} = $rev;

    $self->job($job);

    return $job;
}


# Flag the job belonging to a message as 'done'.
#
# Takes the message as its only argument and hands it to update_job()
# together with the status; whatever update_job() returns is passed on.
sub job_done {
    my $self    = shift;
    my $message = shift;

    return $self->update_job($message, 'done');
}


# Flag the job belonging to a message as 'failed'.
#
# Takes the message as its only argument and hands it to update_job()
# together with the status; whatever update_job() returns is passed on.
sub job_failed {
    my $self    = shift;
    my $message = shift;

    return $self->update_job($message, 'failed');
}


sub job_pending {
    my ($self, $msg) = @_;

    return $self->update_job($msg, 'pending');



( run in 2.788 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )