MangoX-Queue
view release on metacpan or search on metacpan
});
return undef;
}
});
}
sub requeue {
my ($self, $job, $callback) = @_;
my $pending = $self->{_pending_status} // $self->init_status->{_pending_status};
$job->{status} = ref($pending) eq 'ARRAY' ? $pending->[0] : $pending;
return $self->update($job, $callback);
}
sub dequeue {
my ($self, $id_or_job, $callback) = @_;
# TODO option to not remove on dequeue?
my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;
if($callback) {
$self->collection->remove({'_id' => $id} => sub {
my ($collection, $error, $doc) = @_;
if($error) {
$self->emit_safe(error => qq(Error removing job from collection: $error), $id_or_job, $error) if $self->has_subscribers('error');
$self->run_callback($callback, $id_or_job, $error);
return;
}
$self->run_callback($callback, $id_or_job, undef);
$self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
});
} else {
$self->collection->remove({'_id' => $id});
$self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
}
}
sub get {
my ($self, $id_or_job, $callback) = @_;
my $id = ref($id_or_job) ? $id_or_job->{_id} : $id_or_job;
if($callback) {
return $self->collection->find_one({'_id' => $id} => sub {
my ($collection, $error, $doc) = @_;
if($error) {
$self->emit_safe(error => qq(Error retrieving job: $error), $id_or_job, $error) if $self->has_subscribers('error');
}
$self->run_callback($callback, $doc, $error);
});
} else {
return $self->collection->find_one({'_id' => $id});
}
}
sub update {
my ($self, $job, $callback) = @_;
# FIXME Temporary fix to remove has_finished indicator from MangoX::Queue::Job
$job = { map { $_ => $job->{$_} } grep { $_ !~ /^(?:has_finished|events)$/ } keys %$job };
$job->{_id} = Mango::BSON::ObjectID->new($job->{_id}) if $self->no_binary_oid;
if($callback) {
return $self->collection->update({'_id' => $job->{_id}}, $job => sub {
my ($collection, $error, $doc) = @_;
if($error) {
$self->emit_safe(error => qq(Error updating job: $error), $job, $error) if $self->has_subscribers('error');
}
$self->run_callback($callback, $doc, $error);
});
} else {
return $self->collection->update({'_id' => $job->{_id}}, $job, {upsert => 1}) or croak qq{Error updating collection: $@};
}
}
sub fetch {
my ($self, @args) = @_;
# fetch $queue status => 'Complete', sub { my $job = shift; }
my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
my %args;
%args = (@args) if scalar @args;
$self->log->debug("In fetch");
if($callback) {
$self->log->debug("Fetching in non-blocking mode");
my $consumer_id = (scalar keys %{$self->consumers}) + 1;
$self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 1) });
return $consumer_id;
} else {
$self->log->debug("Fetching in blocking mode");
return $self->_consume_blocking(\%args, 1);
}
}
sub consume {
my ($self, @args) = @_;
# consume $queue status => 'Failed', sub { my $job = shift; }
my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
my %args;
%args = (@args) if scalar @args;
$self->log->debug("In consume");
if($callback) {
$self->log->debug("consuming in non-blocking mode");
my $consumer_id = (scalar keys %{$self->consumers}) + 1;
$self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub { $self->_consume_nonblocking(\%args, $consumer_id, $callback, 0) });
$self->log->debug("Timer scheduled, consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
return $consumer_id;
} else {
$self->log->debug("consuming in blocking mode");
( run in 0.881 second using v1.01-cache-2.11-cpan-0d23b851a93 )