Beanstalk-Client

 view release on metacpan or  search on metacpan

lib/Beanstalk/Client.pm  view on Meta::CPAN


# Shared implementation for the peek-style commands.
#
# Sends $cmd to the server through _interact and inspects the reply:
#   * empty reply            -> returns undef (error is left untouched here)
#   * first word is 'FOUND'  -> reads the job body via _recv_data, using the
#                               third reply field as its size argument, and
#                               returns a Beanstalk::Job built from the id
#                               (second field), this client and the body
#   * anything else          -> stores the space-joined reply via
#                               $self->error and returns undef
#
# Also returns undef when _recv_data yields an undefined body.
sub _peek {
  my ($self, $cmd) = @_;

  my @reply = _interact($self, $cmd);
  return undef unless @reply;

  my ($status, $job_id, $size) = @reply;

  # Guard clause: every reply other than FOUND is reported as an error.
  if ($status ne 'FOUND') {
    $self->error(join ' ', @reply);
    return undef;
  }

  my $body = _recv_data($self, $size);
  return undef unless defined $body;

  my %job_args = (
    id     => $job_id,
    client => $self,
    data   => $body,
  );
  return Beanstalk::Job->new(\%job_args);
}

# Returns the value held by the _watching accessor, refreshing it on demand.
#
# When _watching is already true it is returned as-is. Otherwise
# list_tubes_watched is invoked (presumably it repopulates _watching --
# confirm against its definition) and _watching is read again.
sub __watching {
  my ($self) = @_;

  if (my $cached = $self->_watching) {
    return $cached;
  }

  $self->list_tubes_watched;
  return $self->_watching;
}

# use namespace::clean;

# Constructor.
#
# Takes an optional hash reference of fields. The built-in defaults
# (delay 0, ttr 120, priority 10_000, YAML::Syck Dump/Load as encoder and
# decoder) are merged with the caller's fields, the caller's values taking
# precedence, and the result is handed to the parent class constructor.
# The receive buffer slot is then initialised to the empty string.
sub new {
  my ($proto, $fields) = @_;
  $fields ||= {};

  my %settings = (
    delay    => 0,
    ttr      => 120,
    priority => 10_000,
    encoder  => \&YAML::Syck::Dump,
    decoder  => \&YAML::Syck::Load,
    %$fields,    # caller-supplied values override the defaults above
  );

  my $self = $proto->SUPER::new(\%settings);
  $self->{_recv_buffer} = '';

  return $self;
}


# Opens a TCP connection to the configured server and restores tube state.
#
# The address comes from $self->server, falling back to "127.0.0.1", and
# ":11300" is appended when the address contains no colon. The connect
# timeout is taken from $self->connect_timeout.
#
# After the socket is stored, the previously recorded watch list and used
# tube (the _watching / _using accessors) are re-applied; when there was no
# previous watch list but a default_tube is set, that tube is both used and
# watched exclusively.
#
# Returns the socket on success. On any failure returns whatever
# $self->disconnect returns; a failed socket creation additionally records
# "connect: ..." through $self->error.
sub connect {
  my ($self) = @_;

  my $server = $self->server || "127.0.0.1";
  if ($server !~ /:/) {
    $server .= ":11300";
  }

  # Kept in a scalar so the accessor is evaluated in scalar context rather
  # than inside the constructor's argument list.
  my $wait = $self->connect_timeout;

  my $sock = IO::Socket::INET->new(
    PeerAddr => $server,
    Timeout  => $wait,
  );

  if (!$sock) {
    $self->error("connect: $@");
    return $self->disconnect;
  }

  $self->socket($sock);

  # Capture the earlier state before list_tubes_watched runs.
  my $prev_watching = $self->_watching;
  my $prev_using    = $self->_using;

  $self->list_tubes_watched;

  if ($prev_watching) {
    return $self->disconnect
      unless $self->watch_only(keys %$prev_watching);
  }
  elsif (my $tube = $self->default_tube) {
    return $self->disconnect
      unless $self->use($tube) && $self->watch_only($tube);
  }

  if (defined $prev_using) {
    return $self->disconnect
      unless $self->use($prev_using);
  }

  return $sock;
}


# Closes the current socket, if there is one, and clears the socket slot.
#
# Returns whatever the socket accessor returns when it is set to undef.
sub disconnect {
  my ($self) = @_;

  my $sock = $self->socket;
  close($sock) if $sock;

  return $self->socket(undef);
}

# Disconnects from the server and always returns 1, regardless of what
# disconnect returned.
sub quit {
  my ($self) = @_;

  $self->disconnect;

  return 1;
}


sub put {
  my $self = shift;
  my $opt  = shift || {};

  my $pri   = exists $opt->{priority} ? $opt->{priority} : $self->priority;
  my $ttr   = exists $opt->{ttr}      ? $opt->{ttr}      : $self->ttr;
  my $delay = exists $opt->{delay}    ? $opt->{delay}    : $self->delay;
  my $data  = exists $opt->{data}     ? $opt->{data}     : $self->encoder->(@_);

  utf8::encode($data) if utf8::is_utf8($data);    # need bytes

  my $bytes = length($data);

  my @resp = _interact($self, "put $pri $delay $ttr $bytes", $data)
    or return undef;



( run in 1.104 second using v1.01-cache-2.11-cpan-39bf76dae61 )