Beanstalk-Client
view release on metacpan or search on metacpan
lib/Beanstalk/Client.pm view on Meta::CPAN
sub _peek {
my $self = shift;
my $cmd = shift;
my @resp = _interact($self, $cmd)
or return undef;
if ($resp[0] eq 'FOUND') {
my $data = _recv_data($self, $resp[2]);
return undef unless defined $data;
return Beanstalk::Job->new(
{ id => $resp[1],
client => $self,
data => $data,
}
);
}
$self->error(join ' ', @resp);
return undef;
}
sub __watching {
my $self = shift;
my $watching = $self->_watching;
return $watching if $watching;
$self->list_tubes_watched;
$self->_watching;
}
# use namespace::clean;
sub new {
my $proto = shift;
my $fields = shift || {};
my $self = $proto->SUPER::new(
{ delay => 0,
ttr => 120,
priority => 10_000,
encoder => \&YAML::Syck::Dump,
decoder => \&YAML::Syck::Load,
%$fields,
}
);
$self->{_recv_buffer} = '';
$self;
}
sub connect {
my $self = shift;
my $server = $self->server || "127.0.0.1";
$server .= ":11300" unless $server =~ /:/;
my $timeout = $self->connect_timeout;
my $sock = IO::Socket::INET->new(
PeerAddr => $server,
Timeout => $timeout,
);
unless ($sock) {
$self->error("connect: $@");
return $self->disconnect;
}
$self->socket($sock);
my $was_watching = $self->_watching;
my $was_using = $self->_using;
$self->list_tubes_watched;
if ($was_watching) {
$self->watch_only(keys %$was_watching)
or return $self->disconnect;
}
elsif (my $default_tube = $self->default_tube) {
$self->use($default_tube) && $self->watch_only($default_tube)
or return $self->disconnect;
}
if (defined $was_using) {
$self->use($was_using)
or return $self->disconnect;
}
$sock;
}
sub disconnect {
my $self = shift;
if (my $sock = $self->socket) {
close($sock);
}
$self->socket(undef);
}
sub quit {
shift->disconnect;
return 1;
}
sub put {
my $self = shift;
my $opt = shift || {};
my $pri = exists $opt->{priority} ? $opt->{priority} : $self->priority;
my $ttr = exists $opt->{ttr} ? $opt->{ttr} : $self->ttr;
my $delay = exists $opt->{delay} ? $opt->{delay} : $self->delay;
my $data = exists $opt->{data} ? $opt->{data} : $self->encoder->(@_);
utf8::encode($data) if utf8::is_utf8($data); # need bytes
my $bytes = length($data);
my @resp = _interact($self, "put $pri $delay $ttr $bytes", $data)
or return undef;
( run in 1.104 second using v1.01-cache-2.11-cpan-39bf76dae61 )