Gearman

 view release on metacpan or  search on metacpan

lib/Gearman/Client.pm  view on Meta::CPAN

    });
    print "1 + 2 = $$result_ref\n";

    # waiting on a set of tasks in parallel
    my $taskset = $client->new_task_set;
    $taskset->add_task( "add" => "1+2", {
       on_complete => sub { ... }
    });
    $taskset->add_task( "divide" => "5/0", {
       on_fail => sub { print "divide by zero error!\n"; },
    });
    $taskset->wait;


=head1 DESCRIPTION

I<Gearman::Client> is a client class for the Gearman distributed job
system, providing a framework for sending jobs to one or more Gearman
servers. These jobs are then distributed out to a farm of workers.

Callers instantiate a I<Gearman::Client> object and from it dispatch
single tasks, sets of tasks, or check on the status of tasks.

I<Gearman::Client> is derived from L<Gearman::Objects>

=head1 USAGE

=head2 Gearman::Client->new(%options)

Creates a new I<Gearman::Client> object, and returns the object.

If I<%options> is provided, initializes the new client object with the
settings in I<%options>, which can contain:

=over 4

=item

exceptions

If true, the client sends an L<OPTION_REQ exceptions|http://gearman.org/protocol/> request for each connection to the job server.
This causes job server to forward WORK_EXCEPTION packets to the client.

=item

job_servers

List of job servers. Value should be an array reference, hash reference
or scalar.

Calls L<Gearman::Objects> to set I<job_servers>

=item

prefix

Calls I<prefix> (see L<Gearman::Objects>) to set the prefix / namespace.

=item

command_timeout

Maximum time a gearman command should take to get a result (not a job timeout)

default: 30 seconds

=item

backoff_max

Max number of failed connection attempts before an job server will be temporary disabled

default: 90

=back

=head1 EXAMPLES

=head2 Summation

This is an example client that sends off a request to sum up a list of
integers.

    use Gearman::Client;
    use Storable qw( freeze );
    my $client = Gearman::Client->new;
    $client->job_servers('127.0.0.1');
    my $tasks = $client->new_task_set;
    my $handle = $tasks->add_task(sum => freeze([ 3, 5 ]), {
        on_complete => sub { print ${ $_[0] }, "\n" }
    });
    $tasks->wait;

See the L<Gearman::Worker> documentation for the worker for the I<sum>
function.

=head1 NOTE

If you intend using UTF-8 data with SSL based connection,
beware there is no UTF-8 support in underlying L<Net::SSLeay>.
L<perlunicode/"Forcing-Unicode-in-Perl-(Or-Unforcing-Unicode-in-Perl)"> describes proper workarounds.

=cut

use base 'Gearman::Objects';

use fields (
    'sock_info',    # hostport -> hashref
    'hooks',        # hookname -> coderef
    'exceptions',
    'backoff_max',

    # maximum time a gearman command should take to get a result (not a job timeout)
    'command_timeout',
);

use Carp;
use Gearman::Task;
use Gearman::Taskset;
use Gearman::JobStatus;
use Time::HiRes;

sub new {
    my ($self, %opts) = @_;
    unless (ref $self) {
        $self = fields::new($self);
    }

    $self->SUPER::new(%opts);

    $self->{hooks}           = {};
    $self->{exceptions}      = 0;
    $self->{backoff_max}     = 90;
    $self->{command_timeout} = 30;

    $self->{exceptions} = delete $opts{exceptions}
        if exists $opts{exceptions};

    $self->{backoff_max} = $opts{backoff_max}
        if defined $opts{backoff_max};

    $self->{command_timeout} = $opts{command_timeout}
        if defined $opts{command_timeout};

    return $self;
} ## end sub new

=head1 METHODS

=head2 new_task_set()

Creates and returns a new L<Gearman::Taskset> object.

=cut

sub new_task_set {
    my $self    = shift;
    my $taskset = Gearman::Taskset->new($self);
    $self->run_hook('new_task_set', $self, $taskset);
    return $taskset;
} ## end sub new_task_set

#
# _job_server_status_command($command, $each_line_sub)
# $command e.g. "status\n".
# $each_line_sub A sub to be called on each line of response;
#                takes $hostport and the $line as args.
#
sub _job_server_status_command {
    my ($self, $command, $each_line_sub) = (shift, shift, shift);

    my $list
        = scalar(@_)
        ? $self->canonicalize_job_servers(@_)
        : $self->job_servers();
    my %js_map = map { $self->_js_str($_) => 1 } $self->job_servers();

    foreach my $js (@{$list}) {
        defined($js_map{ $self->_js_str($js) }) || next;

        my $sock = $self->_get_js_sock($js)
            or next;

        my $rv = $sock->write($command);

        my $err;
        my @lines = Gearman::Util::read_text_status($sock, \$err);
        if ($err) {

            $self->debug() && warn $err;
            next;
        }

        foreach my $l (@lines) {
            $each_line_sub->($js, $l);
        }

        $self->_sock_cache($js, $sock);
    } ## end foreach my $js (@{$list})
} ## end sub _job_server_status_command

=head2 get_job_server_status()

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 0.581 second using v1.00-cache-2.02-grep-82fe00e-cpan-1310916c57ae )