AnyEvent-Beanstalk
view release on metacpan or search on metacpan
lib/AnyEvent/Beanstalk.pm view on Meta::CPAN
package AnyEvent::Beanstalk;
$AnyEvent::Beanstalk::VERSION = '1.170590';
use strict;
use warnings;
use constant DEBUG => $ENV{AE_BEANSTALK_DEBUG};
use Scalar::Util qw(blessed);
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Beanstalk::Job;
use AnyEvent::Beanstalk::Stats;
use base qw(Class::Accessor::Fast);
__PACKAGE__->mk_accessors(
qw< decoder delay encoder on_error on_connect priority server socket ttr > ##
);
my $YAML_CLASS = do {
local ($SIG{__DIE__}, $SIG{__WARN__});
eval { require YAML::XS } ? 'YAML::XS'
: eval { require YAML::Syck } ? 'YAML::Syck'
: eval { require YAML } ? 'YAML'
: die $@;
};
my $YAML_LOAD = $YAML_CLASS->can('Load');
my $YAML_DUMP = $YAML_CLASS->can('Dump');
sub new {
my $proto = shift;
my %arg = @_;
bless(
{ delay => $arg{delay} || 0,
ttr => $arg{ttr} || 120,
priority => $arg{priority} || 10_000,
encoder => $arg{encoder} || $YAML_DUMP,
decoder => $arg{decoder} || $YAML_LOAD,
server => $arg{server} || undef,
debug => $arg{debug} || 0,
on_error => $arg{on_error} || undef,
on_connect => $arg{on_connect} || undef,
},
ref($proto) || $proto
);
}
sub run_cmd {
my $self = shift;
$self->{_cmd_cb} or return $self->connect(@_);
$self->{_cmd_cb}->(@_);
}
sub quit { shift->disconnect }
sub reserve_pending { shift->{_reserve_pending} || 0 }
sub disconnect {
my $self = shift;
my $condvar = delete $self->{_condvar};
delete @{$self}{grep {/^_[a-z]/} keys %$self};
if ($condvar) {
$_->send for values %$condvar;
}
return;
}
sub _error {
my $self = shift;
$self->disconnect;
($self->on_error || sub { die @_ })->(@_);
}
sub reconnect {
my $self = shift;
my $using = $self->{__using} || 'default';
$self->use(
$using,
sub {
$self->_error("Can't use '$using'") unless @_ and $_[0] eq 'USING';
}
);
my $watching = $self->{__watching} || {default => 1};
$self->watch_only(
keys %$watching,
sub {
$self->_error("Error watching tubes") unless @_ and $_[0] eq 'WATCHING';
}
);
}
my %EXPECT = qw(
put INSERTED
lib/AnyEvent/Beanstalk.pm view on Meta::CPAN
See the L<beanstalkd 1.3 protocol spec|http://github.com/kr/beanstalkd/blob/v1.3/doc/protocol.txt?raw=true>
for greater detail
=head1 METHODS
=head2 Constructor
=over
=item B<new (%options)>
Any of the attributes with accessor methods described below may be passed to the constructor as key-value pairs
=back
=head2 Attribute Accessor Methods
=over
=item B<server ([$hostname])>
Get/set the hostname, and port, to connect to. The port, which defaults to 11300, can be
specified by appending it to the hostname with a C<:> (eg C<"localhost:1234">).
(Default: C<localhost:11300>)
=item B<delay ([$delay])>
Set/get a default value, in seconds, for job delay. A job with a delay will be
placed into a delayed state and will not be placed into the ready queue until
the time period has passed. This value will be used by C<put> and C<release> as
a default. (Default: 0)
=item B<ttr ([$ttr])>
Set/get a default value, in seconds, for job ttr (time to run). This value will
be used by C<put> as a default. (Default: 120)
=item B<priority ([$priority])>
Set/get a default value for job priority. The highest priority job is the job
where the priority value is the lowest (ie jobs with a lower priority value are
run first). This value will be used by C<put>, C<release> and C<bury> as a
default. (Default: 10000)
=item B<encoder ([$encoder])>
Set/get serialization encoder. C<$encoder> is a reference to a subroutine
that will be called when arguments to C<put> need to be encoded to send
to the beanstalkd server. The subroutine should accept a single argument and
return a string representation to pass to the server. The default is to encode
the argument using YAML
=item B<decoder ([$decoder])>
Set/get the serialization decoder. C<$decoder> is a reference to a
subroutine that will be called when data from the beanstalkd server needs to be
decoded. The subroutine will be passed the data fetched from the beanstalkd
server and should return the value the application can use. The default is
to decode using YAML.
=item B<debug ([$debug])>
Set/get debug value. If set to a true value then all communication with the server will be
output with C<warn>
=item B<on_error ([$callback])>
A code reference to call when there is an error communicating with the server, for example
an enexpected EOF. A description will be passed as an argument. The default is to call
die
=item B<on_connect ([$callback])>
A code reference to call when the TCP connection has been established with the server
=back
=head2 Communication Methods
All methods that communicate with the server take an optional code reference as the last
argument and return a L<condition variable|AnyEvent/"CONDITION_VARIABLES">.
The condition variable C<recv> method will return 2 values. The first is specific to
the command that is being performed and is referred to below as the response value.
The second value returned by C<recv> is the first line of the protocol response.
If there is a protocol error the response value will be C<undef>.
If a callback is specified then the callback will be called with the same arguments
that the C<recv> method would return.
Calling C<recv> in a scalar context will only return the first of the two values.
If there is a communication error, then all condition variables will be triggered
with no values.
=head2 Producer Methods
These methods are used by clients that are placing work into the queue
=over
=item B<put ($options, [$callback])>
Insert job into the currently used tube.
The response value for a C<put> is a L<AnyEvent::Beanstalk::Job> object.
Options may be
=over
=item priority
priority to use to queue the job.
Jobs with smaller priority values will be
scheduled before jobs with larger priorities. The most urgent priority is 0
Defaults to the current value of the L<priority|/priority> attribute
=item delay
An integer number of seconds to wait before putting the job in
( run in 1.524 second using v1.01-cache-2.11-cpan-39bf76dae61 )