Gearman
view release on metacpan - search on metacpan
view release on metacpan or search on metacpan
lib/Gearman/Taskset.pm view on Meta::CPAN
while (!$self->{cancelled} && keys %{ $self->{waiting} }) {
my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;
my $nfound = select($io->bits(), undef, undef, $time_left);
if ($timeout && $time_left <= 0) {
## Attempt to fix
# https://github.com/p-alik/perl-Gearman/issues/33
# Mark all tasks of that taskset failed.
# Get all waiting tasks and call their "fail" method one by one
# with the failure reason.
for (values %{ $self->{waiting} }) {
for (@$_) {
my $func = $_->func;
## use the given timeout here
# Handles issue #35
# https://github.com/p-alik/perl-Gearman/issues/35
$_->fail("Task $func elapsed timeout [${given_timeout_s}s]");
}
} ## end for (values %{ $self->{...}})
$self->cancel;
return;
} ## end if ($timeout && $time_left...)
next if !$nfound;
foreach my $fd ($io->can_read()) {
$cb->($fd);
}
} ## end while (!$self->{cancelled...})
} ## end sub wait
=head2 add_task(Gearman::Task)
=head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr>
Adds a task to the taskset. Three different calling conventions are available.
C<$opts_hr> see L<Gearman::Task>
=cut
sub add_task {
my $self = shift;
my $task = $self->client()->_get_task_from_args(@_);
$task->taskset($self);
$self->run_hook('add_task', $self, $task);
my $jssock = $task->{jssock};
return $task->fail("undefined jssock") unless ($jssock);
my $req = $task->pack_submit_packet($self->client);
Gearman::Util::send_req($jssock, \$req)
|| Carp::croak "Error sending data to job server";
push @{ $self->{need_handle} }, $task;
while (@{ $self->{need_handle} }) {
my $rv
= $self->_wait_for_packet($jssock,
$self->client()->{command_timeout});
if (!$rv) {
# ditch it, it failed.
# this will resubmit it if it failed.
shift @{ $self->{need_handle} };
return $task->fail(
join(' ',
"no rv on waiting for packet",
defined($rv) ? $rv : $!)
);
} ## end if (!$rv)
} ## end while (@{ $self->{need_handle...}})
return $task->handle;
} ## end sub add_task
#
# _get_default_sock()
# used in Gearman::Task->taskset only
#
sub _get_default_sock {
my $self = shift;
return $self->{default_sock} if $self->{default_sock};
my $getter = sub {
my $js = shift;
return $self->{loaned_sock}{$js}
|| $self->client()->_get_js_sock($js);
};
my ($js, $jss) = $self->client()->_get_random_js_sock($getter);
return unless $jss;
my $js_str = $self->client()->_js_str($js);
$self->{loaned_sock}{$js_str} ||= $jss;
$self->{default_sock} = $jss;
$self->{default_sockaddr} = $js_str;
return $jss;
} ## end sub _get_default_sock
#
# _get_hashed_sock($hv)
#
# only used in Gearman::Task->taskset only
#
# return a socket
sub _get_hashed_sock {
my $self = shift;
my $hv = shift;
my ($js_count, @job_servers)
= ($self->client()->{js_count}, $self->client()->job_servers());
my $sock;
for (my $off = 0; $off < $js_count; $off++) {
my $idx = ($hv + $off) % ($js_count);
$sock = $self->_get_loaned_sock($job_servers[$idx]);
last;
}
view all matches for this distributionview release on metacpan - search on metacpan
( run in 0.962 second using v1.00-cache-2.02-grep-82fe00e-cpan-1310916c57ae )