Gearman-Client-Async
view release on metacpan or search on metacpan
lib/Gearman/Client/Async/Connection.pm view on Meta::CPAN
our $T_ON_TIMEOUT;
# Connection states.  Each constant holds a reference to its own string
# literal; code in this file tests the current state with numeric `==`,
# i.e. by reference identity, and the string is only a readable label.
use constant {
    S_DISCONNECTED => \ "disconnected",
    S_CONNECTING   => \ "connecting",
    S_READY        => \ "ready",
};
use Carp qw(croak);
use Gearman::Task;
use Gearman::Util;
use Scalar::Util qw(weaken);
use IO::Handle;
use Socket qw(PF_INET IPPROTO_TCP TCP_NODELAY SOL_SOCKET SOCK_STREAM);
# Compile-time debug switch: set to a true value to enable the
# `warn ... if DEBUGGING` trace statements in this file.
use constant DEBUGGING => 0;
sub new {
my Gearman::Client::Async::Connection $self = shift;
my %opts = @_;
lib/Gearman/Client/Async/Connection.pm view on Meta::CPAN
my Gearman::Client::Async::Connection $self = shift;
my Gearman::Task $task = shift;
Carp::confess("add_task called when in wrong state")
unless $self->{state} == S_READY;
warn "writing task $task to $self->{hostspec}\n" if DEBUGGING;
$self->write( $task->pack_submit_packet );
push @{$self->{need_handle}}, $task;
Scalar::Util::weaken($self->{need_handle}->[-1]);
}
# Report whether this connection still has work in flight.
#
# Returns a true value when there are entries in the need_handle list or
# in the waiting hash, and a false value otherwise.
#
# Both operands are forced into scalar context.  The right-hand side of
# `||` otherwise inherits the caller's context, so a list-context call
# with an empty need_handle list would return the flattened key/value
# pairs of the waiting hash instead of a single truth value.
sub stuff_outstanding {
    my Gearman::Client::Async::Connection $self = shift;

    return scalar( @{ $self->{need_handle} } )
        || scalar( %{ $self->{waiting} } );
}
sub _requeue_all {
lib/Gearman/Client/Async/Connection.pm view on Meta::CPAN
# Set the t_offline flag on this object.
#
#   $flag - value to store; when omitted (or undef) the flag is set to 1.
#
# Returns the value that was stored.
sub t_set_offline {
    my ($self, $flag) = @_;

    $self->{t_offline} = defined $flag ? $flag : 1;
}
package Gearman::ResponseParser::Async;
use strict;
use warnings;
use Scalar::Util qw(weaken);
use Gearman::ResponseParser;
use base 'Gearman::ResponseParser';
# Build a response parser bound to a connection.
#
#   $class - class to instantiate
#   $conn  - connection object that on_packet forwards parsed packets to
#
# The parser keeps only a weak reference to the connection, so holding a
# parser does not by itself keep the connection alive.
sub new {
    my ($class, $conn) = @_;

    my $parser = $class->SUPER::new;
    $parser->{_conn} = $conn;
    weaken( $parser->{_conn} );

    return $parser;
}
sub on_packet {
my $self = shift;
my $packet = shift;
return unless $self->{_conn};
$self->{_conn}->process_packet( $packet );
# have one (it starts at 9000 and works up)
# NOTE(review): start_worker and PORT are defined outside this excerpt;
# presumably this launches a worker for the job server on PORT — confirm.
start_worker(PORT, 2);
# Point an async client at the job server on the local host.
my $client = Gearman::Client::Async->new;
$client->set_job_servers('127.0.0.1:' . PORT);
# Flags flipped by the task callbacks below.
my $complete = 0;
my $failed = 0;
my $done = 0;
use Scalar::Util qw(weaken);
# Weak reference to the task; checked after the event loop to verify that
# the task object has been freed.
my $taskptr;
{
# $task is lexical to this block, so once the block ends the test itself
# holds only the weakened $taskptr.
my $task = Gearman::Task->new( "sleep_for" => \ "2", {
# The timeout (1.0) is smaller than the "2" handed to sleep_for —
# presumably both are seconds, so each attempt should time out; confirm
# against the worker implementation.
timeout => 1.0,
retry_count => 5,
on_complete => sub {
# Records completion only; $done is not set here, so a successful
# completion never ends the event loop.
$complete = 1;
},
on_fail => sub {
# Record the failure and allow the event loop to stop.
$failed = 1;
$done = 1;
},
});
$client->add_task($task);
$taskptr = $task;
weaken($taskptr);
}
# The callback returns true while $done is unset; presumably a true return
# keeps Danga::Socket's event loop running — confirm against its docs.
Danga::Socket->SetPostLoopCallback(sub {
return !$done;
});
Danga::Socket->EventLoop();
# After the loop: the weak reference must have been cleared and on_fail
# must have fired.
ok(!$taskptr, "Gearman::Task object went out of scope");
ok($failed, "got a failure");
# have one (it starts at 9000 and works up)
# NOTE(review): start_worker and PORT are defined outside this excerpt;
# presumably this launches a worker for the job server on PORT — confirm.
start_worker(PORT, 2);
# Point an async client at the job server on the local host.
my $client = Gearman::Client::Async->new;
$client->set_job_servers('127.0.0.1:' . PORT);
# Flags flipped by the task callbacks below.
my $complete = 0;
my $failed = 0;
my $done = 0;
use Scalar::Util qw(weaken);
# Weak reference to the task, kept so the test can observe the task
# without holding it alive.
my $taskptr;
{
# $task is lexical to this block, so once the block ends the test itself
# holds only the weakened $taskptr.
my $task = Gearman::Task->new( "sleep_for" => \ "3", {
# The timeout (1) is smaller than the "3" handed to sleep_for —
# presumably both are seconds; with retry_count 0 no retries are requested.
timeout => 1,
retry_count => 0,
on_complete => sub {
# Records completion only; $done is not set here.
$complete = 1;
},
on_fail => sub {
# Record the failure and mark the test as done.
$failed = 1;
$done = 1;
},
});
$client->add_task($task);
$taskptr = $task;
weaken($taskptr);
}
# don't read so we get a situation
# where the job timeouts before the server responds
Danga::Socket->SetPostLoopCallback(sub {
my $socket = Danga::Socket->DescriptorMap->{3};
$socket->watch_read(0);
( run in 0.287 second using v1.01-cache-2.11-cpan-1f129e94a17 )