Gearman
view release on metacpan - search on metacpan
view release on metacpan or search on metacpan
lib/Gearman/Client.pm view on Meta::CPAN
});
print "1 + 2 = $$result_ref\n";
# waiting on a set of tasks in parallel
my $taskset = $client->new_task_set;
$taskset->add_task( "add" => "1+2", {
on_complete => sub { ... }
});
$taskset->add_task( "divide" => "5/0", {
on_fail => sub { print "divide by zero error!\n"; },
});
$taskset->wait;
=head1 DESCRIPTION
I<Gearman::Client> is a client class for the Gearman distributed job
system, providing a framework for sending jobs to one or more Gearman
servers. These jobs are then distributed out to a farm of workers.
Callers instantiate a I<Gearman::Client> object and from it dispatch
single tasks, sets of tasks, or check on the status of tasks.
I<Gearman::Client> is derived from L<Gearman::Objects>
=head1 USAGE
=head2 Gearman::Client->new(%options)
Creates a new I<Gearman::Client> object, and returns the object.
If I<%options> is provided, initializes the new client object with the
settings in I<%options>, which can contain:
=over 4
=item
exceptions
If true, the client sends an L<OPTION_REQ exceptions|http://gearman.org/protocol/> request for each connection to the job server.
This causes job server to forward WORK_EXCEPTION packets to the client.
=item
job_servers
List of job servers. Value should be an array reference, hash reference
or scalar.
Calls L<Gearman::Objects> to set I<job_servers>
=item
prefix
Calls I<prefix> (see L<Gearman::Objects>) to set the prefix / namespace.
=item
command_timeout
Maximum time a gearman command should take to get a result (not a job timeout)
default: 30 seconds
=item
backoff_max
Max number of failed connection attempts before an job server will be temporary disabled
default: 90
=back
=head1 EXAMPLES
=head2 Summation
This is an example client that sends off a request to sum up a list of
integers.
use Gearman::Client;
use Storable qw( freeze );
my $client = Gearman::Client->new;
$client->job_servers('127.0.0.1');
my $tasks = $client->new_task_set;
my $handle = $tasks->add_task(sum => freeze([ 3, 5 ]), {
on_complete => sub { print ${ $_[0] }, "\n" }
});
$tasks->wait;
See the L<Gearman::Worker> documentation for the worker for the I<sum>
function.
=head1 NOTE
If you intend using UTF-8 data with SSL based connection,
beware there is no UTF-8 support in underlying L<Net::SSLeay>.
L<perlunicode/"Forcing-Unicode-in-Perl-(Or-Unforcing-Unicode-in-Perl)"> describes proper workarounds.
=cut
use base 'Gearman::Objects';
use fields (
'sock_info', # hostport -> hashref
'hooks', # hookname -> coderef
'exceptions',
'backoff_max',
# maximum time a gearman command should take to get a result (not a job timeout)
'command_timeout',
);
use Carp;
use Gearman::Task;
use Gearman::Taskset;
use Gearman::JobStatus;
use Time::HiRes;
sub new {
my ($self, %opts) = @_;
unless (ref $self) {
$self = fields::new($self);
}
$self->SUPER::new(%opts);
$self->{hooks} = {};
$self->{exceptions} = 0;
$self->{backoff_max} = 90;
$self->{command_timeout} = 30;
$self->{exceptions} = delete $opts{exceptions}
if exists $opts{exceptions};
$self->{backoff_max} = $opts{backoff_max}
if defined $opts{backoff_max};
$self->{command_timeout} = $opts{command_timeout}
if defined $opts{command_timeout};
return $self;
} ## end sub new
=head1 METHODS
=head2 new_task_set()
Creates and returns a new L<Gearman::Taskset> object.
=cut
sub new_task_set {
my $self = shift;
my $taskset = Gearman::Taskset->new($self);
$self->run_hook('new_task_set', $self, $taskset);
return $taskset;
} ## end sub new_task_set
#
# _job_server_status_command($command, $each_line_sub)
# $command e.g. "status\n".
# $each_line_sub A sub to be called on each line of response;
# takes $hostport and the $line as args.
#
sub _job_server_status_command {
my ($self, $command, $each_line_sub) = (shift, shift, shift);
my $list
= scalar(@_)
? $self->canonicalize_job_servers(@_)
: $self->job_servers();
my %js_map = map { $self->_js_str($_) => 1 } $self->job_servers();
foreach my $js (@{$list}) {
defined($js_map{ $self->_js_str($js) }) || next;
my $sock = $self->_get_js_sock($js)
or next;
my $rv = $sock->write($command);
my $err;
my @lines = Gearman::Util::read_text_status($sock, \$err);
if ($err) {
$self->debug() && warn $err;
next;
}
foreach my $l (@lines) {
$each_line_sub->($js, $l);
}
$self->_sock_cache($js, $sock);
} ## end foreach my $js (@{$list})
} ## end sub _job_server_status_command
=head2 get_job_server_status()
view all matches for this distributionview release on metacpan - search on metacpan
( run in 0.581 second using v1.00-cache-2.02-grep-82fe00e-cpan-1310916c57ae )