Gearman-Client-Async
view release on metacpan or search on metacpan
lib/Gearman/Client/Async/Connection.pm view on Meta::CPAN
$self->{on_ready} = [];
$self->{on_error} = [];
}
# Handle readable data on the connection.
#
# Reads up to 128 KiB via $self->read and feeds it to the parser stored in
# $self->{parser}.  An undefined read result is treated as end-of-file: the
# connection is closed with reason "EOF", and the server is marked dead first
# if any tasks were still outstanding on it.
sub event_read {
    my Gearman::Client::Async::Connection $self = shift;

    my $chunk = $self->read( 128 * 1024 );

    # Normal case: we got bytes, let the parser deal with them.
    if (defined $chunk) {
        return $self->{parser}->parse_data( $chunk );
    }

    # Nothing came back.  Only penalise the server (mark it dead) when it
    # went away while it still owed us answers.
    if ($self->stuff_outstanding) {
        $self->mark_dead;
    }
    $self->close( "EOF" );
    return;
}
# Handle an error condition on the connection.
#
# If the error arrives while the connection is still in S_CONNECTING and
# $self->{t_offline} is set, only the parent class's close() is invoked and
# nothing else happens (no dead-marking, no requeue, no error callbacks).
# NOTE(review): t_offline is presumably a timer that handles this case on
# its own -- confirm against the code that sets it.
#
# In every other case the server is marked dead, the connection is closed
# with reason "error", and, if we were still connecting, on_connect_error
# is run as well.
sub event_err {
    my Gearman::Client::Async::Connection $self = shift;

    # Capture this up front: close() below rewrites $self->{state}.
    my $mid_connect = ($self->{state} == S_CONNECTING);

    if ($mid_connect) {
        if ($self->{t_offline}) {
            $self->SUPER::close( "error" );
            return;
        }
    }

    $self->mark_dead;
    $self->close( "error" );
    $self->on_connect_error if $mid_connect;
}
# Called when the connection attempt to the job server has failed.
#
# Marks the server dead, closes the connection with reason "error", invokes
# every callback registered in $self->{on_error} (with no arguments), and
# finally calls destroy_callbacks.
sub on_connect_error {
    my Gearman::Client::Async::Connection $self = shift;

    warn "Jobserver, $self->{hostspec} ($self) has failed to connect properly\n" if DEBUGGING;

    $self->mark_dead;
    $self->close( "error" );

    # Notify everyone who asked to hear about a failed connect.
    for my $callback (@{ $self->{on_error} }) {
        $callback->();
    }

    $self->destroy_callbacks;
}
# Close the connection and fail any tasks still attached to it.
#
# Arguments:
#   $reason - passed through to the parent class's close().
#
# The parent close() is only invoked when the connection is not already in
# S_DISCONNECTED, so calling this repeatedly is harmless.  _requeue_all is
# run on every call regardless of the state.
sub close {
    my Gearman::Client::Async::Connection $self = shift;
    my ($reason) = @_;

    unless ($self->{state} == S_DISCONNECTED) {
        # Flip the state before delegating, so the state already reads
        # S_DISCONNECTED while the parent close() runs.
        $self->{state} = S_DISCONNECTED;
        $self->SUPER::close( $reason );
    }

    $self->_requeue_all;
}
# Flag this server as dead for the next 10 seconds.
#
# Stores "now + 10" in $self->{deadtime}; alive() compares the current time
# against that value.
sub mark_dead {
    my Gearman::Client::Async::Connection $self = shift;

    my $dead_until = time() + 10;
    $self->{deadtime} = $dead_until;

    warn "$self->{hostspec} marked dead for a bit." if DEBUGGING;
}
# Return true when the server is not currently marked dead, i.e. when the
# time stored in $self->{deadtime} is now or in the past.
sub alive {
    my Gearman::Client::Async::Connection $self = shift;

    my $now = time();
    return $self->{deadtime} <= $now;
}
# Submit a task to the job server over this connection.
#
# Arguments:
#   $task - a Gearman::Task; its pack_submit_packet output is written to
#           the connection.
#
# Dies (Carp::confess, with a stack trace) unless the connection is in
# S_READY.
#
# After writing, the task is appended to $self->{need_handle}.  The stored
# reference is weakened, so this queue does not keep the task alive by
# itself; an entry can therefore become undef later on.
sub add_task {
    my Gearman::Client::Async::Connection $self = shift;
    my Gearman::Task $task = shift;

    if ($self->{state} != S_READY) {
        Carp::confess("add_task called when in wrong state");
    }

    warn "writing task $task to $self->{hostspec}\n" if DEBUGGING;

    $self->write( $task->pack_submit_packet );

    # Queue the task, then weaken the slot we just filled.
    push @{ $self->{need_handle} }, $task;
    Scalar::Util::weaken( $self->{need_handle}->[-1] );
}
# Report whether this connection still has tasks in flight.
#
# Returns a true scalar if any task is queued in $self->{need_handle}
# (tasks that add_task has written to the connection) or if
# $self->{waiting} (task lists keyed by handle) is non-empty; returns a
# false scalar otherwise.
#
# Both operands are forced into scalar context.  With a bare
# "@need_handle || %waiting" the right-hand side is evaluated in the
# caller's context, so a list-context caller would receive the waiting hash
# flattened into handle/tasklist pairs instead of a yes/no answer.  scalar()
# is used on the hash rather than keys() so that any each() iterator on it
# is left undisturbed.
sub stuff_outstanding {
    my Gearman::Client::Async::Connection $self = shift;

    return scalar( @{ $self->{need_handle} } )
        || scalar( %{ $self->{waiting} } );
}
# Fail every task currently attached to this connection.
#
# Both bookkeeping structures ($self->{need_handle} and $self->{waiting})
# are detached and replaced with empty ones *before* any task is failed, so
# the work below operates on private copies and the connection's own queues
# are already empty while $task->fail runs.
#
# What happens to a task after fail() (e.g. whether it is retried) is
# decided by the task itself and is not visible here.
sub _requeue_all {
    my Gearman::Client::Async::Connection $self = shift;

    my $orphaned  = $self->{need_handle};
    my $in_flight = $self->{waiting};

    $self->{need_handle} = [];
    $self->{waiting}     = {};

    # Tasks that were submitted but never got a handle back.  Entries are
    # weakened in add_task and so may be undef by now; shifting gives us a
    # regular (strong) copy to work with.
    while (@$orphaned) {
        my $task = shift @$orphaned;
        warn "Task $task in need_handle queue during socket error, queueing for redispatch\n" if DEBUGGING;
        next unless $task;
        $task->fail;
    }

    # Tasks filed under a handle.
    for my $shandle (keys %$in_flight) {
        for my $task (@{ $in_flight->{$shandle} }) {
            warn "Task $task ($shandle) in waiting queue during socket error, queueing for redispatch\n" if DEBUGGING;
            $task->fail;
        }
    }
}
sub process_packet {
my Gearman::Client::Async::Connection $self = shift;
my $res = shift;
warn "Got packet '$res->{type}' from $self->{hostspec}\n" if DEBUGGING;
if ($res->{type} eq "job_created") {
die "Um, got an unexpected job_created notification" unless @{ $self->{need_handle} };
my Gearman::Task $task = shift @{ $self->{need_handle} } or
return 1;
( run in 1.088 second using v1.01-cache-2.11-cpan-39bf76dae61 )