AnyEvent-Beanstalk-Worker
view release on metacpan or search on metacpan
lib/AnyEvent/Beanstalk/Worker.pm view on Meta::CPAN
$self->{_release_delay} = $args{release_delay} || 3;
$self->{_initial_state} = $args{initial_state};
$self->{_log_ctx} = AnyEvent::Log::ctx;
$self->{_log_ctx}->title(__PACKAGE__);
$self->{_log_ctx}->level($self->{_log_level});
$self->{_log} = {};
$self->{_log}->{trace} = $self->{_log_ctx}->logger("trace");
$self->{_log}->{debug} = $self->{_log_ctx}->logger("debug");
$self->{_log}->{info} = $self->{_log_ctx}->logger("info");
$self->{_log}->{note} = $self->{_log_ctx}->logger("note");
$self->{_signal} = {};
$self->{_signal}->{TERM} = AnyEvent->signal(
signal => "TERM",
cb =>
sub { $self->{_log_ctx}->log( warn => "TERM received" ); $self->stop }
);
$self->{_signal}->{INT} = AnyEvent->signal(
lib/AnyEvent/Beanstalk/Worker.pm view on Meta::CPAN
$self->{_cb}->{$event} = $cb;
$self->{_event}->{$event} = sub {
my $evt = shift;
AnyEvent->condvar(
cb => sub {
if ( ref( $self->{_cb}->{$evt} ) eq 'CODE' ) {
$self->{_log}->{trace}->("event: $evt");
my @data = $_[0]->recv;
$self->{_log}->{debug}->(
"shift event ($evt): " . shift @{ $self->{_events} } );
$self->{_log}->{debug}->(
"EVENTS (s): " . join( ' ' => @{ $self->{_events} } ) );
$self->{_cb}->{$evt}->(@data);
}
$self->{_event}->{$evt} = AnyEvent->condvar( cb => __SUB__ );
}
);
}
->($event);
}
# Fire a named event.
#
# Arguments:
#   $event - name of the event to fire
#   @_     - any further arguments; they are forwarded unchanged
#
# The event name is appended to the $self->{_events} queue, the queue is
# written to the debug logger, and the object stored under
# $self->{_event}{$event} is sent ( $self, @args ) via its send() method.
# Returns whatever that send() call returns.
#
# Croaks when nothing is stored under $self->{_event}{$event}. The lookup
# happens before the queue is modified, so a failed emit no longer leaves a
# stale entry in $self->{_events}.
sub emit {
    my $self  = shift;
    my $event = shift;

    # Validate first: pushing onto the queue and then dying on an undefined
    # invocant would leave the queue out of step with the events delivered.
    my $cv = $self->{_event}->{$event};
    unless ( defined $cv ) {
        require Carp;
        Carp::croak("no handler registered for event '$event'");
    }

    $self->{_log}->{debug}->("push event ($event)");
    push @{ $self->{_events} }, $event;
    $self->{_log}->{debug}
      ->( "EVENTS (p): " . join( ' ' => @{ $self->{_events} } ) );

    return $cv->send( $self, @_ );
}
# Combined getter/setter for the object kept in $self->{_beanstalk}.
#
# Called with arguments, it builds a new AnyEvent::Beanstalk object from
# them and stores it, replacing any previous one. Called without arguments,
# it leaves the stored value alone. Either way the currently stored value
# is returned.
sub beanstalk {
    my $worker = shift;

    if (@_) {
        $worker->{_beanstalk} = AnyEvent::Beanstalk->new(@_);
    }

    return $worker->{_beanstalk};
}
( run in 0.485 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )