Gearman

 view release on metacpan or  search on metacpan

lib/Gearman/Taskset.pm  view on Meta::CPAN

package Gearman::Taskset;
use version ();
$Gearman::Taskset::VERSION = version->declare("2.004.015");

use strict;
use warnings;

=head1 NAME

Gearman::Taskset - a taskset in Gearman, from the point of view of a L<Gearman::Client>

=head1 SYNOPSIS

    use Gearman::Client;
    my $client = Gearman::Client->new;

    # waiting on a set of tasks in parallel
    my $ts = $client->new_task_set;
    $ts->add_task( "add" => "1+2", {...});
    $ts->wait();


=head1 DESCRIPTION

Gearman::Taskset is L<Gearman::Client>'s representation of a queue of tasks.

=head1 METHODS

=cut

# Per-object attributes, declared through the fields pragma. new()
# initialises every one of them; see the comments there for what each holds.
use fields (
    qw/
        waiting
        client
        need_handle
        default_sock
        default_sockaddr
        loaned_sock
        cancelled
        hooks
        /
);

use Carp          ();
use Gearman::Util ();
use Gearman::ResponseParser::Taskset;
use IO::Select;

# i thought about weakening taskset's client, but might be too weak.
use Scalar::Util ();
use Socket       ();
use Storable     ();
use Time::HiRes  ();

=head2 new($client)

=cut

# Constructor.
#   $self   - class name, or an existing object to (re)initialise in place
#   $client - the owning client; must be a blessed Gearman::Client
#             (or subclass) instance, otherwise this croaks
# Returns the initialised taskset object.
sub new {
    my ($self, $client) = @_;

    my $is_client = Scalar::Util::blessed($client)
        && $client->isa("Gearman::Client");
    Carp::croak(
        "provided client argument is not a Gearman::Client reference")
        unless $is_client;

    # Invoked with a class name: allocate the fields-based object.
    # A reference invocant is kept and simply has its fields reset below.
    $self = fields::new($self) if !ref $self;

    $self->{client} = $client;

    # task bookkeeping; waiting is { handle => [Task, ...] }
    $self->{waiting}     = {};
    $self->{need_handle} = [];

    # flag: set when the taskset has been cancelled mid-processing
    $self->{cancelled} = 0;

    # callbacks, { hookname => coderef }
    $self->{hooks} = {};

    # sockets on loan, { hostport => socket }
    $self->{loaned_sock} = {};

    # default socket (non-merged requests) and its address; both start unset
    $self->{default_sock}     = undef;
    $self->{default_sockaddr} = undef;

    return $self;
} ## end sub new

# Destructor: passes every socket this taskset still holds (the default
# socket and all loaned ones) to the client's _sock_cache method,
# presumably so the client can reuse them.
sub DESTROY {
    my $self = shift;

    # During global cleanup objects may be destroyed out of order, so the
    # client may no longer exist in the taskset; nothing to do in that case.
    $self->client or return;

    # Hand over the default socket, keyed by its recorded address.
    if ($self->{default_sock}) {
        my @default = ($self->{default_sockaddr}, $self->{default_sock});
        $self->client->_sock_cache(@default);
    }

    # Hand over each loaned socket, keyed by its hostport.
    for my $hostport (keys %{ $self->{loaned_sock} }) {
        $self->client->_sock_cache($hostport,
            $self->{loaned_sock}{$hostport});
    }
} ## end sub DESTROY

#=head2 run_hook($name)



( run in 1.795 second using v1.01-cache-2.11-cpan-5a3173703d6 )