IPC-AnyEvent-Gearman
view release on metacpan or search on metacpan
lib/AnyEvent/Gearman/Worker/RetryConnection.pm view on Meta::CPAN
# Connection subclass that overrides connect() so that a failed or dropped
# connection triggers retry_connect() instead of leaving the worker dead.
package AnyEvent::Gearman::Worker::RetryConnection;
# ABSTRACT: patching AnyEvent::Gearman::Worker for retrying support
our $VERSION = '0.8'; # VERSION
# Keeps imported helper functions from remaining callable as methods.
use namespace::autoclean;
# ':easy' supplies the DEBUG function and the $ERROR level constant used below.
use Log::Log4perl qw(:easy);
# Runs at module load time with the ERROR threshold.
# NOTE(review): easy_init configures Log4perl for the whole process, so
# loading this module may replace the host application's logging setup, and
# the DEBUG calls in connect() presumably stay silent at this level - confirm
# that this is intended for a library module.
Log::Log4perl->easy_init($ERROR);
# weaken() is applied to $self in connect() after the callbacks are created.
use Scalar::Util 'weaken';
use AnyEvent;
# Provides tcp_connect, used by connect().
use AnyEvent::Socket;
# connect() wraps the connected socket in an AnyEvent::Handle.
use AnyEvent::Handle;
# Supplies has / extends / override.
use Any::Moose;
# NOTE(review): Data::Dumper is not referenced in the visible part of this
# file; possibly a debugging leftover - verify before removing.
use Data::Dumper;
# Reconnect state. connect() tests this after a successful connection: when
# it is true, every entry in `registered` is passed to register_function()
# again. reset_retry clears it once the connection is up.
has retrying => (
    is      => 'rw',
    isa     => 'Int',
    default => 0,
    clearer => 'reset_retry',
);

# Presumably the pending retry timer (its creation is outside this view -
# TODO confirm); connect() discards it via reset_timer after connecting.
has retry_timer => (
    is      => 'rw',
    isa     => 'Object',
    clearer => 'reset_timer',
);

# Function name => value handed back to register_function() when the
# functions are re-registered after a reconnect. Each object gets its own
# fresh hash.
has registered => (
    is      => 'ro',
    isa     => 'HashRef',
    default => sub { +{} },
);

# NOTE(review): presumably the delay between reconnect attempts, in seconds;
# the code that consumes it is not visible here - confirm.
has retry_interval => (
    is      => 'rw',
    isa     => 'Int',
    default => 1,
);

# Parent class whose connect() is overridden below.
extends 'AnyEvent::Gearman::Worker::Connection';
override connect=>sub{
my ($self) = @_;
# already connected
return if $self->handler;
my $g = tcp_connect $self->_host, $self->_port, sub {
my ($fh) = @_;
if ($fh) {
my $handle = AnyEvent::Handle->new(
fh => $fh,
on_read => sub { $self->process_packet },
on_error => sub {
my ($hdl, $fatal, $msg) = @_;
DEBUG $fatal;
DEBUG $msg;
my @undone = @{ $self->_need_handle },
values %{ $self->_job_handles };
$_->event('on_fail') for @undone;
$self->_need_handle([]);
$self->_job_handles({});
$self->mark_dead;
$self->retry_connect();
},
);
$self->handler( $handle );
$_->() for map { $_->[0] } @{ $self->on_connect_callbacks };
DEBUG "connected";
if( $self->retrying )
{
foreach my $key (keys %{$self->registered})
{
DEBUG "re-register '".$key."'";
$self->register_function($key,$self->registered->{$key},1);
}
}
$self->reset_retry;
$self->reset_timer;
}
else {
$self->retry_connect;
return;
}
$self->on_connect_callbacks( [] );
};
weaken $self;
$self->_con_guard($g);
( run in 1.153 second using v1.01-cache-2.11-cpan-39bf76dae61 )