Gearman-Client-Async

 view release on metacpan or  search on metacpan

lib/Gearman/Client/Async/Connection.pm  view on Meta::CPAN


# Package global; it is only declared here and not assigned in this part
# of the file.
# NOTE(review): the T_ prefix suggests a test-only hook -- confirm.
our $T_ON_TIMEOUT;

# Connection states.  Each constant is a reference to a distinct string
# literal, so states are compared by reference identity with `==`
# (e.g. `$self->{state} == S_READY`), not by string equality.  The
# referenced string is just a readable label.
use constant {
    S_DISCONNECTED => \ "disconnected",
    S_CONNECTING   => \ "connecting",
    S_READY        => \ "ready",
};

use Carp qw(croak);
use Gearman::Task;
use Gearman::Util;
use Scalar::Util qw(weaken);

use IO::Handle;
use Socket qw(PF_INET IPPROTO_TCP TCP_NODELAY SOL_SOCKET SOCK_STREAM);

# Compile-time debug switch.  Statements guarded with `if DEBUGGING`
# (such as the warn in add_task) stay silent while this is 0.
use constant DEBUGGING => 0;

sub new {
    my Gearman::Client::Async::Connection $self = shift;

    my %opts = @_;

lib/Gearman/Client/Async/Connection.pm  view on Meta::CPAN

    my Gearman::Client::Async::Connection $self = shift;
    my Gearman::Task $task = shift;

    Carp::confess("add_task called when in wrong state")
        unless $self->{state} == S_READY;

    warn "writing task $task to $self->{hostspec}\n" if DEBUGGING;

    $self->write( $task->pack_submit_packet );
    push @{$self->{need_handle}}, $task;
    Scalar::Util::weaken($self->{need_handle}->[-1]);
}

# Reports whether this connection still has work in flight.
#
# Returns the number of entries in the need_handle list when that list is
# non-empty; otherwise returns the waiting hash evaluated in the caller's
# context.  The result is meant to be used as a truth value.
sub stuff_outstanding {
    my Gearman::Client::Async::Connection $conn = shift;

    # Tasks pushed by add_task live here.
    my $unhandled = $conn->{need_handle};
    return scalar(@$unhandled) if @$unhandled;

    # Nothing in need_handle: the answer depends solely on the waiting hash.
    return %{ $conn->{waiting} };
}

sub _requeue_all {

lib/Gearman/Client/Async/Connection.pm  view on Meta::CPAN

# Sets the connection's t_offline flag.
#
#   $conn->t_set_offline;        # flag becomes 1
#   $conn->t_set_offline($val);  # flag becomes $val (any defined value)
#
# An undefined or omitted $val is treated as 1.  Returns the stored value.
# NOTE(review): the t_ prefix suggests this exists for the test suite --
# confirm against callers.
sub t_set_offline {
    my ($self, $val) = @_;
    $self->{t_offline} = defined($val) ? $val : 1;
}

package Gearman::ResponseParser::Async;

use strict;
use warnings;
use Scalar::Util qw(weaken);

use Gearman::ResponseParser;
use base 'Gearman::ResponseParser';

# Constructor.
#
#   my $parser = Gearman::ResponseParser::Async->new($conn);
#
# Builds the parser via the parent class constructor (called with no
# arguments) and stores $conn in the _conn slot.  The stored reference is
# weakened, so the parser does not keep the connection alive; on_packet
# checks the slot before using it.
sub new {
    my ($class, $conn) = @_;

    my $parser = $class->SUPER::new;

    $parser->{_conn} = $conn;
    weaken($parser->{_conn});

    return $parser;
}

sub on_packet {
    my $self = shift;
    my $packet = shift;

    return unless $self->{_conn};
    $self->{_conn}->process_packet( $packet );

t/err7.t  view on Meta::CPAN

# have one (it starts at 9000 and works up)
# NOTE(review): start_worker and PORT are defined outside this excerpt;
# the meaning of the second argument (2) is not visible here -- confirm.
start_worker(PORT, 2);

# Async client pointed at the local job server on PORT.
my $client = Gearman::Client::Async->new;
$client->set_job_servers('127.0.0.1:' . PORT);

# Flags flipped by the task callbacks below.
my $complete = 0;   # set by on_complete; not asserted in this excerpt
my $failed = 0;     # set by on_fail
my $done   = 0;     # set by on_fail; ends the event loop

use Scalar::Util qw(weaken);

# Weak reference to the task: it becomes undef once every strong reference
# is gone, which is what the first ok() below checks.
my $taskptr;
{
    # "sleep_for" job with argument "2", a 1.0 timeout and up to 5 retries.
    # NOTE(review): presumably the worker sleeps longer than the timeout so
    # the task is expected to fail -- confirm against the worker script.
    my $task = Gearman::Task->new( "sleep_for" => \ "2", {
        timeout => 1.0,
        retry_count => 5,
        on_complete => sub {
            $complete = 1;
        },
        on_fail => sub {
            $failed = 1;
            $done = 1;
        },
    });
    $client->add_task($task);
    $taskptr = $task;
    weaken($taskptr);
    # $task, the only strong reference held by this script, goes out of
    # scope at the end of this block.
}

# The callback returns true until on_fail has set $done.
# NOTE(review): per Danga::Socket, a false return from the post-loop
# callback stops EventLoop -- confirm.
Danga::Socket->SetPostLoopCallback(sub {
    return !$done;
});

Danga::Socket->EventLoop();

# After the loop: the task object must have been freed and on_fail must
# have fired.
ok(!$taskptr, "Gearman::Task object went out of scope");
ok($failed, "got a failure");

t/err8.t  view on Meta::CPAN

# have one (it starts at 9000 and works up)
# NOTE(review): start_worker and PORT are defined outside this excerpt;
# the meaning of the second argument (2) is not visible here -- confirm.
start_worker(PORT, 2);

# Async client pointed at the local job server on PORT.
my $client = Gearman::Client::Async->new;
$client->set_job_servers('127.0.0.1:' . PORT);

# Flags flipped by the task callbacks below.
my $complete = 0;   # set by on_complete
my $failed = 0;     # set by on_fail
my $done   = 0;     # set by on_fail

use Scalar::Util qw(weaken);

# Weak reference to the task: it becomes undef once every strong reference
# to the task is gone.
my $taskptr;
{
    # "sleep_for" job with argument "3", a timeout of 1 and no retries.
    # NOTE(review): presumably the worker sleeps longer than the timeout so
    # the task is expected to fail -- confirm against the worker script.
    my $task = Gearman::Task->new( "sleep_for" => \ "3", {
        timeout => 1,
        retry_count => 0,
        on_complete => sub {
            $complete = 1;
        },
        on_fail => sub {
            $failed = 1;
            $done = 1;
        },
    });
    $client->add_task($task);
    $taskptr = $task;
    weaken($taskptr);
    # $task, the only strong reference held by this script, goes out of
    # scope at the end of this block.
}

# Don't read from the socket, so that we get a situation
# where the job times out before the server responds.


Danga::Socket->SetPostLoopCallback(sub {
    my $socket = Danga::Socket->DescriptorMap->{3};
    $socket->watch_read(0);



( run in 0.287 second using v1.01-cache-2.11-cpan-1f129e94a17 )