Gearman
view release on metacpan or search on metacpan
lib/Gearman/Taskset.pm view on Meta::CPAN
package Gearman::Taskset;
use version ();
$Gearman::Taskset::VERSION = version->declare("2.004.015");
use strict;
use warnings;
=head1 NAME
Gearman::Taskset - a taskset in Gearman, from the point of view of a L<Gearman::Client>
=head1 SYNOPSIS
use Gearman::Client;
my $client = Gearman::Client->new;
# waiting on a set of tasks in parallel
my $ts = $client->new_task_set;
$ts->add_task( "add" => "1+2", {...});
$ts->wait();
=head1 DESCRIPTION
Gearman::Taskset is a L<Gearman::Client>'s representation of tasks queue
=head1 METHODS
=cut
use fields (
qw/
waiting
client
need_handle
default_sock
default_sockaddr
loaned_sock
cancelled
hooks
/
);
use Carp ();
use Gearman::Util ();
use Gearman::ResponseParser::Taskset;
use IO::Select;
# i thought about weakening taskset's client, but might be too weak.
use Scalar::Util ();
use Socket ();
use Storable ();
use Time::HiRes ();
=head2 new($client)
=cut
sub new {
my ($self, $client) = @_;
(Scalar::Util::blessed($client) && $client->isa("Gearman::Client"))
|| Carp::croak
"provided client argument is not a Gearman::Client reference";
unless (ref $self) {
$self = fields::new($self);
}
# { handle => [Task, ...] }
$self->{waiting} = {};
$self->{need_handle} = [];
$self->{client} = $client;
# { hostport => socket }
$self->{loaned_sock} = {};
# bool, if taskset has been cancelled mid-processing
$self->{cancelled} = 0;
# { hookname => coderef }
$self->{hooks} = {};
# default socket (non-merged requests)
$self->{default_sock} = undef;
# $self->client()->_js_str($self->{default_sock});
$self->{default_sockaddr} = undef;
return $self;
} ## end sub new
sub DESTROY {
my $self = shift;
# During global cleanup this may be called out of order, and the client my not exist in the taskset.
return unless $self->client;
if ($self->{default_sock}) {
$self->client->_sock_cache($self->{default_sockaddr},
$self->{default_sock});
}
keys %{ $self->{loaned_sock} };
while (my ($hp, $sock) = each %{ $self->{loaned_sock} }) {
$self->client->_sock_cache($hp, $sock);
}
} ## end sub DESTROY
#=head2 run_hook($name)
( run in 1.795 second using v1.01-cache-2.11-cpan-5a3173703d6 )