AnyEvent-Task
lib/AnyEvent/Task.pm
my $dbh = $client->checkout;
my $cv = AE::cv;
$dbh->do(q{ CREATE TABLE user(username TEXT PRIMARY KEY, email TEXT); },
sub { });
## Requests will queue up on the checkout and execute in order:
$dbh->do(q{ INSERT INTO user (username, email) VALUES (?, ?) },
undef, 'jimmy',
'jimmy@example.com',
sub { });
$dbh->selectrow_hashref(q{ SELECT * FROM user }, sub {
my ($dbh, $user) = @_;
print "username: $user->{username}, email: $user->{email}\n";
$cv->send;
});
$cv->recv;
=head2 Output
username: jimmy, email: jimmy@example.com
=head1 DESCRIPTION
The synopses make this module look much more complicated than it actually is. In a nutshell, a synchronous worker process is forked off by a server whenever a client asks for one. The client keeps as many of these workers around as it wants and deleg...
Put another way, L<AnyEvent::Task> is a pre-fork-on-demand server (L<AnyEvent::Task::Server>) combined with a persistent worker-pooled client (L<AnyEvent::Task::Client>).
The examples in the synopses are complete stand-alone programs. Run the server in one window and the client in another. The server will remain running but the client will exit after printing its output. Typically the "client" programs would be embedd...
Note that the client examples don't implement error checking (see the L<ERROR HANDLING> section).
A server is started with C<< AnyEvent::Task::Server->new >>. This constructor should be passed in at least the C<listen> and C<interface> arguments. Keep the returned server object around for as long as you want the server to be running. C<listen> is...
A client is started with C<< AnyEvent::Task::Client->new >>. The only argument you need to pass to this constructor is C<connect>, an array ref containing the host and service options to be passed to L<AnyEvent::Socket>'s C<tcp_connect>. Keep the returned cl...
After the server and client are initialised, each process must enter AnyEvent's "main loop" in some way, possibly just C<< AE::cv->recv >>. The C<run> method on the server object is a convenient short-cut for this.
To acquire a worker process you call the C<checkout> method on the client object. The C<checkout> method doesn't need any arguments, but several optional ones such as C<timeout> are described below. As long as the checkout object is around, this chec...
The checkout object is an object that proxies its method calls to a worker process or a function that does the same. The arguments to this method/function are the arguments you wish to send to the worker process followed by a callback to run when the...
In the event of an exception thrown by the worker process, a timeout, or some other unexpected condition, an error is raised in the dynamic context of the callback (see the L<ERROR HANDLING> section).
=head1 DESIGN
Both client and server are of course built with L<AnyEvent>. However, workers can't use AnyEvent (yet). I've never found a need to do event processing in the worker since if the library you wish to use is already AnyEvent-compatible you can simply us...
Each client maintains a "pool" of connections to worker processes. Every time a checkout is requested, the request is placed into a first-come, first-served queue. Once a worker process becomes available, it is associated with that checkout until that...
C<timeout> can be passed as a keyword argument to C<checkout>. Once a request is queued up on that checkout, a timer of C<timeout> seconds (default is 30, C<undef> means infinity) is started. If the request completes during this timeframe, the timer is c...
Note that since timeouts are associated with a checkout, checkouts can be created before the server is started. As long as the server is running within C<timeout> seconds, no error will be thrown and no requests will be lost. The client will continua...
Because of checkout queuing, the maximum number of worker processes a client will attempt to obtain can be limited with the C<max_workers> argument when creating a client object. If there are more live checkouts than C<max_workers>, the remaining che...
The C<min_workers> argument determines how many "hot-standby" workers should be pre-forked when creating the client. The default is 2, though note that this may change to 0 in the future.
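As a rough sketch of how the options described above fit together: C<min_workers> and C<max_workers> go to the client constructor, while C<timeout> goes to C<checkout>. The socket path and the numbers are placeholders chosen for illustration, and the snippet assumes a server is (or soon will be) listening on that socket.

```perl
use strict;
use warnings;
use AnyEvent::Task::Client;

## Placeholder values; tune them for your own workload.
my $client = AnyEvent::Task::Client->new(
    connect     => ['unix/', '/tmp/anyevent-task.socket'],
    min_workers => 0,     ## don't pre-fork any hot-standby workers
    max_workers => 10,    ## checkouts beyond this many wait in the queue
);

## Requests queued on this checkout get a 5 second timer
## instead of the default 30. Pass undef for no timeout.
my $checkout = $client->checkout(timeout => 5);
```

Error handling is omitted here; see the L<ERROR HANDLING> section for how timeouts are reported.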
=head1 STARTING THE SERVER
Typically you will want to start the client and server as completely separate processes as shown in the synopses.
Running the server and the client in the same process is technically possible but is highly discouraged since the server will C<fork()> when the client demands a new worker process. In this case, all descriptors in use by the client are duped into th...
Since it's more of a bother than it's worth to run the server and the client in the same process, there is an alternate server constructor, C<AnyEvent::Task::Server::fork_task_server>, for when you'd like to fork a dedicated server process. It can be ...
## my ($keepalive_pipe, $server_pid) =
AnyEvent::Task::Server::fork_task_server(
name => 'hello',
listen => ['unix/', '/tmp/anyevent-task.socket'],
interface => sub {
return "Hello from PID $$";
},
);
The only differences between this and the regular constructor are that C<fork_task_server> will fork a process which becomes the server and that it will also install a "keep-alive" pipe between the server and the client. This keep-alive pipe will be used by t...
If C<AnyEvent::Task::Server::fork_task_server> is called in a void context then the reference to this keep-alive pipe is pushed onto C<@AnyEvent::Task::Server::children_sockets>. Otherwise, the keep-alive pipe and the server's PID are returned. Closi...
Since the C<fork_task_server> constructor calls fork and requires using AnyEvent in both the parent and child processes, it is important that you not install any AnyEvent watchers before calling it. The usual caveats about forking AnyEvent processes ...
You should also not call C<fork_task_server> after having started threads since, again, this function calls fork. Forking a threaded process is dangerous because the threads might have userspace data-structures in inconsistent states at the time of t...
=head1 INTERFACE
When creating a server, there are two possible formats for the C<interface> option. The first and most general is a coderef. This coderef will be passed the list of arguments that were sent when the checkout was called in the client process (without ...
As described above, you can use a checkout object as a coderef or as an object with methods. If the checkout is invoked as an object, the method name is prepended to the arguments passed to C<interface>:
interface => sub {
my ($method, @args) = @_;
},
If the checkout is invoked as a coderef, the method name is omitted:
interface => sub {
my (@args) = @_;
},
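A minimal sketch of how a single coderef interface might route method-style calls. The C<%handlers> table and the C<add> and C<upper> method names are hypothetical, and the coderef is called directly here to imitate the arguments a worker would receive; no server is started.

```perl
use strict;
use warnings;

## Hypothetical handlers, for illustration only.
my %handlers = (
    add   => sub { my ($x, $y) = @_; return $x + $y },
    upper => sub { return uc $_[0] },
);

## An interface coderef as it could be passed to the server constructor.
## When the checkout is used as an object, the method name arrives first.
my $interface = sub {
    my ($method, @args) = @_;

    my $handler = $handlers{$method}
        or die "unknown method: $method\n";

    return $handler->(@args);
};

## Imitate the calls made on behalf of the client:
my $sum   = $interface->('add', 2, 3);     ## like $checkout->add(2, 3, sub { ... })
my $shout = $interface->('upper', 'hi');   ## like $checkout->upper('hi', sub { ... })

## An unknown method dies; in a real worker this would surface as an error
## in the client.
my $err = eval { $interface->('nope'); 1 } ? undef : $@;

print "$sum $shout\n";
```

For anything beyond a couple of methods, the hash ref form of C<interface> does this dispatch for you.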
The second format possible for C<interface> is a hash ref. This is a simple method dispatch feature where the method invoked on the checkout object is the key used to look up which coderef to run in the worker:
interface => {
method1 => sub {
my (@args) = @_;
},
method2 => sub {
my (@args) = @_;
},
},
Note that since the protocol between the client and the worker process is currently JSON-based, all arguments and return values must be serializable to JSON. This includes most perl scalars like strings, a limited range of numerical types, and hash/l...
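To get a feel for what does and does not survive JSON serialisation, here is a small stand-alone check. It uses the core L<JSON::PP> module as a stand-in; which JSON encoder this module uses internally is not stated here, so treat the exact error behaviour as an assumption. C<My::Class> is a hypothetical class name.

```perl
use strict;
use warnings;
use JSON::PP;   ## stand-in encoder, an assumption for illustration

my $json = JSON::PP->new->canonical;

## Plain scalars, array refs and hash refs round-trip cleanly.
my $args    = ['jimmy', 42, { tags => ['a', 'b'] }];
my $encoded = $json->encode($args);
my $decoded = $json->decode($encoded);

## Code refs and blessed objects are rejected by the encoder by default.
my $code_ok = eval { $json->encode([ sub { 1 } ]); 1 };
my $obj_ok  = eval { $json->encode([ bless({}, 'My::Class') ]); 1 };

print "$encoded\n";
print "coderef encodes: ", ($code_ok ? "yes" : "no"), "\n";
print "object encodes: ",  ($obj_ok  ? "yes" : "no"), "\n";
```

If you need to pass an object to a worker, convert it to a plain hash or list on the client side first and rebuild it in the worker.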
Because there isn't any way for the callback to indicate the context it desires, interface subs are always called in scalar context.
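The practical consequence of scalar context is easy to trip over, so here is a stand-alone illustration. The three subs are hypothetical interface subs, called directly in scalar context rather than through a worker.

```perl
use strict;
use warnings;

## Three hypothetical interface subs that try to return "several rows".
my $returns_array = sub { my @rows = ('a', 'b', 'c'); return @rows };
my $returns_list  = sub { return ('a', 'b', 'c') };
my $returns_ref   = sub { return ['a', 'b', 'c'] };

## Assigning to a scalar imposes scalar context on each call.
my $from_array = $returns_array->();   ## an array in scalar context: its length
my $from_list  = $returns_list->();    ## a list in scalar context: its last element
my $from_ref   = $returns_ref->();     ## a reference: passed through intact

print "array: $from_array, list: $from_list, ref: @$from_ref\n";
```

Returning an array ref or hash ref is therefore the safest way to send multiple values back to the client.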
A future backwards-compatible RPC protocol may use L<Sereal>. Although it's inefficient, you can already serialise an object with Sereal manually, send the resulting string over the existing protocol, and then deserialise it in the worker.
=head1 LOGGING
Because workers run in a separate process, they can't directly use logging contexts in the client process. That is why this module is integrated with L<Log::Defer>.
A L<Log::Defer> object is created on demand in the worker process. Once the worker has finished an operation, any messages in the object will be extracted and sent back to the client. The client then merges this into its main Log::Defer object that was pa...
In your server code, use L<AnyEvent::Task::Logger>. It exports the function C<logger> which returns a L<Log::Defer> object:
use AnyEvent::Task::Server;
use AnyEvent::Task::Logger;
AnyEvent::Task::Server->new(
name => 'sleeper',
listen => ['unix/', '/tmp/anyevent-task.socket'],
interface => sub {
logger->info('about to compute some operation');
{
my $timer = logger->timer('computing some operation');
select undef,undef,undef, 1; ## sleep for 1 second
}
},
)->run;
Note: Portable server code should never call C<sleep> because on some systems it will interfere with the recoverable worker timeout feature implemented with C<SIGALRM>.
In your client code, pass a L<Log::Defer> object in when you create a checkout:
use AnyEvent::Task::Client;
use Log::Defer;
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task.socket'],
);
my $log_defer_object = Log::Defer->new(sub {
my $msg = shift;
use Data::Dumper; ## or whatever
print Dumper($msg);
});
$log_defer_object->info('going to compute some operation in a worker');
my $checkout = $client->checkout(log_defer_object => $log_defer_object);
my $cv = AE::cv;
$checkout->(sub {
$log_defer_object->info('finished some operation');
$cv->send;
});
$cv->recv;
When run, the above client will print something like this:
$VAR1 = {
'start' => '1363232705.96839',
'end' => '1.027309',
'logs' => [
[
'0.000179',
30,
'going to compute some operation in a worker'
],
[
'0.023881061050415',
30,
'about to compute some operation'
],
[
'1.025965',
30,
'finished some operation'
]
],
'timers' => {
'computing some operation' => [
'0.024089061050415',
'1.02470206105041'
]
}
};
=head1 ERROR HANDLING
In a synchronous program, if you expected some operation to throw an exception, you might wrap it in C<eval> like this:
my $crypted;
eval {
$crypted = hash('secret');
};
if ($@) {
say "hash failed: $@";
} else {
say "hashed password is $crypted";
}
But in an asynchronous program, typically C<hash> would initiate some kind of asynchronous operation and then return immediately, allowing the program to go about other tasks while waiting for the result. Since the error might come back at any time i...
AnyEvent::Task accomplishes this mapping with L<Callback::Frame>.
Callback::Frame lets you preserve error handlers (and C<local> variables) across asynchronous callbacks. Callback::Frame is not tied to AnyEvent::Task, AnyEvent or any other async framework and can be used with almost all callback-based libraries.
However, when using AnyEvent::Task, libraries that you use in the client must be L<AnyEvent> compatible. This restriction obviously does not apply to your server code, that being the main purpose of this module: accessing blocking resources from an a...
As an example usage of Callback::Frame, here is how we would handle errors thrown from a worker process running the C<hash> method in an asynchronous client program:
use Callback::Frame;
frame(code => sub {
$client->checkout->hash('secret', sub {
my ($checkout, $crypted) = @_;
say "Hashed password is $crypted";
});
}, catch => sub {
my $back_trace = shift;
say "Error is: $@";
say "Full back-trace: $back_trace";
})->(); ## <-- frame is created and then immediately executed
Of course if C<hash> is something like a bcrypt hash function it is unlikely to raise an exception so maybe that's a bad example. On the other hand, maybe it's a really good example: In addition to errors that occur while running your callbacks, L<An...
=head2 Rationale for Callback::Frame
Why not just call the callback but set C<$@> and indicate an error has occurred? This is the approach taken with L<AnyEvent::DBI> for example. I believe the L<Callback::Frame> interface is superior to this method. In a synchronous program, exceptions...
If you're into POE, there are L<POE::Component::Pool::DBI>, L<POEx::WorkerPool>, L<POE::Component::ResourcePool>, L<POE::Component::PreforkDispatch>, and L<Cantella::Worker>.
=head1 BUGS
Although this module's interface is now stable and has been in production use for some time, there are a few remaining TODO items (see the bottom of Task.pm).
=head1 AUTHOR
Doug Hoyte, C<< <doug@hcsw.org> >>
=head1 COPYRIGHT & LICENSE
Copyright 2012-2017 Doug Hoyte.
This module is licensed under the same terms as perl itself.
=cut
__END__
PROTOCOL
Normal request:
    client -> worker
        ['do', {META}, @ARGS]
             <-
        ['ok', {META}, $RESULT]
             OR
        ['er', {META}, $ERR_MSG]
Transaction done:
    client -> worker
        ['dn', {META}]
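The message shapes above can be sketched with the core L<JSON::PP> module. This only illustrates the array layout of each message: the contents of META, the arguments, and how messages are framed on the socket are not described in these notes, so the empty hash, the sample arguments and the C<handle_response> helper are all hypothetical.

```perl
use strict;
use warnings;
use JSON::PP;   ## stand-in encoder, an assumption for illustration

my $json = JSON::PP->new->canonical;

## Client side: a request and a "transaction done" message.
## The empty META hash and the arguments are placeholders.
my $request = $json->encode(['do', {}, 'hash', 'secret']);
my $done    = $json->encode(['dn', {}]);

## Client side: interpret a reply from the worker (hypothetical helper).
sub handle_response {
    my ($raw) = @_;
    my ($status, $meta, $payload) = @{ $json->decode($raw) };

    return "result: $payload" if $status eq 'ok';
    return "error: $payload"  if $status eq 'er';
    die "unexpected status: $status\n";
}

print "$request\n$done\n";
print handle_response('["ok",{},"abc123"]'), "\n";
print handle_response('["er",{},"worker died"]'), "\n";
```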
TODO
! max checkout queue size
    - start delivering fatal errors to some (at front of queue or back of queue though?)
    - write test for this
! docs: write good error handling examples
Make names more consistent between callback::frame backtraces and auto-generated log::defer timers
make server not use AnyEvent so don't have to worry about workers unlinking unix socket in dtors
In a graceful shutdown scenario, servers wait() on all their children before terminating.
    - Support relinquishing accept() socket during this period?
Document hung_worker_timeout and SIGALRM stuff
Wire protocol:
    - Support something other than JSON? Sereal?
    - Document the protocol?
need tests for the following features:
    - checkout_done signal sent to worker to issue rollback or whatever
    - recovering stuff off a worker after C<SIGALRM> timeout