Gearman
view release on metacpan or search on metacpan
lib/Gearman/Taskset.pm view on Meta::CPAN
my $sock = $self->client()->_get_js_sock($js);
return $self->{loaned_sock}{$js_str} = $sock;
} ## end sub _get_loaned_sock
=head2 wait(%opts)
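Waits for a response from the job server for any of the tasks listed
in the taskset. Will call the I<on_*> handlers for each of the tasks
that have been completed, updated, etc. Doesn't return until
everything has finished running or failing, unless a C<timeout>
(in seconds) is passed in C<%opts>: once it elapses, every task that is
still waiting is failed and the taskset is cancelled.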
=cut
# Block until no task of this taskset is waiting any more, the taskset has
# been cancelled, or the optional timeout has elapsed.
#
# Options (%opts):
#   timeout - number of seconds to wait. When the key is absent or its value
#             is undef there is no deadline and the sockets are polled in
#             0.5 second slices. Any other key triggers a Carp::carp warning.
#
# On timeout every task still listed in $self->{waiting} has its fail()
# method called with "Task <func> elapsed timeout [<timeout>s]", then
# $self->cancel is invoked and the sub returns.
#
# Croaks with "Job server failure: ..." when parsing a readable socket dies.
sub wait {
    my ($self, %opts) = @_;

    my ($timeout, $given_timeout_s);
    if (exists $opts{timeout}) {
        $timeout = delete $opts{timeout};
        if (defined $timeout) {
            ## keep the given timeout value for the failure reason
            # Handles issue #35
            # https://github.com/p-alik/perl-Gearman/issues/35
            $given_timeout_s = $timeout;

            # from here on $timeout is an absolute deadline (epoch seconds)
            $timeout += Time::HiRes::time();
        }
    }

    Carp::carp "Unknown options: "
        . join(',', keys %opts)
        . " passed to Taskset->wait."
        if keys %opts;

    # fd -> Gearman::ResponseParser object
    my %parser;

    # Parse whatever is available on one readable socket; one parser
    # instance is kept per socket for the duration of this call.
    my $cb = sub {
        my ($fd) = shift;

        my $parser = $parser{$fd} ||= Gearman::ResponseParser::Taskset->new(
            source  => $fd,
            taskset => $self
        );

        eval {
            $parser->parse_sock($fd);
            1;
        } or do {

            # TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
            # We're not in an accessible place here, so if all job servers fail we must die to prevent hanging.
            Carp::croak("Job server failure: $@");
        } ## end do
    };

    my $io = IO::Select->new($self->{default_sock},
        values %{ $self->{loaned_sock} });

    # An IO::Socket::SSL handle may hold already-decrypted data that a
    # select() on the file descriptor will not report, so look for it first.
    my $pending_sock;
    foreach ($io->handles) {
        (ref($_) eq "IO::Socket::SSL" && $_->pending()) || next;

        $pending_sock = $_;
        last;
    }

    if ($pending_sock) {

        # NOTE(review): this returns after handling a single pending socket,
        # without entering the wait loop below - confirm that is intended.
        return $cb->($pending_sock);
    }

    while (!$self->{cancelled} && keys %{ $self->{waiting} }) {
        my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;

        my $nfound = select($io->bits(), undef, undef, $time_left);
        if ($timeout && $time_left <= 0) {
            ## Attempt to fix
            # https://github.com/p-alik/perl-Gearman/issues/33
            # Mark all tasks of that taskset failed.
            # Get all waiting tasks and call their "fail" method one by one
            # with the failure reason.
            for (values %{ $self->{waiting} }) {
                for (@$_) {
                    my $func = $_->func;
                    ## use the given timeout here
                    # Handles issue #35
                    # https://github.com/p-alik/perl-Gearman/issues/35
                    $_->fail("Task $func elapsed timeout [${given_timeout_s}s]");
                }
            } ## end for (values %{ $self->{...}})

            $self->cancel;
            return;
        } ## end if ($timeout && $time_left...)

        # select() yields 0 on timeout and -1 on error (e.g. EINTR when a
        # signal arrives). Neither means that a socket is readable, so go
        # round again; the deadline check above still bounds the loop.
        next unless $nfound && $nfound > 0;

        # Poll only (timeout 0): readiness was established by the select()
        # above, so this call must never block past the deadline.
        foreach my $fd ($io->can_read(0)) {
            $cb->($fd);
        }
    } ## end while (!$self->{cancelled...})
} ## end sub wait
=head2 add_task(Gearman::Task)
=head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr>)
Adds a task to the taskset. Three different calling conventions are available.
C<$opts_hr> see L<Gearman::Task>
=cut
sub add_task {
my $self = shift;
my $task = $self->client()->_get_task_from_args(@_);
$task->taskset($self);
$self->run_hook('add_task', $self, $task);
my $jssock = $task->{jssock};
return $task->fail("undefined jssock") unless ($jssock);
( run in 3.957 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )