Gearman-Client-Async

 view release on metacpan or  search on metacpan

lib/Gearman/Client/Async/Connection.pm  view on Meta::CPAN

    $self->{on_ready} = [];
    $self->{on_error} = [];
}

# Read-event handler.
#
# Asks read() for up to 128 KiB and hands whatever came back to the parser
# stored in $self->{parser}.  An undefined read result is treated as the end
# of the stream: the server is marked dead if any work is still outstanding
# on this connection, and the connection is then closed with reason "EOF".
sub event_read {
    my Gearman::Client::Async::Connection $self = shift;

    my $chunk = $self->read( 128 * 1024 );

    # Normal path: data arrived, let the parser consume it.
    return $self->{parser}->parse_data( $chunk )
        if defined $chunk;

    # Only penalise the server when dropping the connection costs us work.
    if ($self->stuff_outstanding) {
        $self->mark_dead;
    }

    $self->close( "EOF" );
    return;
}

# Error-event handler.
#
# If the error arrives while the connection is still in S_CONNECTING and the
# t_offline flag is set, only the parent class's close() is invoked: the
# server is not marked dead and this class's own close() is bypassed.
# NOTE(review): t_offline looks like a test hook -- confirm against wherever
# it is set.
#
# In every other case the server is marked dead and the connection is closed
# with reason "error"; if the error interrupted a connect attempt,
# on_connect_error is run afterwards as well.
sub event_err {
    my Gearman::Client::Async::Connection $self = shift;

    # Sample the state up front: close() below resets it to S_DISCONNECTED.
    my $connecting      = ($self->{state} == S_CONNECTING);
    my $offline_connect = $connecting && $self->{t_offline};

    if ($offline_connect) {
        $self->SUPER::close( "error" );
        return;
    }

    $self->mark_dead;
    $self->close( "error" );
    $self->on_connect_error if $connecting;
}

# Handles a failed connection attempt.
#
# Marks the server dead, closes the connection with reason "error", invokes
# every callback stored in $self->{on_error} (in order, with no arguments),
# and finally calls destroy_callbacks.
sub on_connect_error {
    my Gearman::Client::Async::Connection $self = shift;
    warn "Jobserver, $self->{hostspec} ($self) has failed to connect properly\n" if DEBUGGING;

    $self->mark_dead;
    $self->close( "error" );

    # Iterate with a lexical rather than the global $_: foreach aliases its
    # loop variable to the array element, so a callback that assigned to $_
    # would otherwise overwrite an entry of the on_error list.
    for my $callback (@{ $self->{on_error} }) {
        $callback->();
    }

    $self->destroy_callbacks;
}

# Closes the connection.
#
# $reason is forwarded unchanged to the parent class's close().  The parent
# close() is only reached when the connection is not already in
# S_DISCONNECTED, so calling this repeatedly forwards at most once per
# connected period.  _requeue_all is called on every invocation regardless
# of the state.
sub close {
    my Gearman::Client::Async::Connection $self = shift;
    my ($reason) = @_;

    unless ($self->{state} == S_DISCONNECTED) {
        # Flip the state before delegating so a re-entrant close() is a no-op
        # as far as the parent class is concerned.
        $self->{state} = S_DISCONNECTED;
        $self->SUPER::close( $reason );
    }

    $self->_requeue_all;
}

# Flags this server as dead for the next 10 seconds by storing the expiry
# timestamp in $self->{deadtime}; alive() compares against that value.
sub mark_dead {
    my Gearman::Client::Async::Connection $self = shift;

    my $dead_until = time + 10;
    $self->{deadtime} = $dead_until;

    if (DEBUGGING) {
        warn "$self->{hostspec} marked dead for a bit.";
    }
}

# Returns true once the current time has reached (or passed) the timestamp in
# $self->{deadtime}, i.e. when any penalty set by mark_dead has run out.
sub alive {
    my Gearman::Client::Async::Connection $self = shift;

    my $now = time;
    return $now >= $self->{deadtime};
}

# Submits $task on this connection.
#
# Dies with a full stack trace (Carp::confess) unless the connection is in
# the S_READY state.  Otherwise the task's submit packet is written out and
# the task is appended to the need_handle queue.  The queue entry is
# weakened, so the queue itself does not keep the task object alive.
sub add_task {
    my Gearman::Client::Async::Connection $self = shift;
    my Gearman::Task $task = shift;

    if ($self->{state} != S_READY) {
        Carp::confess("add_task called when in wrong state");
    }

    if (DEBUGGING) {
        warn "writing task $task to $self->{hostspec}\n";
    }

    $self->write( $task->pack_submit_packet );

    # The queue is looked up only after write() has returned, so the push
    # lands on whatever array $self->{need_handle} refers to at that point.
    my $pending = $self->{need_handle};
    push @$pending, $task;
    Scalar::Util::weaken( $pending->[-1] );
}

# Reports whether this connection is still tracking any tasks.
#
# If the need_handle queue is non-empty its element count is returned.
# Otherwise the waiting hash is returned, evaluated in the caller's context.
# The result is intended to be used as a truth value; in list context with
# an empty need_handle queue the caller receives the flattened waiting hash.
sub stuff_outstanding {
    my Gearman::Client::Async::Connection $self = shift;

    my $pending = $self->{need_handle};
    return scalar(@$pending) if @$pending;

    return %{ $self->{waiting} };
}

# Detaches every task this connection is tracking and fails each of them.
#
# Both containers (the need_handle queue and the waiting hash, which maps a
# handle to a list of tasks) are replaced with empty ones before any task is
# touched, so the tasks being failed are no longer reachable through $self
# while their fail() methods run.
#
# Entries that are no longer defined are skipped silently.  need_handle
# entries are weakened by add_task, so they turn into undef once the task
# object is gone.  The same guard is applied to the waiting lists so that
# one vanished task cannot abort the loop and leave the remaining tasks
# un-failed.
#
# Returns nothing.
sub _requeue_all {
    my Gearman::Client::Async::Connection $self = shift;

    my $need_handle = $self->{need_handle};
    my $waiting     = $self->{waiting};

    $self->{need_handle} = [];
    $self->{waiting}     = {};

    while (@$need_handle) {
        # shift copies the (weak) queue entry into a lexical, which is a
        # strong reference, so the task stays alive for the duration of fail().
        my $task = shift @$need_handle;
        next unless $task;

        warn "Task $task in need_handle queue during socket error, queueing for redispatch\n" if DEBUGGING;
        $task->fail;
    }

    while (my ($shandle, $tasklist) = each( %$waiting )) {
        foreach my $task (@$tasklist) {
            next unless $task;

            warn "Task $task ($shandle) in waiting queue during socket error, queueing for redispatch\n" if DEBUGGING;
            $task->fail;
        }
    }

    return;
}

sub process_packet {
    my Gearman::Client::Async::Connection $self = shift;
    my $res = shift;

    warn "Got packet '$res->{type}' from $self->{hostspec}\n" if DEBUGGING;

    if ($res->{type} eq "job_created") {

        die "Um, got an unexpected job_created notification" unless @{ $self->{need_handle} };
        my Gearman::Task $task = shift @{ $self->{need_handle} } or
            return 1;



( run in 1.088 second using v1.01-cache-2.11-cpan-39bf76dae61 )