AnyEvent-Beanstalk
view release on metacpan or search on metacpan
lib/AnyEvent/Beanstalk.pm view on Meta::CPAN
my %tubes = map { ($_ => 1) } @_;
my $done = sub {
delete $self->{_condvar}{$cv};
$cv->send(@_);
};
$self->list_tubes_watched(
sub {
my ($tubes,$r) = @_;
return $done->(@_) unless $r and $r =~ /^OK\b/;
my $w = $self->{__watching} = {};
foreach my $t (@$tubes) {
$tubes{$t} = 0 unless delete $tubes{$t};
$w->{$t}++;
}
unless (keys %tubes) { # nothing to do
my $ts = scalar @$tubes;
$done->($ts, "WATCHING $ts");
}
my @err; # first error
foreach my $t (sort { $tubes{$b} <=> $tubes{$a} } keys %tubes) {
my $cmd = $tubes{$t} ? 'watch' : 'ignore';
$self->run_cmd(
$cmd, $t,
sub {
if ($_[1] and $_[1] =~ /^WATCHING\b/) {
$tubes{$t} ? $w->{$t}++ : delete $w->{$t};
} else {
@err = @_ unless @err;
}
delete $tubes{$t};
return $done->(@err ? @err : @_)
unless keys %tubes;
}
);
}
}
);
$cv;
}
sub put {
my $self = shift;
my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
my $opt = shift || {};
my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
my $ttr = exists $opt->{ttr} ? $opt->{ttr} : $self->ttr;
my $delay = exists $opt->{delay} ? $opt->{delay} : $self->delay;
my $data =
exists $opt->{data} ? $opt->{data}
: exists $opt->{encode} ? $self->encoder->($opt->{encode})
: '';
$pri = int($pri || 0) || 1;
$ttr = int($ttr || 0) || 1;
$delay = int($delay || 0) || 0;
$data = '' unless defined $data;
utf8::encode($data) if utf8::is_utf8($data); # need bytes
$self->run_cmd('put' => $pri, $delay, $ttr, length($data), $data, @cb);
}
sub stats {
my $self = shift;
my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
$self->run_cmd('stats' => @cb);
}
sub stats_tube {
my $self = shift;
my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
my $tube = shift;
$self->run_cmd('stats-tube' => $tube, @cb);
}
sub stats_job {
my $self = shift;
my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
my $id = shift || 0;
$self->run_cmd('stats-job' => $id, @cb);
}
sub kick {
my $self = shift;
my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
my $bound = shift || 1;
$self->run_cmd('kick' => $bound, @cb);
}
sub kick_job {
my $self = shift;
my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
my $id = shift || 0;
$self->run_cmd('kick-job' => $id, @cb);
}
sub use {
my $self = shift;
my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
my $tube = shift;
$self->run_cmd(
'use' => $tube,
sub {
$self->{__using} = $_[0] if @_ and $_[1] =~ /^USING\b/;
$cb[0]->(@_) if @cb;
}
);
}
sub reserve {
my $self = shift;
my @cb = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
( run in 2.046 seconds using v1.01-cache-2.11-cpan-5837b0d9d2c )