AnyEvent-Task

NAME
    AnyEvent::Task - Client/server-based asynchronous worker pool

SYNOPSIS 1: PASSWORD HASHING
  Server
        use AnyEvent::Task::Server;
        use Authen::Passphrase::BlowfishCrypt;

        my $dev_urandom;
        my $server = AnyEvent::Task::Server->new(
                       name => 'passwd-hasher',
                       listen => ['unix/', '/tmp/anyevent-task.socket'],
                       setup => sub {
                         open($dev_urandom, '<', '/dev/urandom') || die "open urandom: $!";
                       },
                       interface => {
                         hash => sub {
                           my ($plaintext) = @_;
                           read($dev_urandom, my $salt, 16) == 16 || die "bad read from urandom";
                           return Authen::Passphrase::BlowfishCrypt->new(cost => 10,
                                                                         salt => $salt,
                                                                         passphrase => $plaintext)
                                                                   ->as_crypt;

                         },
                         verify => sub {
                           my ($crypted, $plaintext) = @_;
                           return Authen::Passphrase::BlowfishCrypt->from_crypt($crypted)
                                                                   ->match($plaintext);
                         },
                       },
                     );

        $server->run; # or AE::cv->recv

  Client
        use AnyEvent::Task::Client;

        my $client = AnyEvent::Task::Client->new(
                       connect => ['unix/', '/tmp/anyevent-task.socket'],
                     );

        my $checkout = $client->checkout( timeout => 5, );

        my $cv = AE::cv;

        $checkout->hash('secret',
          sub {
            my ($checkout, $crypted) = @_;

            print "Hashed password is $crypted\n";

            $checkout->verify($crypted, 'secret',
              sub {
                my ($checkout, $result) = @_;
                print "Verify result is $result\n";
                $cv->send;
              });
          });

        $cv->recv;

  Output
        Hashed password is $2a$10$NwTOwxmTlG0Lk8YZMT29/uysC9RiZX4jtWCx.deBbb2evRjCq6ovi
        Verify result is 1

SYNOPSIS 2: DBI
  Server
        use AnyEvent::Task::Server;
        use DBI;

        my $dbh;

        AnyEvent::Task::Server->new(
          name => 'dbi',
          listen => ['unix/', '/tmp/anyevent-task.socket'],
          setup => sub {
            $dbh = DBI->connect("dbi:SQLite:dbname=/tmp/junk.sqlite3","","",{ RaiseError => 1, });
          },
          interface => sub {
            my ($method, @args) = @_;
            $dbh->$method(@args);
          },
        )->run;

  Client
        use AnyEvent::Task::Client;

        my $client = AnyEvent::Task::Client->new(
                       connect => ['unix/', '/tmp/anyevent-task.socket'],
                     );

        my $dbh = $client->checkout;

        my $cv = AE::cv;

        $dbh->do(q{ CREATE TABLE user(username TEXT PRIMARY KEY, email TEXT); },
          sub { });

        ## Requests will queue up on the checkout and execute in order:

        $dbh->do(q{ INSERT INTO user (username, email) VALUES (?, ?) },
                 undef, 'jimmy',
                        'jimmy@example.com',
          sub { });

        $dbh->selectrow_hashref(q{ SELECT * FROM user }, sub {
          my ($dbh, $user) = @_;
          print "username: $user->{username}, email: $user->{email}\n";
          $cv->send;
        });

        $cv->recv;

  Output
        username: jimmy, email: jimmy@example.com

DESCRIPTION
    The synopses make this module look much more complicated than it
    actually is. In a nutshell, a synchronous worker process is forked off
    by a server whenever a client asks for one. The client keeps as many of
    these workers around as it wants and delegates tasks to them
    asynchronously.

    Another way of saying that is that AnyEvent::Task is a
    pre-fork-on-demand server (AnyEvent::Task::Server) combined with a
    persistent worker-pooled client (AnyEvent::Task::Client).

    The examples in the synopses are complete stand-alone programs. Run the
    server in one window and the client in another. The server will remain
    running but the client will exit after printing its output. Typically
    the "client" programs would be embedded in a server program such as a
    web-server.

    Note that the client examples don't implement error checking (see the
    "ERROR HANDLING" section).

    A server is started with "AnyEvent::Task::Server->new". This constructor
    should be passed in at least the "listen" and "interface" arguments.
    Keep the returned server object around for as long as you want the
    server to be running. "listen" is an array ref containing the host and
    service options to be passed to AnyEvent::Socket's "tcp_server"
    function. "interface" is the code that should handle each request. See
    the INTERFACE section below for its specification. If a "name" parameter
    is set, it will be used to set the process name so you can see which
    processes are which when you run "ps". A "setup" coderef can be passed
    in to run some code after a new worker is forked. A "checkout_done"
    coderef can be passed in to run some code whenever a checkout is
    released in order to perform any required clean-up.

    A client is started with "AnyEvent::Task::Client->new". You only need to
    pass "connect" to this constructor which is an array ref containing the
    host and service options to be passed to AnyEvent::Socket's
    "tcp_connect". Keep the returned client object around as long as you
    wish the client to be connected.

    After the server and client are initialised, each process must enter
    AnyEvent's "main loop" in some way, possibly just "AE::cv->recv". The
    "run" method on the server object is a convenient short-cut for this.

    To acquire a worker process you call the "checkout" method on the client
    object. The "checkout" method doesn't need any arguments, but several
    optional ones such as "timeout" are described below. As long as the
    checkout object is around, this checkout has exclusive access to the
    worker.

    The checkout object proxies its method calls to a worker process. It
    can also be invoked directly as a function, which does the same. The
    arguments to this method or function are the arguments you wish to
    send to the worker process, followed by a callback to run when the
    operation completes. The callback will be passed two arguments: the
    original checkout object and the value returned by the worker process.
    The checkout object is passed into the callback as a convenience in
    case you no longer have the original checkout available lexically.

    In the event of an exception thrown by the worker process, a timeout, or
    some other unexpected condition, an error is raised in the dynamic
    context of the callback (see the "ERROR HANDLING" section).

DESIGN
    Both client and server are of course built with AnyEvent. However,
    workers can't use AnyEvent (yet). I've never found a need to do event
    processing in the worker since if the library you wish to use is already

    In your client code, pass a Log::Defer object in when you create a
    checkout:

        use AnyEvent::Task::Client;
        use Log::Defer;

        my $client = AnyEvent::Task::Client->new(
                       connect => ['unix/', '/tmp/anyevent-task.socket'],
                     );

        my $log_defer_object = Log::Defer->new(sub {
                                                 my $msg = shift;
                                                 use Data::Dumper; ## or whatever
                                                 print Dumper($msg);
                                               });

        $log_defer_object->info('going to compute some operation in a worker');

        my $checkout = $client->checkout(log_defer_object => $log_defer_object);

        my $cv = AE::cv;

        $checkout->(sub {
          $log_defer_object->info('finished some operation');
          $cv->send;
        });

        $cv->recv;

    When run, the above client will print something like this:

        $VAR1 = {
              'start' => '1363232705.96839',
              'end' => '1.027309',
              'logs' => [
                          [
                            '0.000179',
                            30,
                            'going to compute some operation in a worker'
                          ],
                          [
                            '0.023881061050415',
                            30,
                            'about to compute some operation'
                          ],
                          [
                            '1.025965',
                            30,
                            'finished some operation'
                          ]
                        ],
              'timers' => {
                            'computing some operation' => [
                                                            '0.024089061050415',
                                                            '1.02470206105041'
                                                          ]
                          }
            };

ERROR HANDLING
    In a synchronous program, if you expected some operation to throw an
    exception you might wrap it in "eval" like this:

        my $crypted;

        eval {
          $crypted = hash('secret');
        };

        if ($@) {
          say "hash failed: $@";
        } else {
          say "hashed password is $crypted";
        }

    But in an asynchronous program, typically "hash" would initiate some
    kind of asynchronous operation and then return immediately, allowing the
    program to go about other tasks while waiting for the result. Since the
    error might come back at any time in the future, the program needs a way
    to map the exception that is thrown back to the original context.
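
    The problem can be reproduced without any event library. The following
    is a minimal sketch in core Perl: a plain array stands in for the event
    loop, and "hash_async" is a hypothetical function invented for this
    illustration, not part of AnyEvent::Task.

```perl
use strict;
use warnings;

# A toy "event loop": work is queued now and run later.
my @pending;

# Hypothetical stand-in for an asynchronous operation. It returns
# immediately; the failure only happens when the queued task runs.
sub hash_async {
  my ($plaintext, $cb) = @_;
  push @pending, sub { die "worker failed for '$plaintext'\n" };
  return;
}

# The eval only covers the act of queueing the work, which succeeds.
my $caught_at_call_site = '';
eval {
  hash_async('secret', sub { print "never reached\n" });
  1;
} or $caught_at_call_site = $@;

# By the time the queue is drained, the eval above has already exited,
# so the error surfaces here, far from the original context.
my $caught_in_loop = '';
for my $task (@pending) {
  eval { $task->(); 1 } or $caught_in_loop = $@;
}

print "caught at call site: '$caught_at_call_site'\n";
print "caught in loop: $caught_in_loop";
```

    The "eval" around the call site sees nothing, while the error appears
    in whatever code happens to be draining the queue.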

    AnyEvent::Task accomplishes this mapping with Callback::Frame.

    Callback::Frame lets you preserve error handlers (and "local" variables)
    across asynchronous callbacks. Callback::Frame is not tied to
    AnyEvent::Task, AnyEvent or any other async framework and can be used
    with almost all callback-based libraries.

    However, when using AnyEvent::Task, the libraries that you use in the
    client must be AnyEvent compatible. This restriction does not apply to
    your server code: accessing blocking resources from an asynchronous
    program is the main purpose of this module. In your server code, when
    there is an error condition you should simply "die" or "croak" as in a
    synchronous program.

    As an example usage of Callback::Frame, here is how we would handle
    errors thrown from a worker process running the "hash" method in an
    asynchronous client program:

        use Callback::Frame;

        frame(code => sub {

          $client->checkout->hash('secret', sub {
            my ($checkout, $crypted) = @_;
            say "Hashed password is $crypted";
          });

        }, catch => sub {

          my $back_trace = shift;
          say "Error is: $@";
          say "Full back-trace: $back_trace";

        })->(); ## <-- frame is created and then immediately executed

    Of course if "hash" is something like a bcrypt hash function it is
    unlikely to raise an exception so maybe that's a bad example. On the
    other hand, maybe it's a really good example: In addition to errors that
    occur while running your callbacks, AnyEvent::Task uses Callback::Frame
    to throw errors if the worker process times out, so if the bcrypt "cost"
    is really cranked up it might hit the default 30 second time limit.

  Rationale for Callback::Frame
    Why not just call the callback but set $@ and indicate an error has
    occurred? This is the approach taken with AnyEvent::DBI for example. I
    believe the Callback::Frame interface is superior to this method. In a
    synchronous program, exceptions are out-of-band messages and code
    doesn't need to locally handle them. It can let them "bubble up" the
    stack, perhaps to a top-level error handler. Invoking the callback when
    an error occurs forces exceptions to be handled in-band.

    How about having AnyEvent::Task expose an error callback? This is the
    approach taken by AnyEvent::Handle for example. I believe
    Callback::Frame is superior to this method also. Although separate
    callbacks are (sort of) out-of-band, you still have to write error
    handler callbacks and do something relevant locally instead of allowing
    the exception to bubble up to an error handler.

    In servers, Callback::Frame helps you maintain the "dynamic state"
    (error handlers and dynamic variables) installed for a single
    connection. In other words, any errors that occur while servicing that
    connection will be able to be caught by an error handler specific to
    that connection. This lets you send an error response to the client and
    collect associated log messages in a Log::Defer object specific to that
    connection.

    Callback::Frame provides an error handler stack so you can have a
    top-level handler as well as nested handlers (similar to nested
    "eval"s). This is useful when you wish to have a top-level "bail-out"
    error handler and also nested error handlers that know how to retry or
    recover from an error in an async sub-operation.
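
    As a rough sketch of such a handler stack (the frame names and error
    messages are made up for illustration), an inner frame can inspect an
    error and, if it cannot recover, "die" again so that the enclosing
    frame's handler takes over. This assumes Callback::Frame is installed
    and relies on its documented behaviour that an error thrown from a
    "catch" handler propagates to the parent frame:

```perl
use strict;
use warnings;
use Callback::Frame;

my @events;

frame(name => 'top-level', code => sub {

  # Nested frame around a sub-operation that may fail.
  frame(name => 'sub-operation', code => sub {
    die "transient failure\n";
  }, catch => sub {
    push @events, "inner saw: $@";
    # This handler cannot recover, so it re-throws to the outer frame.
    die "giving up: $@";
  })->();

}, catch => sub {
  # Top-level "bail-out" handler.
  push @events, "outer saw: $@";
})->();

print @events;
```

    A handler that can recover would simply return instead of calling
    "die", and the outer handler would then never run.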

    Callback::Frame is designed to be easily used with callback-based
    libraries that don't know about Callback::Frame. "fub" is a shortcut for
    "frame" with just the "code" argument. Instead of passing "sub { ... }"
    into libraries you can pass in "fub { ... }". When invoked, this wrapped
    callback will first re-establish any error handlers that you installed
    with "frame" and then run your provided code. Libraries that force
    in-band error signalling can be handled with callbacks such as "fub {
    die $@ if $@; ... }". Separate error callbacks should simply be "fub {
    die "failed because ..." }".
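
    To illustrate the idea, here is a small sketch with a hypothetical
    callback-based function, "legacy_lookup", that knows nothing about
    Callback::Frame. As a variation on the $@ convention, this made-up
    library reports failure in-band through the first callback argument.
    An array stands in for the event loop so that the callback runs after
    the frame's code has already returned:

```perl
use strict;
use warnings;
use Callback::Frame;

# Toy event loop: callbacks are queued and invoked later.
my @queue;

# Hypothetical library function (not a real module). It signals
# failure in-band by passing an error as the first callback argument.
sub legacy_lookup {
  my ($key, $cb) = @_;
  push @queue, sub { $cb->("no such key: $key\n", undef) };
}

my $handled;

frame(code => sub {
  legacy_lookup('missing', fub {
    my ($err, $value) = @_;
    die $err if $err;   # turn the in-band error into an exception
    print "got $value\n";
  });
}, catch => sub {
  $handled = $@;
})->();

# The frame's code has returned, but the fub-wrapped callback
# re-establishes the frame's error handler when it is invoked.
$_->() for @queue;

print "handled by frame: $handled";
```

    Had a plain "sub { ... }" been passed instead of "fub { ... }", the
    "die" would have propagated out of the loop that drains the queue
    rather than reaching the "catch" handler.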

    It's important that all callbacks be created with "fub" (or "frame")
    even if you don't expect them to fail so that the dynamic context is
    preserved for nested callbacks that may. An exception is the callbacks
    provided to AnyEvent::Task checkouts: These are automatically wrapped in
    frames for you (although explicitly passing in fubs is fine too).

    The Callback::Frame documentation explains how this works in much more
    detail.

  Reforking of workers after errors
    If a worker throws an error, the client receives the error but the
    worker process stays running. As long as the client has a reference to
    the checkout (and as long as the exception wasn't "fatal" -- see below),
    it can still be used to communicate with that worker so you can access
    error states, rollback transactions, or do any sort of required
    clean-up.

    However, once the checkout object is destroyed, by default the worker
    will be shut down instead of returning to the client's worker pool as in
    the normal case where no errors were thrown. This is a "safe-by-default"
    behaviour that may help in the event that an exception thrown by a
    worker leaves the worker process in a broken/inconsistent state for some
    reason (for example a DBI connection died). This can be overridden by
    setting the "dont_refork_after_error" option to 1 in the client
    constructor. This will only matter if errors are being thrown frequently

    Another reason that a worker might not be returned to the worker pool is
    if it has been checked out "max_checkouts" times. If "max_checkouts" is
    specified as an argument to the Client constructor, then workers will be
    destroyed and reforked after being checked out this number of times.
    When not specified, workers are never re-forked for this reason. This
    parameter is useful for coping with libraries that leak memory or
    otherwise become slower/more resource-hungry over time.
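
    A client that sets both of the reforking options described in this
    section might be constructed as in the following sketch. The socket
    path is the one used in the synopses and the value 100 is an arbitrary
    number chosen for illustration, not a recommendation:

```perl
use strict;
use warnings;
use AnyEvent::Task::Client;

my $client = AnyEvent::Task::Client->new(
  connect => ['unix/', '/tmp/anyevent-task.socket'],

  # Destroy and refork each worker after this many checkouts
  # (illustrative value).
  max_checkouts => 100,

  # Return workers to the pool even after they have thrown an error.
  dont_refork_after_error => 1,
);
```

    Only enable "dont_refork_after_error" if an exception cannot leave
    your workers in an inconsistent state.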

COMPARISON WITH HTTP
    Why a custom protocol, client, and server? Can't we just use something
    like HTTP?

    It depends.

    AnyEvent::Task clients send discrete messages and receive ordered
    replies from workers, much like HTTP. The AnyEvent::Task protocol can be
    extended in a backwards-compatible manner like HTTP. AnyEvent::Task
    communication can be pipelined and possibly in the future even
    compressed like HTTP.

    The current AnyEvent::Task server obeys a very specific implementation
    policy: It is like a CGI server in that each process it forks is
    guaranteed to be handling only one connection at once so it can perform
    blocking operations without worrying about holding up other connections.

    But since a single process can handle many requests in a row without
    exiting, they are more like persistent FastCGI processes. The difference
    however is that while a client holds a checkout it is guaranteed an
    exclusive lock on that process (useful for supporting DB transactions
    for example). With a FastCGI server it is assumed that requests are
    stateless so you can't necessarily be sure you'll get the same process
    for two consecutive requests. In fact, if an error is thrown in the
    FastCGI handler you may never get the same process back again,
    preventing you from being able to recover from the error, retry, or at
    least collect process state for logging reasons.

    The fundamental difference between the AnyEvent::Task protocol and HTTP
    is that in AnyEvent::Task the client is the dominant protocol
    orchestrator whereas in HTTP it is the server.

    In AnyEvent::Task, the client manages the worker pool and the client
    decides if/when worker processes should terminate. In the normal case, a
    client will just return the worker to its worker pool. A worker is
    supposed to accept commands for as long as possible until the client
    dismisses it.

    The client decides the timeout for each checkout and different clients
    can have different timeouts while connecting to the same server.

    Client processes can be started and checkouts can be obtained before the
    server is even started. The client will continue trying to connect to
    the server to obtain worker processes until either the server starts or
    the checkout's timeout period lapses. As well as freeing you from having
    to start your services in the "right" order, this also means servers can
    be restarted without throwing any errors (aka "zero-downtime restarts").

    The client even decides the minimum number of workers to keep in the
    pool upon start-up and the maximum number of workers to acquire before
    checkout creation requests are queued. The server is really just a dumb
    fork-on-demand server and most of the sophistication is in the
    asynchronous client.

SEE ALSO
    The AnyEvent::Task github repo
    <https://github.com/hoytech/AnyEvent-Task>

    In order to handle exceptions in a meaningful way with this module, you
    must use Callback::Frame. In order to maintain seamless request logging
    across clients and workers, you should use Log::Defer.

    There are many modules on CPAN similar to AnyEvent::Task.

    This module is designed to be used in a non-blocking, process-based unix
    program. Depending on your exact requirements you might find something
    else useful: Parallel::ForkManager, Thread::Pool, or an HTTP server of
    some kind.

    If you're into AnyEvent, AnyEvent::DBI and AnyEvent::Worker (based on
    AnyEvent::DBI), AnyEvent::ForkObject, and AnyEvent::Fork::RPC send and
    receive commands from worker processes similar to this module.
    AnyEvent::Worker::Pool also has an implementation of a worker pool.
    AnyEvent::Gearman can interface with Gearman services.

    If you're into POE there is POE::Component::Pool::DBI, POEx::WorkerPool,
    POE::Component::ResourcePool, POE::Component::PreforkDispatch, and
    Cantella::Worker.

BUGS
    Although this module's interface is now stable and has been in
    production use for some time, there are a few remaining TODO items (see
    the bottom of Task.pm).

AUTHOR
    Doug Hoyte, "<doug@hcsw.org>"

COPYRIGHT & LICENSE
    Copyright 2012-2017 Doug Hoyte.

    This module is licensed under the same terms as perl itself.


