MangoX-Queue

 view release on metacpan or  search on metacpan

README.pod  view on Meta::CPAN

            });
            return undef;
        }
    });
}

sub requeue {
    my ($self, $job, $callback) = @_;

    my $pending = $self->{_pending_status} // $self->init_status->{_pending_status};
    $job->{status} = ref($pending) eq 'ARRAY' ? $pending->[0] : $pending;
    return $self->update($job, $callback);
}

sub dequeue {
    my ($self, $id_or_job, $callback) = @_;

    # TODO option to not remove on dequeue?

    my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

    if($callback) {
        $self->collection->remove({'_id' => $id} => sub {
            my ($collection, $error, $doc) = @_;

            if($error) {
                $self->emit_safe(error => qq(Error removing job from collection: $error), $id_or_job, $error) if $self->has_subscribers('error');
                $self->run_callback($callback, $id_or_job, $error);
                return;
            }

            $self->run_callback($callback, $id_or_job, undef);
            $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
        });
    } else {
        $self->collection->remove({'_id' => $id});
        $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
    }
}

sub get {
    my ($self, $id_or_job, $callback) = @_;

    my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;

    if($callback) {
        return $self->collection->find_one({'_id' => $id} => sub {
            my ($collection, $error, $doc) = @_;

            if($error) {
                $self->emit_safe(error => qq(Error retrieving job: $error), $id_or_job, $error) if $self->has_subscribers('error');
            }

            $self->run_callback($callback, $doc, $error);
        });
    } else {
        return $self->collection->find_one({'_id' => $id});
    }
}

sub update {
    my ($self, $job, $callback) = @_;

    # FIXME Temporary fix to remove has_finished indicator from MangoX::Queue::Job
    $job = { map { $_ => $job->{$_} } grep { $_ !~ /^(?:has_finished|events)$/ } keys %$job };
    $job->{_id} = Mango::BSON::ObjectID->new($job->{_id}) if $self->no_binary_oid;

    if($callback) {
        return $self->collection->update({'_id' => $job->{_id}}, $job => sub {
            my ($collection, $error, $doc) = @_;
            if($error) {
                $self->emit_safe(error => qq(Error updating job: $error), $job, $error) if $self->has_subscribers('error');
            }
            $self->run_callback($callback, $doc, $error);
        });
    } else {
        return $self->collection->update({'_id' => $job->{_id}}, $job, {upsert => 1}) or croak qq{Error updating collection: $@};
    }
}

sub fetch {
    my ($self, @args) = @_;

    # fetch $queue status => 'Complete', sub { my $job = shift; }

    my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
    my %args;
    %args = (@args) if scalar @args;

    $self->log->debug("In fetch");

    if($callback) {
        $self->log->debug("Fetching in non-blocking mode");
        my $consumer_id = (scalar keys %{$self->consumers}) + 1;
        $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 1) });
        return $consumer_id;
    } else {
        $self->log->debug("Fetching in blocking mode");
        return $self->_consume_blocking(\%args, 1);
    }
}

sub consume {
    my ($self, @args) = @_;

    # consume $queue status => 'Failed', sub { my $job = shift; }

    my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
    my %args;
    %args = (@args) if scalar @args;

    $self->log->debug("In consume");

    if($callback) {
        $self->log->debug("consuming in non-blocking mode");
        my $consumer_id = (scalar keys %{$self->consumers}) + 1;
        $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 0) });
        $self->log->debug("Timer scheduled, consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
        return $consumer_id;
    } else {
        $self->log->debug("consuming in blocking mode");



( run in 0.881 second using v1.01-cache-2.11-cpan-0d23b851a93 )