Daemonise

 view release on metacpan or  search on metacpan

lib/Daemonise/Plugin/JobQueue.pm  view on Meta::CPAN



has 'log_worker_enabled' => (
    is      => 'rw',
    isa     => 'Bool',
    lazy    => 1,
    default => sub { 1 },
);


has 'job_locked' => (
    is      => 'rw',
    isa     => 'Bool',
    default => sub { 0 },
);


has 'jobqueue_sync_delay' => (
    is      => 'rw',
    isa     => 'Int',
    default => sub { 1 },

lib/Daemonise/Plugin/JobQueue.pm  view on Meta::CPAN

    if (defined $code) {
        unless (ref $code eq 'CODE') {
            $self->log(
                "first argument of start() must be a CODEREF! existing...");
            $self->stop;
        }
    }

    my $wrapper = sub {

        # reset job_locked attribute to unlocked to start with
        $self->job_locked(0);

        # get the next message from the queue (includes locking)
        my $msg = $self->dequeue;

        # error if message was empty or not a hashref
        $msg = { error => "message must not be empty!" }
            unless ref $msg eq 'HASH';

        # don't process anything if we already have an error
        # e.g. locking failed, empty message, ...

lib/Daemonise/Plugin/JobQueue.pm  view on Meta::CPAN

        return;
    };

    return $self->$orig($wrapper);
};


before 'stop' => sub {
    my ($self) = @_;

    if (exists $self->job->{message} and $self->job_locked) {
        $self->unlock_job($self->job->{message});
    }

    return;
};


around 'queue' => sub {
    my ($orig, $self, $queue, $msg, $reply_queue, $exchange) = @_;

lib/Daemonise/Plugin/JobQueue.pm  view on Meta::CPAN

        and exists $self->job->{message}->{meta})
    {
        for my $key ('user', 'account', 'session') {
            next if exists $msg->{meta}->{$key};

            $msg->{meta}->{$key} = $self->job->{message}->{meta}->{$key}
                if exists $self->job->{message}->{meta}->{$key};
        }
    }

    $self->unlock_job($msg) if $self->job_locked;

    return $self->$orig($queue, $msg, $reply_queue, $exchange);
};


around 'dequeue' => sub {
    my ($orig, $self, $tag) = @_;

    my $msg = $self->$orig($tag);

lib/Daemonise/Plugin/JobQueue.pm  view on Meta::CPAN

        $self->log("ERROR: " . $msg->{error}) if exists $msg->{error};

        # log if workflow stops and will be started by event or cron
        $self->log("waiting for " . $msg->{meta}->{wait_for} . " event/cron")
            if (exists $msg->{meta}
            and exists $msg->{meta}->{wait_for}
            and not $self->wants_reply);

        # log worker and update job if we have to
        my $status = delete $msg->{status};
        if ($self->log_worker_enabled and $self->job_locked) {
            $self->log_worker($msg);
            $self->update_job($msg, $status);
        }

        $self->unlock_job($msg) if $self->job_locked;

        # reply if needed and wanted
        $self->queue($self->reply_queue, $msg) if $self->wants_reply;
    }

    # at last, ack message
    $self->ack;

    return;
}

lib/Daemonise/Plugin/JobQueue.pm  view on Meta::CPAN

        and ref $msg->{meta} eq 'HASH'
        and exists $msg->{meta}->{id})
    {
        my $key     = 'activejob:' . $msg->{meta}->{id};
        my $value   = $self->name . ':' . $$;
        my $success = $self->$mode($key, $value);

        # return error to prevent any further processing in around 'start' wrapper
        if ($mode eq 'lock') {
            if ($success) {
                $self->job_locked(1);
                return 1;
            }
            else {
                # we need to take into account that KT may be slow
                # replicating so we need a way to re-try here once after
                # a set timeout.
                sleep($self->jobqueue_sync_delay);
                if ($self->$mode($key, $value)) {
                    $self->job_locked(1);
                    return 1;
                }
                $msg->{error} = "job locked by different worker process";
                $self->job_locked(0);
                return;
            }
        }
        else {
            $self->job_locked(0);
            return 1;
        }
    }

    return 1;
}


sub unlock_job { return $_[0]->lock_job($_[1], 'unlock'); }    ## no critic

lib/Daemonise/Plugin/JobQueue.pm  view on Meta::CPAN

=head2 jobqueue_db

=head2 job

=head2 items_key

=head2 item_key

=head2 log_worker_enabled

=head2 job_locked

=head2 jobqueue_sync_delay

=head1 SUBROUTINES/METHODS provided

=head2 configure

=head2 log

log additional meta info of a job if present. this adds C<job> (from C<<meta->id>>,

lib/Daemonise/Plugin/JobQueue.pm  view on Meta::CPAN


=head2 start

=head2 stop

unlock job before we get terminated

=head2 queue

pass on some meta information if needed (user, account, session).
unlock job ID before sending it off unless it's already unlocked or locking failed.

=head2 dequeue

store rabbitMQ message in job attribute after receiving.
try to lock job ID if applicable

=head2 ack

empty job attribute before acknowledging a rabbitMQ message



( run in 1.167 second using v1.01-cache-2.11-cpan-49f99fa48dc )