Gearman

 view release on metacpan or  search on metacpan

lib/Gearman/Taskset.pm  view on Meta::CPAN


    my $sock = $self->client()->_get_js_sock($js);

    return $self->{loaned_sock}{$js_str} = $sock;
} ## end sub _get_loaned_sock

=head2 wait(%opts)

Waits for a response from the job server for any of the tasks listed
in the taskset. Will call the I<on_*> handlers for each of the tasks
that have been completed, updated, etc.  Doesn't return until
everything has finished running or failing.

=cut

sub wait {
    my ($self, %opts) = @_;
    my ($timeout, $given_timeout_s);
    if (exists $opts{timeout}) {
        $timeout = delete $opts{timeout};
        if (defined $timeout) {
            ## keep the given timeout value for the failure reason
            #  Handles issue #35
            #  https://github.com/p-alik/perl-Gearman/issues/35
            $given_timeout_s = $timeout;
            $timeout += Time::HiRes::time();
        }
    }

    Carp::carp "Unknown options: "
        . join(',', keys %opts)
        . " passed to Taskset->wait."
        if keys %opts;

    # fd -> Gearman::ResponseParser object
    my %parser;

    my $cb = sub {
        my ($fd) = shift;

        my $parser = $parser{$fd} ||= Gearman::ResponseParser::Taskset->new(
            source  => $fd,
            taskset => $self
        );
        eval {
            $parser->parse_sock($fd);
            1;
        } or do {

            # TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
            # We're not in an accessible place here, so if all job servers fail we must die to prevent hanging.
            Carp::croak("Job server failure: $@");
            } ## end do
    };

    my $io = IO::Select->new($self->{default_sock},
        values %{ $self->{loaned_sock} });

    my $pending_sock;
    foreach ($io->handles) {
        (ref($_) eq "IO::Socket::SSL" && $_->pending()) || next;

        $pending_sock = $_;
        last;
    }

    if ($pending_sock) {
        return $cb->($pending_sock);
    }

    while (!$self->{cancelled} && keys %{ $self->{waiting} }) {
        my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;
        my $nfound = select($io->bits(), undef, undef, $time_left);
        if ($timeout && $time_left <= 0) {
            ## Attempt to fix
            #  https://github.com/p-alik/perl-Gearman/issues/33
            #  Mark all tasks of that taskset failed.
            #  Get all waiting tasks and call their "fail" method one by one
            #  with the failure reason.
            for (values %{ $self->{waiting} }) {
                for (@$_) {
                    my $func = $_->func;
                    ## use the given timeout here
                    #  Handles issue #35
                    #  https://github.com/p-alik/perl-Gearman/issues/35
                    $_->fail("Task $func elapsed timeout [${given_timeout_s}s]");
                }
            } ## end for (values %{ $self->{...}})
            $self->cancel;
            return;
        } ## end if ($timeout && $time_left...)

        next if !$nfound;
        foreach my $fd ($io->can_read()) {
            $cb->($fd);
        }
    } ## end while (!$self->{cancelled...})
} ## end sub wait

=head2 add_task(Gearman::Task)

=head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr>

Adds a task to the taskset.  Three different calling conventions are available.

C<$opts_hr> see L<Gearman::Task>

=cut

sub add_task {
    my $self = shift;
    my $task = $self->client()->_get_task_from_args(@_);

    $task->taskset($self);

    $self->run_hook('add_task', $self, $task);

    my $jssock = $task->{jssock};

    return $task->fail("undefined jssock") unless ($jssock);



( run in 3.957 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )