AnyEvent-Task
In the event of an exception thrown by the worker process, a timeout, or
some other unexpected condition, an error is raised in the dynamic
context of the callback (see the "ERROR HANDLING" section).
DESIGN
Both client and server are of course built with AnyEvent. However,
workers can't use AnyEvent (yet). I've never found a need to do event
processing in the worker since if the library you wish to use is already
AnyEvent-compatible you can simply use the library in the client
process. If the client process becomes overloaded, it may make sense to
run multiple client processes.
Each client maintains a "pool" of connections to worker processes. Every
time a checkout is requested, the request is placed into a first-come,
first-serve queue. Once a worker process becomes available, it is
associated with that checkout until the checkout is garbage collected,
which in perl means as soon as it is no longer needed. Each checkout
also maintains a queue of requested method-calls so that as soon as a
worker process is allocated to a checkout, any queued method calls are
filled in order.
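A minimal client-side sketch of this flow (the socket path matches the server example later in this document; it assumes an AnyEvent::Task server is already listening there):

```perl
use AnyEvent;
use AnyEvent::Task::Client;

my $client = AnyEvent::Task::Client->new(
    connect => ['unix/', '/tmp/anyevent-task.socket'],
);

## The checkout enters the first-come, first-served queue and is bound
## to a worker process as soon as one becomes available.
my $checkout = $client->checkout;

my $cv = AE::cv;

## Method calls can be queued immediately; they are delivered to the
## worker, in order, once it has been allocated to this checkout.
$checkout->(sub {
    my ($checkout, $result) = @_;
    print "worker said: $result\n";
    $cv->send;
});

$cv->recv;
```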
"timeout" can be passed as a keyword argument to "checkout". Once a
request is queued up on that checkout, a timer of "timeout" seconds
(default is 30, undef means infinity) is started. If the request
completes during this timeframe, the timer is cancelled. If the timer
expires, the worker connection is terminated and an exception is thrown
in the dynamic context of the callback (see the "ERROR HANDLING"
section).
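For example, a per-checkout timeout might be combined with Callback::Frame (the error-handling mechanism this distribution builds on) roughly as in this sketch; the "do_work" method name is hypothetical, and a $client object as created in the synopsis is assumed:

```perl
use Callback::Frame;

my $checkout = $client->checkout( timeout => 5 );

frame_try {
    ## "do_work" is a hypothetical worker method.
    $checkout->do_work(sub {
        my ($checkout, $result) = @_;
        print "got: $result\n";
    });
} frame_catch {
    ## Runs in the dynamic context of the callback if the worker dies
    ## or the 5-second timer expires.
    my $err = $@;
    warn "checkout failed: $err";
};
```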
Note that since timeouts are associated with a checkout, checkouts can
be created before the server is started. As long as the server is
running within "timeout" seconds, no error will be thrown and no
requests will be lost. The client will continually try to acquire worker
processes until a server is available, and once one is available it will
attempt to allocate all queued checkouts.
Because of checkout queuing, the maximum number of worker processes a
client will attempt to obtain can be limited with the "max_workers"
argument when creating a client object. If there are more live checkouts
than "max_workers", the remaining checkouts will have to wait until one
of the other workers becomes available. Because of timeouts, some
checkouts may never be serviced if the system can't handle the load (the
timeout error should be handled to indicate the service is temporarily
unavailable).
The "min_workers" argument determines how many "hot-standby" workers
should be pre-forked when creating the client. The default is 2, though
note that this may change to 0 in the future.
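Putting the two pool-sizing knobs together, a client might be constructed like so (a sketch; the socket path and the particular limits are assumptions):

```perl
my $client = AnyEvent::Task::Client->new(
    connect     => ['unix/', '/tmp/anyevent-task.socket'],
    min_workers => 2,   ## pre-fork two hot-standby workers
    max_workers => 10,  ## checkouts beyond ten live workers must wait
);
```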
STARTING THE SERVER
Typically you will want to start the client and server as completely
separate processes as shown in the synopses.
Running the server and the client in the same process is technically
possible but is highly discouraged since the server will "fork()" when
the client demands a new worker process. In this case, all descriptors
in use by the client are duped into the worker process and the worker
ought to close these extra descriptors. Also, forking a busy client may
be memory-inefficient (and dangerous if it uses threads).
Since it's more of a bother than it's worth to run the server and the
client in the same process, there is an alternate server constructor,
"AnyEvent::Task::Server::fork_task_server" for when you'd like to fork a
dedicated server process. It can be passed the same arguments as the
regular "new" constructor:
    my ($keepalive_pipe, $server_pid) =
      AnyEvent::Task::Server::fork_task_server(
        name => 'hello',
        listen => ['unix/', '/tmp/anyevent-task.socket'],
        interface => sub {
          return "Hello from PID $$";
        },
      );
The only differences between this and the regular constructor are that
"fork_task_server" will fork a process which becomes the server and will
also install a "keep-alive" pipe between the server and the client. This
keep-alive pipe will be used by the server to detect when its parent
(the client process) exits.
If "AnyEvent::Task::Server::fork_task_server" is called in a void
context then the reference to this keep-alive pipe is pushed onto
@AnyEvent::Task::Server::children_sockets. Otherwise, the keep-alive
pipe and the server's PID are returned. Closing the pipe will terminate
the server gracefully. "kill" the PID to terminate it immediately. Note
that even when the server is shut down, existing worker processes and
checkouts may still be active in the client. The client object and all
checkout objects should be destroyed if you wish to ensure all workers
are shut down.
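Continuing the "fork_task_server" example above, the two shutdown paths look like this (the TERM signal is an arbitrary choice for illustration; the two lines are alternatives, not a sequence):

```perl
## Graceful: the server notices EOF on its end of the keep-alive
## pipe and exits.
undef $keepalive_pipe;

## Immediate alternative: signal the server process directly.
## kill 'TERM', $server_pid;
```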
Since the "fork_task_server" constructor calls fork and requires using
AnyEvent in both the parent and child processes, it is important that
you not install any AnyEvent watchers before calling it. The usual
caveats about forking AnyEvent processes apply (see the AnyEvent docs).
You should also not call "fork_task_server" after having started threads
since, again, this function calls fork. Forking a threaded process is
dangerous because the threads might have userspace data-structures in
inconsistent states at the time of the fork.
INTERFACE
When creating a server, there are two possible formats for the
"interface" option. The first and most general is a coderef. This
coderef will be passed the list of arguments that were sent when the
checkout was called in the client process (without the trailing callback
of course).
As described above, you can use a checkout object as a coderef or as an
object with methods. If the checkout is invoked as an object, the method
name is prepended to the arguments passed to "interface":
    interface => sub {
        my ($method, @args) = @_;
    },
If the checkout is invoked as a coderef, method is omitted:
    interface => sub {
        my (@args) = @_;
    },
The second format possible for "interface" is a hash ref. This is a
simple method dispatch feature where the method invoked on the checkout
object is the key used to lookup which coderef to run in the worker:
    interface => {
        method1 => sub {
            my (@args) = @_;
        },
        method2 => sub {
            my (@args) = @_;
        },
    },
Note that since the protocol between the client and the worker process
is currently JSON-based, all arguments and return values must be
serializable to JSON. This includes most perl scalars like strings, a
limited range of numerical types, and hash/list constructs with no
cyclical references.
Because there isn't any way for the callback to indicate the context it
desires, interface subs are always called in scalar context.
A future backwards-compatible RPC protocol may use Sereal. Although
it's inefficient, you can already serialise an object with Sereal
manually,
send the resulting string over the existing protocol, and then
deserialise it in the worker.
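A sketch of that manual workaround. The "process" method name is hypothetical, and the base64 step is an assumption added here so the binary Sereal blob passes cleanly through the JSON-based transport:

```perl
use Sereal::Encoder qw(encode_sereal);
use Sereal::Decoder qw(decode_sereal);
use MIME::Base64 qw(encode_base64 decode_base64);

## Client side: serialise the object manually, then send the encoded
## string as an ordinary JSON argument.
my $blob = encode_base64(encode_sereal($some_object));
$checkout->process($blob, sub {
    my ($checkout, $result) = @_;
    ...;
});

## Worker side (inside the server's constructor):
AnyEvent::Task::Server::fork_task_server(
    name      => 'sereal-demo',
    listen    => ['unix/', '/tmp/anyevent-task.socket'],
    interface => {
        process => sub {
            my ($blob) = @_;
            my $obj = decode_sereal(decode_base64($blob));
            ...;
        },
    },
);
```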
LOGGING
Because workers run in a separate process, they can't directly use
logging contexts in the client process. That is why this module is
integrated with Log::Defer.
A Log::Defer object is created on demand in the worker process. Once
the worker finishes an operation, any messages in the object will be
extracted and sent back to the client. The client then merges this into
its main Log::Defer object that was passed in when creating the
checkout.
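As I understand the API, the worker side logs through the "logger" function exported by AnyEvent::Task::Logger, and the client side passes its Log::Defer object via a "log_defer_object" checkout argument; a sketch (the "do_something" call is hypothetical):

```perl
use JSON::XS;
use Log::Defer;
use AnyEvent::Task::Logger;   ## exports logger() for worker code

## Worker side: log into the per-operation Log::Defer object that
## the module creates on demand.
my $interface = sub {
    logger->info('starting expensive operation');
    return do_something();    ## hypothetical work
};

## Client side: messages logged in the worker are merged into this
## Log::Defer object when the operation completes.
my $log = Log::Defer->new(sub { print encode_json($_[0]), "\n" });
my $checkout = $client->checkout( log_defer_object => $log );
```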