AnyEvent-Beanstalk-Worker
view release on metacpan or search on metacpan
lib/AnyEvent/Beanstalk/Worker.pm view on Meta::CPAN
AnyEvent->condvar(
cb => sub {
if ( ref( $self->{_cb}->{$evt} ) eq 'CODE' ) {
$self->{_log}->{trace}->("event: $evt");
my @data = $_[0]->recv;
$self->{_log}->{debug}->(
"shift event ($evt): " . shift @{ $self->{_events} } );
$self->{_log}->{debug}->(
"EVENTS (s): " . join( ' ' => @{ $self->{_events} } ) );
$self->{_cb}->{$evt}->(@data);
}
$self->{_event}->{$evt} = AnyEvent->condvar( cb => __SUB__ );
}
);
}
->($event);
}
# emit( $event, @args )
#
# Fire the event named $event: record it on the $self->{_events} debug
# queue and call send() on the condvar stored in $self->{_event}{$event},
# passing the worker itself followed by @args.
#
# Returns whatever the condvar's send() returns.
#
# Croaks if no condvar is stored for $event (presumably because no
# handler was installed for it with on(), as in the SYNOPSIS -- confirm
# against the registration code).
sub emit {
    my ( $self, $event, @args ) = @_;

    # Resolve the condvar *before* touching the bookkeeping: otherwise an
    # unknown event leaves a stale entry on the _events queue and then
    # dies with an opaque "Can't call method "send" on an undefined value".
    my $cv = defined $event ? $self->{_event}->{$event} : undef;
    unless ( defined $cv ) {
        require Carp;
        Carp::croak( "emit: no handler registered for event '"
              . ( defined $event ? $event : '(undef)' )
              . "'" );
    }

    $self->{_log}->{debug}->("push event ($event)");
    push @{ $self->{_events} }, $event;
    $self->{_log}->{debug}
      ->( "EVENTS (p): " . join( ' ' => @{ $self->{_events} } ) );

    return $cv->send( $self, @args );
}
# beanstalk( [@client_args] )
#
# Combined accessor/constructor for the underlying client object.
# With arguments, builds a new AnyEvent::Beanstalk from them and stores
# it in $self->{_beanstalk}; with no arguments the stored client is left
# untouched. Either way the stored client is returned.
sub beanstalk {
    my ( $self, @client_args ) = @_;

    if (@client_args) {
        $self->{_beanstalk} = AnyEvent::Beanstalk->new(@client_args);
    }

    return $self->{_beanstalk};
}
# job_count()
#
# Returns the number of keys currently held in the $self->{_jobs} hash.
sub job_count {
    my ($self) = @_;
    return scalar keys %{ $self->{_jobs} };
}
# handled_jobs()
#
# Read-only accessor: returns the value stored in $self->{_handled_jobs}.
sub handled_jobs {
    my ($self) = @_;
    return $self->{_handled_jobs};
}
# max_jobs()
#
# Read-only accessor: returns the value stored in $self->{_max_jobs}.
sub max_jobs {
    my ($self) = @_;
    return $self->{_max_jobs};
}
# concurrency( [$value] )
#
# Combined getter/setter for $self->{_concurrency}. When called with an
# argument (even an undefined one) the first argument is stored; the
# current value is always returned.
sub concurrency {
    my ( $self, @new_value ) = @_;

    $self->{_concurrency} = $new_value[0] if @new_value;

    return $self->{_concurrency};
}
1;
__END__
=head1 NAME
AnyEvent::Beanstalk::Worker - Event-driven FSA for beanstalk queues
=head1 SYNOPSIS
use AnyEvent::Beanstalk::Worker;
use Data::Dumper;
use JSON;
my $w = AnyEvent::Beanstalk::Worker->new(
concurrency => 10,
initial_state => 'reserved',
beanstalk_watch => 'jobs',
beanstalk_decoder => sub {
eval { decode_json(shift) };
}
);
$w->on(reserved => sub {
my $self = shift;
my ($qjob, $qresp) = @_;
say "Got a job: " . Dumper($qjob->decode);
$self->emit( my_next_state => $qjob );
});
$w->on(my_next_state => sub {
my $self = shift;
my $job = shift;
## do something with job
...
## maybe not ready yet?
unless ($job_is_ready) {
return $self->finish(release => $job->id, { delay => 60 });
}
## all done!
$self->finish(delete => $job->id);
});
$w->start;
AnyEvent->condvar->recv;
=head1 DESCRIPTION
B<AnyEvent::Beanstalk::Worker> implements a simple, abstract
finite-state automaton for beanstalk queues. It can handle a
configurable number of concurrent jobs, and implements graceful worker
shutdown.
You are encouraged to subclass B<AnyEvent::Beanstalk::Worker> and
implement your own B<init> function, for example, so your object has
access to anything you need in subsequent states.
The L</SUPPLEMENTAL> section below contains additional information
about the various technologies this module uses.
=head1 METHODS
B<AnyEvent::Beanstalk::Worker> implements these methods:
=head2 new
Create a new object. The B<new> method accepts the following arguments:
( run in 1.258 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )