AnyEvent-Beanstalk-Worker
view release on metacpan or search on metacpan
lib/AnyEvent/Beanstalk/Worker.pm view on Meta::CPAN
package AnyEvent::Beanstalk::Worker;
use 5.016001;
use strict;
use warnings;
use feature 'current_sub';
use AnyEvent;
use AnyEvent::Log;
use AnyEvent::Beanstalk;
our $VERSION = '0.05';
sub new {
my $class = shift;
my $self = {};
bless $self => $class;
my %args = @_;
$self->{_cb} = {};
$self->{_event} = {};
$self->{_jobs} = {};
$self->{_events} = []; ## event queue
$self->{_handled_jobs} = 0; ## simple job counter
$self->{_running} = 0;
$self->{_stop_tries} = 0;
$self->{_max_stop_tries} = $args{max_stop_tries} // 3;
$self->{_max_jobs} = $args{max_jobs} || 0;
$self->{_concurrency} = $args{concurrency} || 1;
$self->{_log_level} = $args{log_level} // 4;
$self->{_reserve_timeout} = $args{reserve_timeout} || 1;
$self->{_reserve_base} = $self->{_reserve_timeout};
$self->{_reserve_timeout_factor} = 1.1;
$self->{_reserve_timeout_max} = 4;
$self->{_release_delay} = $args{release_delay} || 3;
$self->{_initial_state} = $args{initial_state};
$self->{_log_ctx} = AnyEvent::Log::ctx;
$self->{_log_ctx}->title(__PACKAGE__);
$self->{_log_ctx}->level($self->{_log_level});
$self->{_log} = {};
$self->{_log}->{trace} = $self->{_log_ctx}->logger("trace");
$self->{_log}->{debug} = $self->{_log_ctx}->logger("debug");
$self->{_log}->{info} = $self->{_log_ctx}->logger("info");
$self->{_log}->{note} = $self->{_log_ctx}->logger("note");
$self->{_signal} = {};
$self->{_signal}->{TERM} = AnyEvent->signal(
signal => "TERM",
cb =>
sub { $self->{_log_ctx}->log( warn => "TERM received" ); $self->stop }
);
$self->{_signal}->{INT} = AnyEvent->signal(
signal => "INT",
cb =>
sub { $self->{_log_ctx}->log( warn => "INT received" ); $self->stop }
);
$self->{_signal}->{USR2} = AnyEvent->signal(
signal => "USR2",
cb => sub {
$self->{_log_level} =
( $self->{_log_level} >= 9 ? 2 : $self->{_log_level} + 1 );
$self->{_log_ctx}->level($self->{_log_level});
}
);
$args{beanstalk_host} ||= 'localhost';
$args{beanstalk_port} ||= 11300;
unless ($args{beanstalk_watch}) {
die "beanstalk_watch argument required\n";
}
$self->beanstalk(
server => $args{beanstalk_host} . ':' . $args{beanstalk_port},
decoder => $args{beanstalk_decoder}
);
$self->beanstalk->watch( $args{beanstalk_watch} )->recv;
$self->on(
start => sub {
my $self = shift;
my $reason = shift || '(unknown)';
$self->{_log}->{trace}->("in start: $reason");
unless ( $self->{_running} ) {
$self->{_log}->{trace}->("worker is not running");
return;
}
unless ( $self->job_count < $self->concurrency ) {
$self->{_log}->{trace}->( "worker running "
. $self->job_count
. " jobs; will not accept more jobs until others finish"
);
return;
}
if ( $self->max_jobs and $self->handled_jobs >= $self->max_jobs ) {
$self->{_log}->{info}->("Handled " . $self->handled_jobs . "; will not accept more jobs");
return;
lib/AnyEvent/Beanstalk/Worker.pm view on Meta::CPAN
## we've been waiting for a slot to free up
$self->emit( start => ('finish sub') );
}
$self->{_log}->{info}
->( "finished with $job_id ($action); outstanding jobs: "
. $self->job_count );
# $cb->($job_id);
## we're done
if ( $self->max_jobs
and $self->handled_jobs >= $self->max_jobs
and ! $self->job_count ) {
$self->{_log}->{info}->("Handled " . $self->handled_jobs . "; quitting");
return $self->stop;
}
};
eval {
$self->beanstalk->$action( $job_id, ( $args ? $args : () ), $internal );
};
$self->{_log_ctx}->log(
error => "first argument to finish() must be a beanstalk command: $@" )
if $@;
}
## stop()
##
## Request shutdown of the worker. Every call bumps the stop-attempt
## counter; once the counter reaches _max_stop_tries the process exits
## regardless of outstanding jobs. Otherwise, if jobs are still
## outstanding, a warning is logged and the call returns so they can
## finish. With no outstanding jobs the process exits.
sub stop {
    my ($self) = @_;

    my $attempts = ++$self->{_stop_tries};

    ## too many stop requests: give up on outstanding jobs
    if ( $attempts >= $self->{_max_stop_tries} ) {
        $self->{_log_ctx}->log(
            warn => "stop requested; impatiently quitting outstanding jobs" );
        exit;
    }

    ## still busy: let the outstanding jobs drain
    if ( $self->job_count ) {
        $self->{_log_ctx}
          ->log( warn => "stop requested; waiting for outstanding jobs" );
        return;
    }

    ## NOTE(review): AnyEvent::Log appears to terminate the program itself
    ## when logging at "fatal" level, in which case the exit below is never
    ## reached -- confirm against the AnyEvent::Log documentation.
    $self->{_log_ctx}->log( fatal => "exiting" );
    exit;
}
## on( $event, $cb )
##
## Register $cb as the handler for $event and arm a condvar for it in
## $self->{_event}. When the condvar fires, the handler (if it is a code
## reference) is called with whatever was sent to the condvar; one entry
## is shifted off the $self->{_events} queue for logging. Afterwards a
## fresh condvar with the same callback is stored so the event can fire
## again.
sub on {
    my ( $self, $name, $handler ) = @_;

    $self->{_cb}->{$name} = $handler;

    my $dispatch = sub {
        my ($fired) = @_;

        if ( ref( $self->{_cb}->{$name} ) eq 'CODE' ) {
            $self->{_log}->{trace}->("event: $name");

            ## data handed to ->send() by emit()
            my @payload = $fired->recv;

            $self->{_log}->{debug}
              ->( "shift event ($name): " . shift @{ $self->{_events} } );
            $self->{_log}->{debug}
              ->( "EVENTS (s): " . join( ' ' => @{ $self->{_events} } ) );

            $self->{_cb}->{$name}->(@payload);
        }

        ## a condvar fires only once; re-arm with this same callback
        $self->{_event}->{$name} = AnyEvent->condvar( cb => __SUB__ );
    };

    return $self->{_event}->{$name} = AnyEvent->condvar( cb => $dispatch );
}
## emit( $event, @args )
##
## Fire $event: record it on the $self->{_events} queue and send
## ( $self, @args ) to the condvar that on() armed for that event.
##
## If no condvar is registered for $event (on() was never called for it,
## or the name is misspelled) an error is logged and nothing is queued.
## Previously this called ->send on an undefined value, which died after
## the event name had already been pushed onto the queue.
##
## Returns whatever the condvar's send() returns, or nothing when the
## event is unknown.
sub emit {
    my $self  = shift;
    my $event = shift;

    my $cv = $self->{_event}->{$event};
    unless ($cv) {
        $self->{_log_ctx}->log( error =>
              "emit() called for event '$event' which has no registered handler"
        );
        return;
    }

    $self->{_log}->{debug}->("push event ($event)");
    push @{ $self->{_events} }, $event;
    $self->{_log}->{debug}
      ->( "EVENTS (p): " . join( ' ' => @{ $self->{_events} } ) );

    return $cv->send( $self, @_ );
}
## beanstalk( [@client_args] )
##
## Accessor for the AnyEvent::Beanstalk client. When called with
## arguments, a new client is constructed from them and stored first.
## Always returns the currently stored client.
sub beanstalk {
    my ( $self, @client_args ) = @_;

    if (@client_args) {
        $self->{_beanstalk} = AnyEvent::Beanstalk->new(@client_args);
    }

    return $self->{_beanstalk};
}
## job_count(): number of entries currently in the $self->{_jobs} hash.
sub job_count {
    my ($self) = @_;
    return scalar keys %{ $self->{_jobs} };
}
## handled_jobs(): current value of the $self->{_handled_jobs} counter.
sub handled_jobs {
    my ($self) = @_;
    return $self->{_handled_jobs};
}
## max_jobs(): the value stored in $self->{_max_jobs}.
sub max_jobs {
    my ($self) = @_;
    return $self->{_max_jobs};
}
## concurrency( [$value] )
##
## Combined getter/setter for $self->{_concurrency}. When an argument is
## supplied it is stored first; the stored value is always returned.
sub concurrency {
    my $self = shift;

    $self->{_concurrency} = shift if @_;

    return $self->{_concurrency};
}
1;
__END__
=head1 NAME
AnyEvent::Beanstalk::Worker - Event-driven FSA for beanstalk queues
=head1 SYNOPSIS
use AnyEvent::Beanstalk::Worker;
use Data::Dumper;
use JSON;
my $w = AnyEvent::Beanstalk::Worker->new(
concurrency => 10,
initial_state => 'reserved',
beanstalk_watch => 'jobs',
beanstalk_decoder => sub {
eval { decode_json(shift) };
}
);
$w->on(reserved => sub {
my $self = shift;
my ($qjob, $qresp) = @_;
say "Got a job: " . Dumper($qjob->decode);
$self->emit( my_next_state => $qjob );
});
$w->on(my_next_state => sub {
my $self = shift;
my $job = shift;
( run in 2.671 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )