AnyEvent-Task

README  view on Meta::CPAN

    A server is started with "AnyEvent::Task::Server->new". This constructor
    should be passed in at least the "listen" and "interface" arguments.
    Keep the returned server object around for as long as you want the
    server to be running. "listen" is an array ref containing the host and
    service options to be passed to AnyEvent::Socket's "tcp_server"
    function. "interface" is the code that should handle each request. See
    the INTERFACE section below for its specification. If a "name" parameter
    is set, it will be used to set the process name so you can see which
    processes are which when you run "ps". A "setup" coderef can be passed
    in to run some code after a new worker is forked. A "checkout_done"
    coderef can be passed in to run some code whenever a checkout is
    released in order to perform any required clean-up.

    A client is started with "AnyEvent::Task::Client->new". You only need to
    pass "connect" to this constructor which is an array ref containing the
    host and service options to be passed to AnyEvent::Socket's
    "tcp_connect". Keep the returned client object around as long as you
    wish the client to be connected.

    After the server and client are initialised, each process must enter
    AnyEvent's "main loop" in some way, possibly just "AE::cv->recv". The
    "run" method on the server object is a convenient short-cut for this.

    To acquire a worker process you call the "checkout" method on the client
    object. The "checkout" method doesn't need any arguments, but several
    optional ones such as "timeout" are described below. As long as the
    checkout object is around, this checkout has exclusive access to the
    worker.

    The checkout object proxies its method calls to a worker process. It can
    also be invoked as a function, which does the same. The arguments to
    this method/function are the arguments you wish to send to the worker
    process followed by a callback to run when the operation completes. The
    callback will be passed two arguments: the original checkout object and
    the value returned by the worker process. The checkout object is passed
    into the callback as a convenience in case you no longer have the
    original checkout available lexically.

    In the event of an exception thrown by the worker process, a timeout, or
    some other unexpected condition, an error is raised in the dynamic
    context of the callback (see the "ERROR HANDLING" section).

DESIGN
    Both client and server are of course built with AnyEvent. However,
    workers can't use AnyEvent (yet). I've never found a need to do event
    processing in the worker since if the library you wish to use is already
    AnyEvent-compatible you can simply use the library in the client
    process. If the client process is overloaded, it may make sense to
    run multiple client processes.

    Each client maintains a "pool" of connections to worker processes. Every
    time a checkout is requested, the request is placed into a first-come,
    first-served queue. Once a worker process becomes available, it is
    associated with that checkout until that checkout is garbage collected,
    which in Perl means as soon as it is no longer referenced. Each checkout
    also maintains a queue of requested method calls so that as soon as a
    worker process is allocated to a checkout, any queued method calls are
    filled in order.

    "timeout" can be passed as a keyword argument to "checkout". Once a
    request is queued up on that checkout, a timer of "timeout" seconds
    (default is 30, undef means infinity) is started. If the request
    completes during this timeframe, the timer is cancelled. If the timer
    expires, the worker connection is terminated and an exception is thrown
    in the dynamic context of the callback (see the "ERROR HANDLING"
    section).

    Note that since timeouts are associated with a checkout, checkouts can
    be created before the server is started. As long as the server is
    running within "timeout" seconds, no error will be thrown and no
    requests will be lost. The client will continually try to acquire worker
    processes until a server is available, and once one is available it will
    attempt to allocate all queued checkouts.

    Because of checkout queuing, the maximum number of worker processes a
    client will attempt to obtain can be limited with the "max_workers"
    argument when creating a client object. If there are more live checkouts
    than "max_workers", the remaining checkouts will have to wait until one
    of the other workers becomes available. Because of timeouts, some
    checkouts may never be serviced if the system can't handle the load (the
    timeout error should be handled to indicate the service is temporarily
    unavailable).

    The "min_workers" argument determines how many "hot-standby" workers
    should be pre-forked when creating the client. The default is 2 though
    note that this may change to 0 in the future.
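    As a rough sketch of how the options described above fit together, the
    following creates a client with explicit worker limits and a checkout
    with its own timeout. The numeric values are illustrative assumptions,
    not recommendations, and no server needs to be running for the objects
    to be constructed.

```perl
use strict;
use warnings;
use AnyEvent::Task::Client;

# Illustrative values only; the socket path is the one used elsewhere
# in this README.
my $client = AnyEvent::Task::Client->new(
  connect     => ['unix/', '/tmp/anyevent-task.socket'],
  min_workers => 1,   # hot-standby workers requested up front
  max_workers => 4,   # further live checkouts wait in the queue
);

# Per-checkout timeout in seconds; undef would disable the timer.
my $checkout = $client->checkout(timeout => 5);
```

    Nothing is sent to a worker until a method is called on the checkout
    and the process enters the AnyEvent main loop.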

STARTING THE SERVER
    Typically you will want to start the client and server as completely
    separate processes as shown in the synopses.

    Running the server and the client in the same process is technically
    possible but is highly discouraged since the server will "fork()" when
    the client demands a new worker process. In this case, all descriptors
    in use by the client are duped into the worker process and the worker
    ought to close these extra descriptors. Also, forking a busy client may
    be memory-inefficient (and dangerous if it uses threads).

    Since it's more of a bother than it's worth to run the server and the
    client in the same process, there is an alternate server constructor,
    "AnyEvent::Task::Server::fork_task_server" for when you'd like to fork a
    dedicated server process. It can be passed the same arguments as the
    regular "new" constructor:

        ## my ($keepalive_pipe, $server_pid) =
        AnyEvent::Task::Server::fork_task_server(
          name => 'hello',
          listen => ['unix/', '/tmp/anyevent-task.socket'],
          interface => sub {
                             return "Hello from PID $$";
                           },
        );

    The only differences between this and the regular constructor are that
    "fork_task_server" will fork a process which becomes the server and will
    also install a "keep-alive" pipe between the server and the client. This
    keep-alive pipe will be used by the server to detect when its parent
    (the client process) exits.
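    Putting the pieces together, a minimal single-script sketch might look
    like the following. The "add" method, the socket path and the 5 second
    timeout are assumptions made for this example, and error handling is
    omitted for brevity.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Task::Server;
use AnyEvent::Task::Client;

# Hypothetical socket path chosen for this sketch.
my $socket = "/tmp/anyevent-task-sketch.$$.socket";

# Fork a dedicated server process. Keep the pipe in scope: the server
# uses it to detect when this (client) process exits.
my ($keepalive_pipe, $server_pid) =
  AnyEvent::Task::Server::fork_task_server(
    name      => 'sketch',
    listen    => ['unix/', $socket],
    interface => {
      add => sub { my ($x, $y) = @_; return $x + $y; },
    },
  );

my $client = AnyEvent::Task::Client->new(
  connect => ['unix/', $socket],
);

my $cv = AE::cv;
my $sum;

# Arguments for the worker come first, the callback comes last.
$client->checkout(timeout => 5)->add(2, 3, sub {
  my ($checkout, $ret) = @_;
  $sum = $ret;
  $cv->send;
});

$cv->recv;   # enter the main loop until the callback fires
print "2 + 3 = $sum\n";

unlink $socket;   # tidy up the socket file created for this sketch
```

    Because the checkout is created before the forked server is
    necessarily listening, this relies on the client retrying until a
    server becomes available.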

    If "AnyEvent::Task::Server::fork_task_server" is called in a void
    context then the reference to this keep-alive pipe is pushed onto
    @AnyEvent::Task::Server::children_sockets. Otherwise, the keep-alive
    pipe and the server's PID are returned. Closing the pipe will terminate

        interface => sub {
          my ($method, @args) = @_;
        },

    If the checkout is invoked as a coderef, the method name is omitted:

        interface => sub {
          my (@args) = @_;
        },

    The second format possible for "interface" is a hash ref. This is a
    simple method dispatch feature where the method invoked on the checkout
    object is the key used to look up which coderef to run in the worker:

        interface => {
          method1 => sub {
            my (@args) = @_;
          },
          method2 => sub {
            my (@args) = @_;
          },
        },
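    To make the lookup concrete, here is a small stand-alone simulation of
    hash-ref dispatch in plain Perl. It is not the module's implementation:
    the "dispatch" helper, the method names and the behaviour on an unknown
    method are all assumptions made for illustration.

```perl
use strict;
use warnings;

my $interface = {
  add    => sub { my ($x, $y) = @_; return $x + $y; },
  upcase => sub { my ($str) = @_; return uc $str; },
};

# Hypothetical helper: look the method name up in the hash and run the
# matching coderef. Dying on an unknown name is this sketch's choice.
sub dispatch {
  my ($table, $method, @args) = @_;
  my $handler = $table->{$method}
    or die "no such method: $method\n";
  return scalar $handler->(@args);
}

my $sum   = dispatch($interface, 'add', 2, 3);
my $shout = dispatch($interface, 'upcase', 'hello');
my $lost  = eval { dispatch($interface, 'missing') };   # dies, so undef

print "add: $sum, upcase: $shout\n";
```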

    Note that since the protocol between the client and the worker process
    is currently JSON-based, all arguments and return values must be
    serializable to JSON. This includes most Perl scalars like strings, a
    limited range of numerical types, and hash/array structures with no
    cyclical references.
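    The following sketch shows what this restriction means in practice. It
    uses the core JSON::PP module as a stand-in, which is an assumption: the
    serializer the module actually uses may differ in details such as error
    messages. The low "max_depth" is only there to make the cyclic case fail
    quickly.

```perl
use strict;
use warnings;
use JSON::PP ();

my $json = JSON::PP->new->canonical->max_depth(32);

# Plain scalars, arrays and hashes round-trip without trouble.
my $plain   = { user => 'alice', ids => [1, 2, 3] };
my $encoded = $json->encode($plain);
my $decoded = $json->decode($encoded);

# A code ref has no JSON representation, so encoding dies.
my $code_ok = eval { $json->encode({ cb => sub { 1 } }); 1 };

# Neither does a cyclic structure: the encoder gives up once it hits
# its nesting limit.
my $cycle = {};
$cycle->{self} = $cycle;
my $cycle_ok = eval { $json->encode($cycle); 1 };
$cycle->{self} = undef;   # break the cycle so it can be freed

print "plain data:  $encoded\n";
print "code ref:    ", ($code_ok  ? "encoded" : "rejected"), "\n";
print "cyclic data: ", ($cycle_ok ? "encoded" : "rejected"), "\n";
```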

    Because there isn't any way for the callback to indicate the context it
    desires, interface subs are always called in scalar context.
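    One practical consequence is that an interface sub returning an array
    hands back only its element count. The plain-Perl sketch below imitates
    a scalar-context call (the "call_in_scalar_context" helper is
    hypothetical) and shows why returning an array ref is the safer habit.

```perl
use strict;
use warnings;

# Stand-in for the worker side: invoke the sub in scalar context.
sub call_in_scalar_context {
  my ($sub, @args) = @_;
  my $ret = $sub->(@args);
  return $ret;
}

my $returns_array = sub { my @rows = ('a', 'b', 'c'); return @rows; };
my $returns_ref   = sub { my @rows = ('a', 'b', 'c'); return \@rows; };

# An array evaluated in scalar context yields its element count.
my $count = call_in_scalar_context($returns_array);

# An array ref survives scalar context intact.
my $rows = call_in_scalar_context($returns_ref);

print "array return: $count\n";
print "ref return:   @$rows\n";
```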

    A future backwards-compatible RPC protocol may use Sereal. Although it's
    inefficient, you can already serialise an object with Sereal manually,
    send the resulting string over the existing protocol, and then
    deserialise it in the worker.

LOGGING
    Because workers run in a separate process, they can't directly use
    logging contexts in the client process. That is why this module is
    integrated with Log::Defer.

    A Log::Defer object is created on demand in the worker process. Once the
    worker has finished an operation, any messages in the object will be
    extracted and sent back to the client. The client then merges this into
    its main Log::Defer object that was passed in when creating the
    checkout.

    In your server code, use AnyEvent::Task::Logger. It exports the function
    "logger" which returns a Log::Defer object:

        use AnyEvent::Task::Server;
        use AnyEvent::Task::Logger;

        AnyEvent::Task::Server->new(
          name => 'sleeper',
          listen => ['unix/', '/tmp/anyevent-task.socket'],
          interface => sub {
            logger->info('about to compute some operation');
            {
              my $timer = logger->timer('computing some operation');
              select undef,undef,undef, 1; ## sleep for 1 second
            }
          },
        )->run;

    Note: Portable server code should never call "sleep" because on some
    systems it will interfere with the recoverable worker timeout feature
    implemented with "SIGALRM".

    In your client code, pass a Log::Defer object in when you create a
    checkout:

        use AnyEvent::Task::Client;
        use Log::Defer;

        my $client = AnyEvent::Task::Client->new(
                       connect => ['unix/', '/tmp/anyevent-task.socket'],
                     );

        my $log_defer_object = Log::Defer->new(sub {
                                                 my $msg = shift;
                                                 use Data::Dumper; ## or whatever
                                                 print Dumper($msg);
                                               });

        $log_defer_object->info('going to compute some operation in a worker');

        my $checkout = $client->checkout(log_defer_object => $log_defer_object);

        my $cv = AE::cv;

        $checkout->(sub {
          $log_defer_object->info('finished some operation');
          $cv->send;
        });

        $cv->recv;

    When run, the above client will print something like this:

        $VAR1 = {
              'start' => '1363232705.96839',
              'end' => '1.027309',
              'logs' => [
                          [
                            '0.000179',
                            30,
                            'going to compute some operation in a worker'
                          ],
                          [
                            '0.023881061050415',
                            30,
                            'about to compute some operation'
                          ],
                          [
                            '1.025965',
                            30,
                            'finished some operation'
                          ]
                        ],
              'timers' => {
                            'computing some operation' => [
                                                            '0.024089061050415',
                                                            '1.02470206105041'
                                                          ]
                          }
            };

ERROR HANDLING
    In a synchronous program, if you expected some operation to throw an
    exception you might wrap it in "eval" like this:

        my $crypted;

        eval {
          $crypted = hash('secret');
        };

        if ($@) {
          say "hash failed: $@";
        } else {
          say "hashed password is $crypted";
        }

    But in an asynchronous program, typically "hash" would initiate some
    kind of asynchronous operation and then return immediately, allowing the
    program to go about other tasks while waiting for the result. Since the
    error might come back at any time in the future, the program needs a way
    to map the exception that is thrown back to the original context.

    AnyEvent::Task accomplishes this mapping with Callback::Frame.

    Callback::Frame lets you preserve error handlers (and "local" variables)
    across asynchronous callbacks. Callback::Frame is not tied to
    AnyEvent::Task, AnyEvent or any other async framework and can be used
    with almost all callback-based libraries.

    However, when using AnyEvent::Task, libraries that you use in the client
    must be AnyEvent compatible. This restriction obviously does not apply
    to your server code, that being the main purpose of this module:
    accessing blocking resources from an asynchronous program. In your
    server code, when there is an error condition you should simply "die" or
    "croak" as in a synchronous program.
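    For instance, an interface sub can validate its input and "croak" on
    failure, as in the sketch below. The "divide" method is hypothetical,
    and the handler is exercised locally with "eval" purely to show the
    exception; in a real worker the error would be reported back to the
    client rather than caught like this.

```perl
use strict;
use warnings;
use Carp qw(croak);

# Hypothetical interface for this sketch; "divide" is not part of the
# module.
my $interface = {
  divide => sub {
    my ($num, $den) = @_;
    croak "division by zero" if $den == 0;
    return $num / $den;
  },
};

# Call the handler directly to observe both outcomes.
my $ok  = eval { $interface->{divide}->(6, 3) };
my $bad = eval { $interface->{divide}->(1, 0) };
my $err = $@;

print "6 / 3 = $ok\n";
print "error: $err";
```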

    As an example usage of Callback::Frame, here is how we would handle
    errors thrown from a worker process running the "hash" method in an
    asynchronous client program:

        use Callback::Frame;

        frame(code => sub {

          $client->checkout->hash('secret', sub {
            my ($checkout, $crypted) = @_;
            say "Hashed password is $crypted";
          });

        }, catch => sub {

          my $back_trace = shift;
          say "Error is: $@";


