IPC-AnyEvent-Gearman

 view release on metacpan or  search on metacpan

lib/AnyEvent/Gearman/Worker/RetryConnection.pm  view on Meta::CPAN

package AnyEvent::Gearman::Worker::RetryConnection;

# ABSTRACT: patches AnyEvent::Gearman::Worker to support connection retries

our $VERSION = '0.8'; # VERSION 

use namespace::autoclean;

use Log::Log4perl qw(:easy);
Log::Log4perl->easy_init($ERROR);

use Scalar::Util 'weaken';
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use Any::Moose;

use Data::Dumper;

# Reconnection flag. connect() reads it once a connection succeeds: when it
# is true, every entry in `registered` is registered again. It is cleared
# (via reset_retry) after each successful connect.
# NOTE(review): the code that sets it to a true value is not in this part of
# the file -- presumably retry_connect; confirm.
has retrying => (
    is      => 'rw',
    isa     => 'Int',
    clearer => 'reset_retry',
    default => sub { 0 },
);

# Holder for a timer object; connect() drops it through reset_timer as soon
# as a connection is established.
# NOTE(review): presumably an AnyEvent timer guard created by retry_connect;
# confirm against that method.
has retry_timer => (
    is      => 'rw',
    isa     => 'Object',
    clearer => 'reset_timer',
);

# Map of function name => value that connect() passes back to
# register_function (as its second argument) when re-registering after a
# reconnect. Starts out as an empty hash.
has registered => (
    is      => 'ro',
    isa     => 'HashRef',
    default => sub { return {} },
);

# Integer setting, defaulting to 1.
# NOTE(review): presumably the delay in seconds between reconnect attempts;
# its consumer is not visible here -- confirm.
has retry_interval => (
    is      => 'rw',
    isa     => 'Int',
    default => sub { 1 },
);

# This class specializes the stock worker connection.
extends('AnyEvent::Gearman::Worker::Connection');
override connect=>sub{
    my ($self) = @_;
 
    # already connected
    return if $self->handler;
 
    my $g = tcp_connect $self->_host, $self->_port, sub {
        my ($fh) = @_;
 
        if ($fh) {
            my $handle = AnyEvent::Handle->new(
                fh       => $fh,
                on_read  => sub { $self->process_packet },
                on_error => sub {
                    my ($hdl, $fatal, $msg) = @_;

                    DEBUG $fatal;
                    DEBUG $msg;

                    my @undone = @{ $self->_need_handle },
                                 values %{ $self->_job_handles };
                    $_->event('on_fail') for @undone;
 
                    $self->_need_handle([]);
                    $self->_job_handles({});
                    $self->mark_dead;
                    
                    $self->retry_connect();
                },
            );
 
            $self->handler( $handle );
            $_->() for map { $_->[0] } @{ $self->on_connect_callbacks };
            
            DEBUG "connected"; 
            if( $self->retrying )
            {
                foreach my $key (keys %{$self->registered})
                {
                    DEBUG "re-register '".$key."'";
                    $self->register_function($key,$self->registered->{$key},1);
                }
            }
            $self->reset_retry;
            $self->reset_timer;
        }
        else {
            $self->retry_connect;
            return;
        }
 
        $self->on_connect_callbacks( [] );
    };
 
    weaken $self;
    $self->_con_guard($g);



( run in 1.153 second using v1.01-cache-2.11-cpan-39bf76dae61 )