AnyEvent-Beanstalk

 view release on metacpan or  search on metacpan

lib/AnyEvent/Beanstalk.pm  view on Meta::CPAN

  my %tubes = map { ($_ => 1) } @_;
  my $done = sub {
    delete $self->{_condvar}{$cv};
    $cv->send(@_);
  };
  $self->list_tubes_watched(
    sub {
      my ($tubes,$r) = @_;
      return $done->(@_) unless $r and $r =~ /^OK\b/;
      my $w = $self->{__watching} = {};
      foreach my $t (@$tubes) {
        $tubes{$t} = 0 unless delete $tubes{$t};
        $w->{$t}++;
      }
      unless (keys %tubes) {    # nothing to do
          my $ts = scalar @$tubes;
          $done->($ts, "WATCHING $ts");
      }
      my @err;    # first error
      foreach my $t (sort { $tubes{$b} <=> $tubes{$a} } keys %tubes) {
        my $cmd = $tubes{$t} ? 'watch' : 'ignore';
        $self->run_cmd(
          $cmd, $t,
          sub {
            if ($_[1] and $_[1] =~ /^WATCHING\b/) {
              $tubes{$t} ? $w->{$t}++ : delete $w->{$t};
            } else {
              @err = @_ unless @err;
            }
            delete $tubes{$t};
            return $done->(@err ? @err : @_)
              unless keys %tubes;
          }
        );
      }
    }
  );

  $cv;
}


# Queue a job with the 'put' command.
#
# Arguments (after the invocant):
#   $opt - optional hash reference; recognised keys are priority, ttr,
#          delay, data and encode. Missing priority/ttr/delay values are
#          read from the same-named methods on the object.
#   $cb  - optional trailing code reference, handed on to run_cmd.
#
# Returns whatever run_cmd returns.
sub put {
  my $self = shift;

  # A trailing code reference is the caller's callback.
  my @cb;
  push @cb, pop @_ if @_ and ref($_[-1]) eq 'CODE';

  my $opt = shift || {};

  # Explicit options win (even when undefined); otherwise ask the object.
  my %job;
  for my $field (qw(priority ttr delay)) {
    $job{$field} = exists $opt->{$field} ? $opt->{$field} : $self->$field;
  }

  # Body: raw data first, then an encoded structure, else empty.
  my $data = '';
  if (exists $opt->{data}) {
    $data = $opt->{data};
  }
  elsif (exists $opt->{encode}) {
    $data = $self->encoder->($opt->{encode});
  }

  # Priority and ttr fall back to 1 when false or zero; delay falls back to 0.
  my $pri   = int($job{priority} || 0) || 1;
  my $ttr   = int($job{ttr}      || 0) || 1;
  my $delay = int($job{delay}    || 0) || 0;

  $data = '' unless defined $data;

  # The length sent must count bytes, so character strings are encoded first.
  utf8::encode($data) if utf8::is_utf8($data);

  return $self->run_cmd('put', $pri, $delay, $ttr, length($data), $data, @cb);
}


# Send the 'stats' command.
#
# A trailing code reference, if given, is passed to run_cmd as the
# callback; any other arguments are ignored. Returns whatever run_cmd
# returns.
sub stats {
  my ($self, @args) = @_;

  my @cb;
  push @cb, pop @args if @args and ref($args[-1]) eq 'CODE';

  return $self->run_cmd('stats', @cb);
}


# Send the 'stats-tube' command for one tube.
#
# Arguments (after the invocant): the tube name, then an optional
# trailing code reference that is passed to run_cmd as the callback.
# Returns whatever run_cmd returns.
sub stats_tube {
  my ($self, @args) = @_;

  my @cb;
  push @cb, pop @args if @args and ref($args[-1]) eq 'CODE';

  my $tube = shift @args;

  return $self->run_cmd('stats-tube', $tube, @cb);
}


# Send the 'stats-job' command for one job.
#
# Arguments (after the invocant): the job id (a missing or false id is
# sent as 0), then an optional trailing code reference that is passed to
# run_cmd as the callback. Returns whatever run_cmd returns.
sub stats_job {
  my ($self, @args) = @_;

  my @cb;
  push @cb, pop @args if @args and ref($args[-1]) eq 'CODE';

  my $id = shift @args;
  $id ||= 0;

  return $self->run_cmd('stats-job', $id, @cb);
}


# Send the 'kick' command.
#
# Arguments (after the invocant): the bound on how many jobs to kick (a
# missing or false bound is sent as 1), then an optional trailing code
# reference that is passed to run_cmd as the callback. Returns whatever
# run_cmd returns.
sub kick {
  my ($self, @args) = @_;

  my @cb;
  push @cb, pop @args if @args and ref($args[-1]) eq 'CODE';

  my $bound = shift @args;
  $bound ||= 1;

  return $self->run_cmd('kick', $bound, @cb);
}


# Send the 'kick-job' command for one job.
#
# Arguments (after the invocant): the job id (a missing or false id is
# sent as 0), then an optional trailing code reference that is passed to
# run_cmd as the callback. Returns whatever run_cmd returns.
sub kick_job {
  my ($self, @args) = @_;

  my @cb;
  push @cb, pop @args if @args and ref($args[-1]) eq 'CODE';

  my $id = shift @args;
  $id ||= 0;

  return $self->run_cmd('kick-job', $id, @cb);
}

# Send the 'use' command for a tube and remember the tube on success.
#
# Arguments (after the invocant): the tube name, then an optional
# trailing code reference.
#
# run_cmd is given an internal callback. When that callback's second
# argument starts with "USING", its first argument is stored in
# $self->{__using}. The caller's callback, if any, is then invoked with
# the same arguments. Returns whatever run_cmd returns.
sub use {
  my $self = shift;
  my @cb   = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();
  my $tube = shift;
  $self->run_cmd(
    'use' => $tube,
    sub {
      my ($using, $response) = @_;

      # The response may be missing or undefined (short or failed reply);
      # check it is defined before matching so no warning is raised.
      $self->{__using} = $using
        if defined $response and $response =~ /^USING\b/;

      $cb[0]->(@_) if @cb;
    }
  );
}


sub reserve {
  my $self    = shift;
  my @cb      = (@_ and ref($_[-1]) eq 'CODE') ? splice(@_, -1) : ();



( run in 2.046 seconds using v1.01-cache-2.11-cpan-5837b0d9d2c )