Gearman

 view release on metacpan or  search on metacpan

lib/Gearman/Taskset.pm  view on Meta::CPAN


    while (!$self->{cancelled} && keys %{ $self->{waiting} }) {
        my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;
        my $nfound = select($io->bits(), undef, undef, $time_left);
        if ($timeout && $time_left <= 0) {
            ## Attempt to fix
            #  https://github.com/p-alik/perl-Gearman/issues/33
            #  Mark all tasks of that taskset failed.
            #  Get all waiting tasks and call their "fail" method one by one
            #  with the failure reason.
            for (values %{ $self->{waiting} }) {
                for (@$_) {
                    my $func = $_->func;
                    ## use the given timeout here
                    #  Handles issue #35
                    #  https://github.com/p-alik/perl-Gearman/issues/35
                    $_->fail("Task $func elapsed timeout [${given_timeout_s}s]");
                }
            } ## end for (values %{ $self->{...}})
            $self->cancel;
            return;
        } ## end if ($timeout && $time_left...)

        next if !$nfound;
        foreach my $fd ($io->can_read()) {
            $cb->($fd);
        }
    } ## end while (!$self->{cancelled...})
} ## end sub wait

=head2 add_task(Gearman::Task)

=head2 add_task($func, <$scalar | $scalarref>, <$uniq | $opts_hr>

Adds a task to the taskset.  Three different calling conventions are available.

C<$opts_hr> see L<Gearman::Task>

=cut

sub add_task {
    my $self = shift;
    my $task = $self->client()->_get_task_from_args(@_);

    $task->taskset($self);

    $self->run_hook('add_task', $self, $task);

    my $jssock = $task->{jssock};

    return $task->fail("undefined jssock") unless ($jssock);

    my $req = $task->pack_submit_packet($self->client);
    Gearman::Util::send_req($jssock, \$req)
        || Carp::croak "Error sending data to job server";

    push @{ $self->{need_handle} }, $task;
    while (@{ $self->{need_handle} }) {
        my $rv
            = $self->_wait_for_packet($jssock,
            $self->client()->{command_timeout});
        if (!$rv) {

            # ditch it, it failed.
            # this will resubmit it if it failed.
            shift @{ $self->{need_handle} };
            return $task->fail(
                join(' ',
                    "no rv on waiting for packet",
                    defined($rv) ? $rv : $!)
            );
        } ## end if (!$rv)
    } ## end while (@{ $self->{need_handle...}})

    return $task->handle;
} ## end sub add_task

#
# _get_default_sock()
# used in Gearman::Task->taskset only
#
sub _get_default_sock {
    my $self = shift;
    return $self->{default_sock} if $self->{default_sock};

    my $getter = sub {
        my $js = shift;
        return $self->{loaned_sock}{$js}
            || $self->client()->_get_js_sock($js);
    };

    my ($js, $jss) = $self->client()->_get_random_js_sock($getter);
    return unless $jss;

    my $js_str = $self->client()->_js_str($js);
    $self->{loaned_sock}{$js_str} ||= $jss;

    $self->{default_sock}     = $jss;
    $self->{default_sockaddr} = $js_str;

    return $jss;
} ## end sub _get_default_sock

#
#  _get_hashed_sock($hv)
#
# only used in Gearman::Task->taskset only
#
# return a socket
sub _get_hashed_sock {
    my $self = shift;
    my $hv   = shift;
    my ($js_count, @job_servers)
        = ($self->client()->{js_count}, $self->client()->job_servers());
    my $sock;
    for (my $off = 0; $off < $js_count; $off++) {
        my $idx = ($hv + $off) % ($js_count);
        $sock = $self->_get_loaned_sock($job_servers[$idx]);
        last;
    }

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 0.962 second using v1.00-cache-2.02-grep-82fe00e-cpan-1310916c57ae )