Gearman
view release on metacpan or search on metacpan
lib/Gearman/Client.pm view on Meta::CPAN
});
print "1 + 2 = $$result_ref\n";
# waiting on a set of tasks in parallel
my $taskset = $client->new_task_set;
$taskset->add_task( "add" => "1+2", {
on_complete => sub { ... }
});
$taskset->add_task( "divide" => "5/0", {
on_fail => sub { print "divide by zero error!\n"; },
});
$taskset->wait;
=head1 DESCRIPTION
I<Gearman::Client> is a client class for the Gearman distributed job
system, providing a framework for sending jobs to one or more Gearman
servers. These jobs are then distributed out to a farm of workers.
Callers instantiate a I<Gearman::Client> object and from it dispatch
single tasks, sets of tasks, or check on the status of tasks.
I<Gearman::Client> is derived from L<Gearman::Objects>
=head1 USAGE
=head2 Gearman::Client->new(%options)
Creates a new I<Gearman::Client> object, and returns the object.
If I<%options> is provided, initializes the new client object with the
settings in I<%options>, which can contain:
=over 4
=item
exceptions
If true, the client sends an L<OPTION_REQ exceptions|http://gearman.org/protocol/> request for each connection to the job server.
This causes job server to forward WORK_EXCEPTION packets to the client.
=item
job_servers
List of job servers. Value should be an array reference, hash reference
or scalar.
Calls L<Gearman::Objects> to set I<job_servers>
=item
prefix
Calls I<prefix> (see L<Gearman::Objects>) to set the prefix / namespace.
=item
command_timeout
Maximum time a gearman command should take to get a result (not a job timeout)
default: 30 seconds
=item
backoff_max
Max number of failed connection attempts before an job server will be temporary disabled
default: 90
=back
=head1 EXAMPLES
=head2 Summation
This is an example client that sends off a request to sum up a list of
integers.
use Gearman::Client;
use Storable qw( freeze );
my $client = Gearman::Client->new;
$client->job_servers('127.0.0.1');
my $tasks = $client->new_task_set;
my $handle = $tasks->add_task(sum => freeze([ 3, 5 ]), {
on_complete => sub { print ${ $_[0] }, "\n" }
});
$tasks->wait;
See the L<Gearman::Worker> documentation for the worker for the I<sum>
function.
=head1 NOTE
If you intend using UTF-8 data with SSL based connection,
beware there is no UTF-8 support in underlying L<Net::SSLeay>.
L<perlunicode/"Forcing-Unicode-in-Perl-(Or-Unforcing-Unicode-in-Perl)"> describes proper workarounds.
=cut
use base 'Gearman::Objects';
use fields (
'sock_info', # hostport -> hashref
'hooks', # hookname -> coderef
'exceptions',
'backoff_max',
# maximum time a gearman command should take to get a result (not a job timeout)
'command_timeout',
);
use Carp;
use Gearman::Task;
use Gearman::Taskset;
use Gearman::JobStatus;
use Time::HiRes;
sub new {
my ($self, %opts) = @_;
unless (ref $self) {
$self = fields::new($self);
}
$self->SUPER::new(%opts);
$self->{hooks} = {};
$self->{exceptions} = 0;
$self->{backoff_max} = 90;
$self->{command_timeout} = 30;
$self->{exceptions} = delete $opts{exceptions}
if exists $opts{exceptions};
$self->{backoff_max} = $opts{backoff_max}
if defined $opts{backoff_max};
$self->{command_timeout} = $opts{command_timeout}
if defined $opts{command_timeout};
return $self;
} ## end sub new
=head1 METHODS
=head2 new_task_set()
Creates and returns a new L<Gearman::Taskset> object.
=cut
sub new_task_set {
my $self = shift;
my $taskset = Gearman::Taskset->new($self);
$self->run_hook('new_task_set', $self, $taskset);
return $taskset;
} ## end sub new_task_set
#
# _job_server_status_command($command, $each_line_sub)
# $command e.g. "status\n".
# $each_line_sub A sub to be called on each line of response;
# takes $hostport and the $line as args.
#
sub _job_server_status_command {
my ($self, $command, $each_line_sub) = (shift, shift, shift);
my $list
= scalar(@_)
? $self->canonicalize_job_servers(@_)
: $self->job_servers();
my %js_map = map { $self->_js_str($_) => 1 } $self->job_servers();
foreach my $js (@{$list}) {
defined($js_map{ $self->_js_str($js) }) || next;
my $sock = $self->_get_js_sock($js)
or next;
my $rv = $sock->write($command);
my $err;
my @lines = Gearman::Util::read_text_status($sock, \$err);
if ($err) {
$self->debug() && warn $err;
next;
}
foreach my $l (@lines) {
$each_line_sub->($js, $l);
}
$self->_sock_cache($js, $sock);
} ## end foreach my $js (@{$list})
} ## end sub _job_server_status_command
=head2 get_job_server_status()
lib/Gearman/Client.pm view on Meta::CPAN
B<return> L<Gearman::JobStatus> on success
=cut
sub get_status {
my ($self, $handle) = @_;
$handle || return;
my ($js_str, $shandle) = split(m!//!, $handle);
#TODO simple check for $js_str in job_server doesn't work if
# $js_str is not contained in job_servers
# job_servers = ["localhost:4730"]
# handle = 127.0.0.1:4730//H:...
#
# hopefully commit 58e2aa5 solves this TODO
my $js = $self->_js($js_str);
$js || return;
my $sock = $self->_get_js_sock($js);
$sock || return;
my $req = Gearman::Util::pack_req_command("get_status", $shandle);
my $len = length($req);
my $rv = $sock->write($req, $len);
my $err;
my $res = Gearman::Util::read_res_packet($sock, \$err);
if ($res && $res->{type} eq "error") {
Carp::croak
"Error packet from server after get_status: ${$res->{blobref}}\n";
}
return undef unless $res && $res->{type} eq "status_res";
my @args = split(/\0/, ${ $res->{blobref} });
#FIXME returns on '', 0
$args[0] || return;
shift @args;
$self->_sock_cache($js_str, $sock);
return Gearman::JobStatus->new(@args);
} ## end sub get_status
#
# _option_request($sock, $option)
#
sub _option_request {
my ($self, $sock, $option) = @_;
my $req = Gearman::Util::pack_req_command("option_req", $option);
my $len = length($req);
my $rv = $sock->write($req, $len);
my $err;
my $res = Gearman::Util::read_res_packet($sock, \$err,
$self->{command_timeout});
return unless $res;
return 0 if $res->{type} eq "error";
return 1 if $res->{type} eq "option_res";
warn "Got unknown response to option request: $res->{type}\n";
return;
} ## end sub _option_request
#
# _get_js_sock($js)
#
# returns a socket from the cache. it should be returned to the
# cache with _sock_cache($js, $sock).
# The hostport isn't verified. the caller
# should verify that $js is in the set of jobservers.
sub _get_js_sock {
my ($self, $js) = @_;
if (my $sock = $self->_sock_cache($js, undef, 1)) {
return $sock if $sock->connected;
}
my $sockinfo = $self->{sock_info}{ $self->_js_str($js) } ||= {};
my $disabled_until = $sockinfo->{disabled_until};
return if defined $disabled_until && $disabled_until > Time::HiRes::time();
my $sock = $self->socket($js, 1);
unless ($sock) {
my $count = ++$sockinfo->{failed_connects};
my $disable_for = $count**2;
my $max = $self->{backoff_max};
$disable_for = $disable_for > $max ? $max : $disable_for;
$sockinfo->{disabled_until} = $disable_for + Time::HiRes::time();
return;
} ## end unless ($sock)
$self->sock_nodelay($sock);
$sock->autoflush(1);
# If exceptions support is to be requested, and the request fails, disable
# exceptions for this client.
if ($self->{exceptions} && !$self->_option_request($sock, 'exceptions')) {
warn "Exceptions support denied by server, disabling.\n";
$self->{exceptions} = 0;
}
delete $sockinfo->{failed_connects}; # Success, mark the socket as such.
delete $sockinfo->{disabled_until};
return $sock;
} ## end sub _get_js_sock
sub _get_random_js_sock {
my ($self, $getter) = @_;
$self->{js_count} || return;
$getter ||= sub {
my $js = shift;
( run in 0.663 second using v1.01-cache-2.11-cpan-a5abf4f5562 )