Gearman

 view release on metacpan or  search on metacpan

CHANGES  view on Meta::CPAN

     -- dynamic tests by using environment variable GEARMAN_SERVERS

1.12 (2014-12-14)

     -- Repoint HACKING file.

     -- Make a jobserver connection use the command timeout during exception negotiation.

     -- Make $taskset->add_task use the command timeout to not hang during job submission.

     -- Add a client option 'command_timeout' to indicate how long we should wait before considering a
        gearman command to have failed. This is not the same as a job timeout, and only affects commands
        that should generally not block apart from the roundtrip on the network.

     -- When a connection to a gearman server fails, start counting how many failures we've had and
        do an exponential backoff (1s, 4s, 9s, 16s...) to a maximum of 90 seconds (default) so that we
        treat the server as 'gone' quickly.

     -- Change dispatch_background to share code paths with other dispatching; background jobs
        are now hashed to a particular server (rather than handed to a random server)

lib/Gearman/Client.pm  view on Meta::CPAN

Calls L<Gearman::Objects> to set I<job_servers>

=item

prefix

Calls I<prefix> (see L<Gearman::Objects>) to set the prefix / namespace.

=item

command_timeout

Maximum time a gearman command should take to get a result (not a job timeout)

default: 30 seconds

=item

backoff_max

Max number of failed connection attempts before a job server will be temporarily disabled

lib/Gearman/Client.pm  view on Meta::CPAN


use base 'Gearman::Objects';

# Attributes declared for this class through the fields pragma.
use fields (
    'sock_info',    # hostport -> hashref
    'hooks',        # hookname -> coderef

    # new() sets this to 0 unless an 'exceptions' option key is passed
    'exceptions',

    # new() sets this to 90 unless a defined 'backoff_max' option is passed
    'backoff_max',

    # maximum time a gearman command should take to get a result (not a job timeout)
    'command_timeout',
);

use Carp;
use Gearman::Task;
use Gearman::Taskset;
use Gearman::JobStatus;
use Time::HiRes;

# Constructor.
#
# Takes a class name (or an already allocated object) followed by option
# pairs. All options are handed to the parent constructor first; this class
# then initialises its own attributes:
#
#   hooks            always starts as an empty hashref
#   exceptions       0, or the value of the 'exceptions' option when that
#                    key is present (even if its value is undef)
#   backoff_max      90, or the 'backoff_max' option when it is defined
#   command_timeout  30, or the 'command_timeout' option when it is defined
#
# Returns the initialised object.
sub new {
    my ($self, %opts) = @_;

    # invoked with a class name: allocate the fields-based object first
    $self = fields::new($self) if !ref $self;

    $self->SUPER::new(%opts);

    $self->{hooks} = {};

    # presence of the key decides here, not definedness of the value
    $self->{exceptions}
        = exists $opts{exceptions} ? delete $opts{exceptions} : 0;

    # the remaining options replace their default only when defined
    my @defaults = (backoff_max => 90, command_timeout => 30);
    while (my ($name, $fallback) = splice @defaults, 0, 2) {
        $self->{$name} = defined $opts{$name} ? $opts{$name} : $fallback;
    }

    return $self;
} ## end sub new

=head1 METHODS

=head2 new_task_set()

Creates and returns a new L<Gearman::Taskset> object.

lib/Gearman/Client.pm  view on Meta::CPAN

#
# _option_request($sock, $option)
#
# Packs an "option_req" command for $option, writes it to $sock and reads
# one response packet, passing the client's command_timeout to
# Gearman::Util::read_res_packet.
#
# Returns:
#   1      when the response type is "option_res"
#   0      when the response type is "error"
#   empty  (undef in scalar context) when the request could not be written,
#          when no response packet was read, or when the response type is
#          unknown (a warning is emitted in that last case)
#
sub _option_request {
    my ($self, $sock, $option) = @_;

    my $req = Gearman::Util::pack_req_command("option_req", $option);

    # A request that never left the client cannot be answered; give up now
    # instead of waiting up to command_timeout for a reply.
    $sock->write($req, length($req)) or return;

    my $err;
    my $res = Gearman::Util::read_res_packet($sock, \$err,
        $self->{command_timeout});

    return unless $res;

    return 0 if $res->{type} eq "error";
    return 1 if $res->{type} eq "option_res";

    warn "Got unknown response to option request: $res->{type}\n";
    return;
} ## end sub _option_request

lib/Gearman/Taskset.pm  view on Meta::CPAN

    return $task->fail("undefined jssock") unless ($jssock);

    my $req = $task->pack_submit_packet($self->client);
    Gearman::Util::send_req($jssock, \$req)
        || Carp::croak "Error sending data to job server";

    push @{ $self->{need_handle} }, $task;
    while (@{ $self->{need_handle} }) {
        my $rv
            = $self->_wait_for_packet($jssock,
            $self->client()->{command_timeout});
        if (!$rv) {

            # ditch it, it failed.
            # this will resubmit it if it failed.
            shift @{ $self->{need_handle} };
            return $task->fail(
                join(' ',
                    "no rv on waiting for packet",
                    defined($rv) ? $rv : $!)
            );

t/02-client.t  view on Meta::CPAN

        get_status
        new_task_set
        run_hook
        /
);

# Checks that a freshly constructed client carries the expected defaults.
subtest "new", sub {
    my $client = new_ok($mn);
    isa_ok($client, "Gearman::Objects");

    # scalar attributes paired with the default each one should hold
    my @scalar_defaults = (
        [backoff_max     => 90],
        [command_timeout => 30],
        [exceptions      => 0],
    );
    for my $pair (@scalar_defaults) {
        my ($field, $want) = @{$pair};
        is($client->{$field}, $want, sprintf("%s->{%s}", $mn, $field));
    }

    is($client->{js_count}, 0, "js_count");

    # hash-valued attributes must start out empty
    for my $field (qw(hooks sock_cache)) {
        my $count = keys %{ $client->{$field} };
        is($count, 0, sprintf("%s->{%s}", $mn, $field));
    }
};

subtest "new_task_set", sub {
    my $c  = new_ok($mn);
    my $h  = "new_task_set";
    my $cb = sub { pass("$h cb") };



( run in 0.276 second using v1.01-cache-2.11-cpan-4d50c553e7e )