view release on metacpan or search on metacpan
AnyEvent::Task - Client/server-based asynchronous worker pool
SYNOPSIS 1: PASSWORD HASHING
Server
use AnyEvent::Task::Server;
use Authen::Passphrase::BlowfishCrypt;
my $dev_urandom;
my $server = AnyEvent::Task::Server->new(
name => 'passwd-hasher',
listen => ['unix/', '/tmp/anyevent-task.socket'],
setup => sub {
open($dev_urandom, "/dev/urandom") || die "open urandom: $!";
},
interface => {
hash => sub {
my ($plaintext) = @_;
read($dev_urandom, my $salt, 16) == 16 || die "bad read from urandom";
return Authen::Passphrase::BlowfishCrypt->new(cost => 10,
salt => $salt,
passphrase => $plaintext)
},
},
);
$server->run; # or AE::cv->recv
Client
use AnyEvent::Task::Client;
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task.socket'],
);
my $checkout = $client->checkout( timeout => 5, );
my $cv = AE::cv;
$checkout->hash('secret',
sub {
my ($checkout, $crypted) = @_;
SYNOPSIS 2: DBI
Server
use AnyEvent::Task::Server;
use DBI;
my $dbh;
AnyEvent::Task::Server->new(
name => 'dbi',
listen => ['unix/', '/tmp/anyevent-task.socket'],
setup => sub {
$dbh = DBI->connect("dbi:SQLite:dbname=/tmp/junk.sqlite3","","",{ RaiseError => 1, });
},
interface => sub {
my ($method, @args) = @_;
$dbh->$method(@args);
},
)->run;
Client
use AnyEvent::Task::Client;
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task.socket'],
);
my $dbh = $client->checkout;
my $cv = AE::cv;
$dbh->do(q{ CREATE TABLE user(username TEXT PRIMARY KEY, email TEXT); },
sub { });
## Requests will queue up on the checkout and execute in order:
Since it's more of a bother than it's worth to run the server and the
client in the same process, there is an alternate server constructor,
"AnyEvent::Task::Server::fork_task_server" for when you'd like to fork a
dedicated server process. It can be passed the same arguments as the
regular "new" constructor:
## my ($keepalive_pipe, $server_pid) =
AnyEvent::Task::Server::fork_task_server(
name => 'hello',
listen => ['unix/', '/tmp/anyevent-task.socket'],
interface => sub {
return "Hello from PID $$";
},
);
The only differences between this and the regular constructor is that
"fork_task_server" will fork a process which becomes the server and will
also install a "keep-alive" pipe between the server and the client. This
keep-alive pipe will be used by the server to detect when its parent
(the client process) exits.
checkout.
In your server code, use AnyEvent::Task::Logger. It exports the function
"logger" which returns a Log::Defer object:
use AnyEvent::Task::Server;
use AnyEvent::Task::Logger;
AnyEvent::Task::Server->new(
name => 'sleeper',
listen => ['unix/', '/tmp/anyevent-task.socket'],
interface => sub {
logger->info('about to compute some operation');
{
my $timer = logger->timer('computing some operation');
select undef,undef,undef, 1; ## sleep for 1 second
}
},
)->run;
Note: Portable server code should never call "sleep" because on some
systems it will interfere with the recoverable worker timeout feature
implemented with "SIGALRM".
In your client code, pass a Log::Defer object in when you create a
checkout:
use AnyEvent::Task::Client;
use Log::Defer;
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task.socket'],
);
my $log_defer_object = Log::Defer->new(sub {
my $msg = shift;
use Data::Dumper; ## or whatever
print Dumper($msg);
});
$log_defer_object->info('going to compute some operation in a worker');
SEE ALSO
The AnyEvent::Task github repo
<https://github.com/hoytech/AnyEvent-Task>
In order to handle exceptions in a meaningful way with this module, you
must use Callback::Frame. In order to maintain seamless request logging
across clients and workers, you should use Log::Defer.
There are many modules on CPAN similar to AnyEvent::Task.
This module is designed to be used in a non-blocking, process-based unix
program. Depending on your exact requirements you might find something
else useful: Parallel::ForkManager, Thread::Pool, or an HTTP server of
some kind.
If you're into AnyEvent, AnyEvent::DBI and AnyEvent::Worker (based on
AnyEvent::DBI), AnyEvent::ForkObject, and AnyEvent::Fork::RPC send and
receive commands from worker processes similar to this module.
AnyEvent::Worker::Pool also has an implementation of a worker pool.
AnyEvent::Gearman can interface with Gearman services.
lib/AnyEvent/Task.pm view on Meta::CPAN
=head1 SYNOPSIS 1: PASSWORD HASHING
=head2 Server
use AnyEvent::Task::Server;
use Authen::Passphrase::BlowfishCrypt;
my $dev_urandom;
my $server = AnyEvent::Task::Server->new(
name => 'passwd-hasher',
listen => ['unix/', '/tmp/anyevent-task.socket'],
setup => sub {
open($dev_urandom, "/dev/urandom") || die "open urandom: $!";
},
interface => {
hash => sub {
my ($plaintext) = @_;
read($dev_urandom, my $salt, 16) == 16 || die "bad read from urandom";
return Authen::Passphrase::BlowfishCrypt->new(cost => 10,
salt => $salt,
passphrase => $plaintext)
lib/AnyEvent/Task.pm view on Meta::CPAN
);
$server->run; # or AE::cv->recv
=head2 Client
use AnyEvent::Task::Client;
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task.socket'],
);
my $checkout = $client->checkout( timeout => 5, );
my $cv = AE::cv;
$checkout->hash('secret',
sub {
my ($checkout, $crypted) = @_;
lib/AnyEvent/Task.pm view on Meta::CPAN
=head2 Server
use AnyEvent::Task::Server;
use DBI;
my $dbh;
AnyEvent::Task::Server->new(
name => 'dbi',
listen => ['unix/', '/tmp/anyevent-task.socket'],
setup => sub {
$dbh = DBI->connect("dbi:SQLite:dbname=/tmp/junk.sqlite3","","",{ RaiseError => 1, });
},
interface => sub {
my ($method, @args) = @_;
$dbh->$method(@args);
},
)->run;
=head2 Client
use AnyEvent::Task::Client;
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task.socket'],
);
my $dbh = $client->checkout;
my $cv = AE::cv;
$dbh->do(q{ CREATE TABLE user(username TEXT PRIMARY KEY, email TEXT); },
sub { });
## Requests will queue up on the checkout and execute in order:
lib/AnyEvent/Task.pm view on Meta::CPAN
Typically you will want to start the client and server as completely separate processes as shown in the synopses.
Running the server and the client in the same process is technically possible but is highly discouraged since the server will C<fork()> when the client demands a new worker process. In this case, all descriptors in use by the client are duped into th...
Since it's more of a bother than it's worth to run the server and the client in the same process, there is an alternate server constructor, C<AnyEvent::Task::Server::fork_task_server> for when you'd like to fork a dedicated server process. It can be ...
## my ($keepalive_pipe, $server_pid) =
AnyEvent::Task::Server::fork_task_server(
name => 'hello',
listen => ['unix/', '/tmp/anyevent-task.socket'],
interface => sub {
return "Hello from PID $$";
},
);
The only differences between this and the regular constructor is that C<fork_task_server> will fork a process which becomes the server and will also install a "keep-alive" pipe between the server and the client. This keep-alive pipe will be used by t...
If C<AnyEvent::Task::Server::fork_task_server> is called in a void context then the reference to this keep-alive pipe is pushed onto C<@AnyEvent::Task::Server::children_sockets>. Otherwise, the keep-alive pipe and the server's PID are returned. Closi...
Since the C<fork_task_server> constructor calls fork and requires using AnyEvent in both the parent and child processes, it is important that you not install any AnyEvent watchers before calling it. The usual caveats about forking AnyEvent processes ...
lib/AnyEvent/Task.pm view on Meta::CPAN
A L<Log::Defer> object is created on demand in the worker process. Once the worker is done an operation, any messages in the object will be extracted and sent back to the client. The client then merges this into its main Log::Defer object that was pa...
In your server code, use L<AnyEvent::Task::Logger>. It exports the function C<logger> which returns a L<Log::Defer> object:
use AnyEvent::Task::Server;
use AnyEvent::Task::Logger;
AnyEvent::Task::Server->new(
name => 'sleeper',
listen => ['unix/', '/tmp/anyevent-task.socket'],
interface => sub {
logger->info('about to compute some operation');
{
my $timer = logger->timer('computing some operation');
select undef,undef,undef, 1; ## sleep for 1 second
}
},
)->run;
Note: Portable server code should never call C<sleep> because on some systems it will interfere with the recoverable worker timeout feature implemented with C<SIGALRM>.
In your client code, pass a L<Log::Defer> object in when you create a checkout:
use AnyEvent::Task::Client;
use Log::Defer;
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task.socket'],
);
my $log_defer_object = Log::Defer->new(sub {
my $msg = shift;
use Data::Dumper; ## or whatever
print Dumper($msg);
});
$log_defer_object->info('going to compute some operation in a worker');
lib/AnyEvent/Task.pm view on Meta::CPAN
=head1 SEE ALSO
L<The AnyEvent::Task github repo|https://github.com/hoytech/AnyEvent-Task>
In order to handle exceptions in a meaningful way with this module, you must use L<Callback::Frame>. In order to maintain seamless request logging across clients and workers, you should use L<Log::Defer>.
There are many modules on CPAN similar to L<AnyEvent::Task>.
This module is designed to be used in a non-blocking, process-based unix program. Depending on your exact requirements you might find something else useful: L<Parallel::ForkManager>, L<Thread::Pool>, or an HTTP server of some kind.
If you're into AnyEvent, L<AnyEvent::DBI> and L<AnyEvent::Worker> (based on AnyEvent::DBI), L<AnyEvent::ForkObject>, and L<AnyEvent::Fork::RPC> send and receive commands from worker processes similar to this module. L<AnyEvent::Worker::Pool> also has...
If you're into POE there is L<POE::Component::Pool::DBI>, L<POEx::WorkerPool>, L<POE::Component::ResourcePool>, L<POE::Component::PreforkDispatch>, L<Cantella::Worker>.
=head1 BUGS
Although this module's interface is now stable and has been in production use for some time, there are few remaining TODO items (see the bottom of Task.pm).
lib/AnyEvent/Task.pm view on Meta::CPAN
TODO
! max checkout queue size
- start delivering fatal errors to some (at front of queue or back of queue though?)
- write test for this
! docs: write good error handling examples
Make names more consistent between callback::frame backtraces and auto-generated log::defer timers
make server not use AnyEvent so don't have to worry about workers unlinking unix socket in dtors
In a graceful shutdown scenario, servers wait() on all their children before terminating.
- Support relinquishing accept() socket during this period?
Document hung_worker_timeout and SIGALRM stuff
Wire protocol:
- Support something other than JSON? Sereal?
- Document the protocol?
lib/AnyEvent/Task/Server/Worker.pm view on Meta::CPAN
package AnyEvent::Task::Server::Worker;
use common::sense;
use AnyEvent::Util;
use Guard;
use POSIX; ## POSIX::_exit is used so we don't unlink the unix socket file created by our parent before the fork
use IO::Select;
use JSON::XS;
use Scalar::Util qw/blessed/;
my $setup_has_been_run;
my $json;
my $sel;
use Test::More tests => 16;
## The point of this test is to verify that arguments, errors, and
## return values are passed correctly between client and server.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => sub {
die "ERR: $_[1]" if $_[0] eq 'error';
return \@_;
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
max_workers => 1,
name => 'MY CLIENT NAME',
);
my $cv = AE::cv;
{
$client->checkout->(1, [2], { three => 3, λ => 'ð' }, sub {
t/dont_refork_after_error.t view on Meta::CPAN
## worker process from being killed off after a checkout is released
## where the worker threw an error in its lifetime.
## Note that a checkout's methods can still be called after an error
## is thrown but before the checkout is released, perhaps to access
## error states or to rollback a transaction.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => {
get_pid => sub { return $$ },
throw => sub { my ($err) = @_; die $err; },
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
max_workers => 1,
dont_refork_after_error => 1,
);
my $cv = AE::cv;
my $pid;
{
my $checkout = $client->checkout();
t/error-clears-checkout-queue.t view on Meta::CPAN
## The point of this test is to verify that method calls can queue
## up on a checkout and that if any errors are thrown by one of
## the queued methods, then all the other method calls are removed
## from the checkout's queue.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => {
die => sub { die "ouch"; },
success => sub { 1 },
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
);
my $cv = AE::cv;
my $timeout_watcher = AE::timer 0.5, 0, sub {
$cv->send;
};
my $num_exceptions_caught = 0;
## If log_defer_object is passed in when creating a checkout:
## 1) The server can add log messages, timers, data, etc to
## this object by using the AnyEvent::Task::Logger::logger
## 2) Every time a request is placed onto a checkout, a timer
## is started in this object and it is ended once the
## request is fulfilled.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => { normal =>
sub {
logger->info("hello from", $$);
logger->timer("junk");
1;
},
error =>
sub {
logger->warn("something weird happened");
die "uh oh";
},
sleep =>
sub { select undef,undef,undef,shift; },
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
);
my $cv = AE::cv;
my $log_defer_object = Log::Defer->new(sub {
my $msg = shift;
is($msg->{logs}->[0]->[2], 'hello from', 'message from client');
t/manual-request-abort.t view on Meta::CPAN
## The point of this test is to verify that fatal errors cut off
## the worker and permanently disable the checkout. If methods are
## called again on the checkout they will continue to throw the
## fatal error.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => {
sleep_die => sub {
select undef, undef, undef, 1;
die "shouldn't get here";
},
get_pid => sub { $$ },
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
);
my $cv = AE::cv;
my $checkout = $client->checkout( timeout => 1, );
$checkout->sleep_die(frame(code => sub {
die "checkout was serviced?";
}, catch => sub {
t/max_checkouts.t view on Meta::CPAN
use List::Util;
use AnyEvent::Util;
use AnyEvent::Task::Server;
use AnyEvent::Task::Client;
use Test::More tests => 4;
## The point of this test is to verify that workers are restarted
## after they handle max_checkouts checkouts, and that the unix
## socket isn't unlinked after the worker terminates due to
## max_checkouts.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => sub {
return $$;
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
max_workers => 1,
max_checkouts => 2,
);
my $cv = AE::cv;
my $pid;
my $timeout_watcher = AE::timer 1.0, 0, sub {
t/sequential.t view on Meta::CPAN
use Test::More tests => 7;
## The point of this test is to ensure that client requests are queued as per
## the design. In order to simplify, we set max_workers to 1 so that at most
## one checkout will be active at any given time.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => sub {
return $$;
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
max_workers => 1,
);
my $cv = AE::cv;
my $pid;
my $counter = 0;
{
t/setup-errors.t view on Meta::CPAN
## The point of this test is to verify that exceptions thrown in
## setup callbacks are propagated to the client. It also validates
## that by default workers are restarted on setup errors.
my $attempt = 0;
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
setup => sub {
$attempt++;
die "SETUP EXCEPTION $attempt";
},
interface => sub {
die "INTERFACE EXCEPTION (shouldn't happen)";
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
max_workers => 1,
);
my $cv = AE::cv;
{
$client->checkout->(frame(code => sub {
die "should never get here";
}, catch => sub {
## The point of this test is to verify that the setup feature can
## initialize a worker's environment before requests are handled,
## and that this initialization only runs once per worker process.
my $counter;
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
setup => sub {
$counter = 100;
},
interface => sub {
$counter++;
return $counter;
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
max_workers => 1,
);
my $cv = AE::cv;
{
$client->checkout->(sub {
my ($checkout, $res) = @_;
use Test::More tests => 102;
## The point of this test is to ensure that even if we make many more
## checkouts than there are max workers, all of them are eventually
## still serviced.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => sub {
my $i = shift;
return $i;
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
max_workers => 5,
);
my $cv = AE::cv;
my $counter = 0;
for my $i (1 .. 100) {
$client->checkout->($i, sub {
t/timeout-connect.t view on Meta::CPAN
use Test::More tests => 1;
## The point of this test is to ensure that checkouts are timed out
## when the client is unable to connect to the server at all.
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test-non-existent.socket'],
);
my $cv = AE::cv;
{
my $checkout = $client->checkout( timeout => 0.2, );
$checkout->(frame(code => sub {
ok(0, "checkout was serviced?");
t/timeout-hung-worker.t view on Meta::CPAN
## The point of this test is to ensure that if a worker is "hung" on some
## operation, it will eventually die off. Since this is implemented with
## SIGALRM/alarm we have to hang for at least a second which makes this
## test slow :(
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => sub {
select undef, undef, undef, 3; # can't use sleep() because sleep might use alarm
die "shouldn't get here";
},
hung_worker_timeout => 1, ## can't be a float because we use alarm()
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
);
my $cv = AE::cv;
{
my $checkout = $client->checkout( timeout => 2, );
$checkout->(frame(code => sub {
die "checkout was serviced?";
t/timeout-in-progress.t view on Meta::CPAN
use AnyEvent::Task::Client;
use Test::More tests => 2;
## The point of this test is to ensure that checkouts are timed out
## when the worker process takes too long.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => sub {
select undef, undef, undef, 0.4;
die "shouldn't get here";
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
);
my $cv = AE::cv;
{
my $checkout = $client->checkout( timeout => 0.2, );
$checkout->(frame(code => sub {
die "checkout was serviced?";
t/timeout-log-defer.t view on Meta::CPAN
use Test::More tests => 3;
## The point of this test is to verify that if a timeout error is thrown
## from a checkout with a log_defer_object then a reference to the Log::Defer
## object is not kept alive by the cmd_handler closure of the checkout. This was
## a bug in AE::T 0.802.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => sub {
select undef, undef, undef, 0.4;
die "shouldn't get here";
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
);
my $error_thrown = 0;
my $cv = AE::cv;
{
my $ld = Log::Defer->new(sub {
ok($error_thrown, 'log defer obj destroyed after error handler ran');
t/worker-_exits.t view on Meta::CPAN
## The point of this test is to ensure that when a worker connection dies
## suddenly, ie with POSIX::_exit(), an appropriate error is promptly raised
## in the callback's dynamic environment. This test also verifies that
## fatal errors like losing a worker connection put a checkout into
## a permanent error state that will always return the same fatal
## error message.
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task-test.socket'],
interface => sub {
select undef, undef, undef, 0.1;
POSIX::_exit(1);
die "shouldn't get here";
},
);
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task-test.socket'],
);
my $cv = AE::cv;
{
my $checkout = $client->checkout( timeout => 1, );
$checkout->(frame(code => sub {
die "checkout was serviced?";