Gearman-Client-Async
view release on metacpan or search on metacpan
lib/Gearman/Client/Async.pm view on Meta::CPAN
package Gearman::Client::Async;
=head1 NAME

Gearman::Client::Async - Asynchronous client module for Gearman for Danga::Socket applications

=head1 SYNOPSIS

    use Gearman::Client::Async;

    # Instantiate a new Gearman::Client::Async object.
    $client = Gearman::Client::Async->new(
        job_servers => [ '127.0.0.1', '192.168.0.1:123' ],
    );

    # Overwrite job server list with a new one.
    $client->set_job_servers( '10.0.0.1' );

    # Read list of job servers out of the client.
    $arrayref = $client->job_servers;
    @array = $client->job_servers;

    # Start a task
    $task = Gearman::Task->new(...); # with callbacks, etc
    $client->add_task( $task );

=head1 COPYRIGHT

Copyright 2006 Six Apart, Ltd.

License granted to use/distribute under the same terms as Perl itself.

=head1 WARRANTY

This is free software. This comes with no warranty whatsoever.

=head1 AUTHORS

Brad Fitzpatrick (brad@danga.com)

Jonathan Steinert (hachi@cpan.org)

=cut

use strict;
use warnings;
use Carp qw(croak);
use fields (
'job_servers', # arrayref of Gearman::Client::Async::Connection objects
't_no_random', # don't randomize job server to use: use first alive one.
't_offline_host', # hashref: hostname -> $bool, if host should act as offline, for testing
);
use Danga::Socket 1.52;
use Gearman::Objects;
use Gearman::Task;
use Gearman::JobStatus;
use Gearman::Client::Async::Connection;
use List::Util qw(first);
# Module version, kept in a package global declared through "use vars".
use vars qw($VERSION);
$VERSION = "0.94";
# Compile-time debug switch.  The empty prototype plus the single constant
# expression makes this a constant function, so statements guarded by
# "if DEBUGGING" (see the warn in add_task) can be optimised away while the
# value is 0.  Change the 0 to a true value to enable the diagnostics.
sub DEBUGGING () { 0 }
# Constructor.
#
#   my $client = Gearman::Client::Async->new( job_servers => \@hostspecs );
#
# Options:
#   job_servers - optional array reference of host specs; when given it is
#                 passed on to set_job_servers().
#
# If $class is already a reference it is used as the object to initialise;
# otherwise a new fields-based object is allocated with fields::new().
#
# Croaks on unknown options, or when job_servers cannot be dereferenced as
# an array.  All arguments are validated before any state is touched, so a
# failing call neither creates connections nor modifies an object that was
# passed in as $class.
#
# Returns the initialised object.
sub new {
    my ($class, %opts) = @_;

    my $js = delete $opts{job_servers};

    # Sorted so the error message is stable from run to run.
    croak "Unknown parameters: " . join(", ", sort keys %opts) if %opts;

    my @hostspecs;
    if ($js) {
        # Dereference under eval so that a bad value is reported from the
        # caller's perspective instead of as an internal dereference error.
        croak "job_servers must be an array reference"
            unless eval { @hostspecs = @$js; 1 };
    }

    my $self = ref $class ? $class : fields::new($class);

    $self->{job_servers}    = [];
    $self->{t_offline_host} = {};

    $self->set_job_servers(@hostspecs) if $js;

    return $self;
}
# Test hook: stores the given flag in t_no_random.  While the flag is true,
# add_task() uses the first alive job server for unhashed tasks instead of
# picking one at random.  Returns the value that was stored.
sub t_set_disable_random {
    my ($self, $flag) = @_;
    return $self->{t_no_random} = $flag;
}
# Test hook: mark the job server whose hostspec equals $host as offline
# (or back online when a false value is passed).
#
#   $host - hostspec string, compared with each connection's hostspec
#   $val  - flag to apply; defaults to 1 when undefined
#
# The flag is recorded in t_offline_host first and then forwarded to the
# first matching connection's t_set_offline().  Dies if no configured
# connection has that hostspec; the flag has already been recorded by then.
sub t_set_offline_host {
    my $self    = shift;
    my $host    = shift;
    my $offline = shift;
    $offline = 1 if !defined $offline;

    $self->{t_offline_host}{$host} = $offline;

    my $target;
    for my $candidate (@{ $self->{job_servers} }) {
        next unless $candidate->hostspec eq $host;
        $target = $candidate;
        last;
    }
    die "No host found with that spec to mark offline" unless $target;

    $target->t_set_offline($offline);
}
# Replace the job server list with the given host specs.
#
# Connections whose hostspec appears in the new list are kept as they are;
# connections that are no longer listed are asked to close_when_finished.
# Every requested hostspec without an existing connection gets a new
# Gearman::Client::Async::Connection.  The resulting list follows the order
# of the arguments.
#
# Returns the new array reference stored in job_servers.
sub set_job_servers {
    my Gearman::Client::Async $self = shift;

    # hostspec -> 1 for everything the caller asked for
    my %wanted;
    $wanted{$_} = 1 for @_;

    # hostspec -> existing connection that survives the change
    my %keep;
    for my $conn (@{ $self->{job_servers} }) {
        my $spec = $conn->hostspec;
        unless ($wanted{$spec}) {
            $conn->close_when_finished;
            next;
        }
        $keep{$spec} = $conn;
    }

    my @conns;
    for my $spec (@_) {
        push @conns,
            $keep{$spec} || Gearman::Client::Async::Connection->new( hostspec => $spec );
    }

    $self->{job_servers} = \@conns;
}
# Read-only accessor for the configured job servers.
#
# Returns the hostspec of every connection, in list order: as a list in
# list context, as a reference to a fresh array otherwise.  Croaks with
# "Not a setter" when called with arguments.
sub job_servers {
    my Gearman::Client::Async $self = shift;

    if (@_) {
        croak "Not a setter";
    }

    my @hostspecs;
    for my $conn (@{ $self->{job_servers} }) {
        push @hostspecs, $conn->hostspec;
    }

    return @hostspecs if wantarray;
    return \@hostspecs;
}
# Hand a Gearman::Task to one of the alive job servers.
#
# Server selection, done among the connections that report alive:
#   * if $task->hash is defined, the server at index (hash % number of
#     alive servers) is used;
#   * otherwise a random alive server is used, or the first one when the
#     t_no_random test flag is set.
#
# The chosen connection is asked to get into ready state.  Its first
# callback (on_ready) arms a Danga::Socket timer when the task has a
# timeout, registers a post hook that cancels the timer and tells the
# connection to give up on the task, and then queues the task on the
# connection.  The selection closure itself is passed as the second
# callback (on_error), so the selection is run again in that case.
# When no server is alive the task is failed via final_fail.
sub add_task {
    my Gearman::Client::Async $self = shift;
    my Gearman::Task $task = shift;

    # The closure refers to itself through $try_again so it can be handed
    # out as the retry callback.  The variable is set back to undef once
    # the task has been queued or has finally failed, which breaks that
    # reference cycle.
    my $try_again;
    $try_again = sub {
        my @job_servers = grep { $_->alive } @{ $self->{job_servers} };

        warn "Alive servers: " . @job_servers . " out of " . @{ $self->{job_servers} } . "\n" if DEBUGGING;

        unless (@job_servers) {
            $task->final_fail;
            $try_again = undef;
            return;
        }

        # Element access ($job_servers[...]) rather than a one-element
        # slice: a single connection is wanted here.
        my $js;
        if (defined( my $hash = $task->hash )) {
            # Task is hashed, use key to fetch job server
            $js = $job_servers[ $hash % @job_servers ];
        }
        else {
            # Task is not hashed, random job server
            my $index = $self->{t_no_random}
                      ? 0
                      : int( rand( scalar @job_servers ) );
            $js = $job_servers[$index];
        }

        # TODO Fix this violation of object privacy.
        $task->{taskset} = $self;

        $js->get_in_ready_state(
            # on_ready:
            sub {
                my $timer;
                if (my $timeout = $task->{timeout}) {
                    $timer = Danga::Socket->AddTimer($timeout, sub {
                        $task->final_fail('timeout');
                    });
                }
                $task->set_on_post_hooks(sub {
                    $timer->cancel if $timer;

                    # ALSO clean up our $js (connection's) waiting stuff:
                    $js->give_up_on($task);
                });
                $js->add_task( $task );
                $try_again = undef;
            },
            # on_error:
            $try_again,
        );
    };

    $try_again->();
}
# Gearman::Client::Async sometimes passes itself off, duck-typing style, as
# a Gearman::Taskset (add_task stores the client in the task's "taskset"
# slot).  A task "set" makes no sense in an async world, where there is no
# need to wait on a set of things because everything happens at its own
# pace.  To satisfy that interface we still need to implement the taskset's
# "client" method, which in our case simply returns the client itself.
sub client {
    my ($self) = @_;
    return $self;
}
# The base Gearman libraries ask a Gearman::Client-like object for its
# prefix.  This module does not currently support prefixes, so the answer
# is always the empty string.
sub prefix {
    return "";
}
1;
( run in 0.913 second using v1.01-cache-2.11-cpan-39bf76dae61 )