AnyEvent-Task

README  view on Meta::CPAN

NAME
    AnyEvent::Task - Client/server-based asynchronous worker pool

SYNOPSIS 1: PASSWORD HASHING
  Server
        use AnyEvent::Task::Server;
        use Authen::Passphrase::BlowfishCrypt;

        my $dev_urandom;
        my $server = AnyEvent::Task::Server->new(
                       name => 'passwd-hasher',
                       listen => ['unix/', '/tmp/anyevent-task.socket'],
                       setup => sub {
                         open($dev_urandom, '<', "/dev/urandom") || die "open urandom: $!";
                       },
                       interface => {
                         hash => sub {
                           my ($plaintext) = @_;
                           read($dev_urandom, my $salt, 16) == 16 || die "bad read from urandom";
                           return Authen::Passphrase::BlowfishCrypt->new(cost => 10,
                                                                         salt => $salt,
                                                                         passphrase => $plaintext)
                                                                   ->as_crypt;

                         },
                         verify => sub {
                           my ($crypted, $plaintext) = @_;
                           return Authen::Passphrase::BlowfishCrypt->from_crypt($crypted)
                                                                   ->match($plaintext);
                         },
                       },
                     );

        $server->run; # or AE::cv->recv

  Client
        use AnyEvent::Task::Client;

        my $client = AnyEvent::Task::Client->new(
                       connect => ['unix/', '/tmp/anyevent-task.socket'],
                     );

        my $checkout = $client->checkout( timeout => 5, );

        my $cv = AE::cv;

        $checkout->hash('secret',
          sub {
            my ($checkout, $crypted) = @_;

            print "Hashed password is $crypted\n";

            $checkout->verify($crypted, 'secret',
              sub {
                my ($checkout, $result) = @_;
                print "Verify result is $result\n";
                $cv->send;
              });
          });

        $cv->recv;

  Output
        Hashed password is $2a$10$NwTOwxmTlG0Lk8YZMT29/uysC9RiZX4jtWCx.deBbb2evRjCq6ovi
        Verify result is 1

SYNOPSIS 2: DBI
  Server
        use AnyEvent::Task::Server;
        use DBI;

        my $dbh;

        AnyEvent::Task::Server->new(
          name => 'dbi',
          listen => ['unix/', '/tmp/anyevent-task.socket'],
          setup => sub {
            $dbh = DBI->connect("dbi:SQLite:dbname=/tmp/junk.sqlite3","","",{ RaiseError => 1, });
          },
          interface => sub {
            my ($method, @args) = @_;
            $dbh->$method(@args);
          },
        )->run;

  Client
        use AnyEvent::Task::Client;

        my $client = AnyEvent::Task::Client->new(
                       connect => ['unix/', '/tmp/anyevent-task.socket'],
                     );

        my $dbh = $client->checkout;

        my $cv = AE::cv;

        $dbh->do(q{ CREATE TABLE user(username TEXT PRIMARY KEY, email TEXT); },
          sub { });

        ## Requests will queue up on the checkout and execute in order:

        $dbh->do(q{ INSERT INTO user (username, email) VALUES (?, ?) },
                 undef, 'jimmy',
                        'jimmy@example.com',
          sub { });

        $dbh->selectrow_hashref(q{ SELECT * FROM user }, sub {
          my ($dbh, $user) = @_;
          print "username: $user->{username}, email: $user->{email}\n";
          $cv->send;
        });

        $cv->recv;

  Output
        username: jimmy, email: jimmy@example.com

DESCRIPTION
    The synopses make this module look much more complicated than it
    actually is. In a nutshell, a synchronous worker process is forked off
    by a server whenever a client asks for one. The client keeps as many of
    these workers around as it wants and delegates tasks to them
    asynchronously.

                                                 use Data::Dumper; ## or whatever
                                                 print Dumper($msg);
                                               });

        $log_defer_object->info('going to compute some operation in a worker');

        my $checkout = $client->checkout(log_defer_object => $log_defer_object);

        my $cv = AE::cv;

        $checkout->(sub {
          $log_defer_object->info('finished some operation');
          $cv->send;
        });

        $cv->recv;

    When run, the above client will print something like this:

        $VAR1 = {
              'start' => '1363232705.96839',
              'end' => '1.027309',
              'logs' => [
                          [
                            '0.000179',
                            30,
                            'going to compute some operation in a worker'
                          ],
                          [
                            '0.023881061050415',
                            30,
                            'about to compute some operation'
                          ],
                          [
                            '1.025965',
                            30,
                            'finished some operation'
                          ]
                        ],
              'timers' => {
                            'computing some operation' => [
                                                            '0.024089061050415',
                                                            '1.02470206105041'
                                                          ]
                          }
            };

ERROR HANDLING
    In a synchronous program, if you expected some operation to throw an
    exception you might wrap it in "eval" like this:

        my $crypted;

        eval {
          $crypted = hash('secret');
        };

        if ($@) {
          say "hash failed: $@";
        } else {
          say "hashed password is $crypted";
        }

    But in an asynchronous program, typically "hash" would initiate some
    kind of asynchronous operation and then return immediately, allowing the
    program to go about other tasks while waiting for the result. Since the
    error might come back at any time in the future, the program needs a way
    to map the exception that is thrown back to the original context.
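
    The problem can be reproduced with core Perl alone. In the sketch
    below, @pending is a hypothetical stand-in for an event loop and
    hash_async is a hypothetical asynchronous "hash"; neither is part of
    AnyEvent::Task. The "eval" around the call finishes before the
    failure happens, so the error only surfaces later, far from the code
    that started the operation.

```perl
use strict;
use warnings;

# Hypothetical stand-in for an event loop: work is queued now and run later.
my @pending;

# Hypothetical async "hash": schedules its work and returns immediately.
sub hash_async {
  my ($plaintext, $cb) = @_;
  push @pending, sub {
    die "hash failed: empty passphrase\n" unless length $plaintext;
    $cb->("hashed($plaintext)");
  };
  return;
}

my $caught_at_call_site = 0;

# Nothing dies inside this eval: the call only queues the work.
eval {
  hash_async('', sub { print "got $_[0]\n" });
  1;
} or $caught_at_call_site = 1;

print "caught at call site: $caught_at_call_site\n";

# The exception is raised only when the queued work finally runs,
# long after the eval above has been left.
my $caught_in_loop = '';
for my $job (splice @pending) {
  eval { $job->(); 1 } or $caught_in_loop = $@;
}

print "caught in loop: $caught_in_loop";
```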

    AnyEvent::Task accomplishes this mapping with Callback::Frame.

    Callback::Frame lets you preserve error handlers (and "local" variables)
    across asynchronous callbacks. Callback::Frame is not tied to
    AnyEvent::Task, AnyEvent or any other async framework and can be used
    with almost all callback-based libraries.

    However, when using AnyEvent::Task, libraries that you use in the client
    must be AnyEvent compatible. This restriction does not apply to your
    server code: giving an asynchronous program access to blocking
    resources is the main purpose of this module. In your server code,
    when there is an error condition you should simply "die" or "croak" as
    in a synchronous program.

    As an example usage of Callback::Frame, here is how we would handle
    errors thrown from a worker process running the "hash" method in an
    asynchronous client program:

        use Callback::Frame;

        frame(code => sub {

          $client->checkout->hash('secret', sub {
            my ($checkout, $crypted) = @_;
            say "Hashed password is $crypted";
          });

        }, catch => sub {

          my $back_trace = shift;
          say "Error is: $@";
          say "Full back-trace: $back_trace";

        })->(); ## <-- frame is created and then immediately executed

    Of course if "hash" is something like a bcrypt hash function it is
    unlikely to raise an exception so maybe that's a bad example. On the
    other hand, maybe it's a really good example: In addition to errors that
    occur while running your callbacks, AnyEvent::Task uses Callback::Frame
    to throw errors if the worker process times out, so if the bcrypt "cost"
    is really cranked up it might hit the default 30-second time limit.

  Rationale for Callback::Frame
    Why not just call the callback but set $@ and indicate an error has
    occurred? This is the approach taken with AnyEvent::DBI for example. I
    believe the Callback::Frame interface is superior to this method. In a
    synchronous program, exceptions are out-of-band messages and code
    doesn't need to locally handle them. It can let them "bubble up" the
    stack, perhaps to a top-level error handler. Invoking the callback when
    an error occurs forces exceptions to be handled in-band.

    How about having AnyEvent::Task expose an error callback? This is the
    approach taken by AnyEvent::Handle for example. I believe
    Callback::Frame is superior to this method also. Although separate
    callbacks are (sort of) out-of-band, you still have to write error
    handler callbacks and do something relevant locally instead of allowing
    the exception to bubble up to an error handler.
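
    For comparison, here is a minimal core-Perl sketch of the two
    conventions discussed above. failing_op, run_in_band and
    run_with_on_error are hypothetical stand-ins written for this
    illustration; they are not the actual interfaces of AnyEvent::DBI or
    AnyEvent::Handle. In both styles the caller has to deal with the
    failure at the call site.

```perl
use strict;
use warnings;

# Hypothetical operation that always fails; stands in for a library call.
sub failing_op { die "disk full\n" }

# Style 1: in-band. The callback is always invoked and must inspect $@.
sub run_in_band {
  my ($cb) = @_;
  my $result = eval { failing_op() };
  $cb->($result);    # $@ is still set from the failed eval
}

# Style 2: separate error callback, supplied alongside the success callback.
sub run_with_on_error {
  my (%arg) = @_;
  my $result = eval { failing_op() };
  return $arg{on_error}->($@) if $@;
  $arg{on_success}->($result);
}

my @seen;

run_in_band(sub {
  my ($result) = @_;
  if ($@) {                       # error handling mixed into the callback
    push @seen, "in-band: $@";
    return;
  }
  push @seen, "in-band ok\n";
});

run_with_on_error(
  on_success => sub { push @seen, "ok\n" },
  on_error   => sub { push @seen, "on_error: $_[0]" },  # handled locally
);

print @seen;
```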

    In servers, Callback::Frame helps you maintain the "dynamic state"
    (error handlers and dynamic variables) installed for a single
    connection. In other words, any errors that occur while servicing that
    connection will be able to be caught by an error handler specific to
    that connection. This lets you send an error response to the client and
    collect associated log messages in a Log::Defer object specific to that
    connection.

    Callback::Frame provides an error handler stack so you can have a
    top-level handler as well as nested handlers (similar to nested
    "eval"s). This is useful when you wish to have a top-level "bail-out"
    error handler and also nested error handlers that know how to retry or
    recover from an error in an async sub-operation.
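
    A minimal sketch of such a handler stack, assuming Callback::Frame is
    installed. The frame names, the @events array and the error messages
    are made up for illustration. The nested handler sees the error first
    and re-throws with "die", which hands the new error to the top-level
    handler.

```perl
use strict;
use warnings;
use Callback::Frame;

my @events;      # illustrative record of which handler ran
my $callback;

frame(name => 'top-level', code => sub {

  frame(name => 'sub-operation', code => sub {

    # fub remembers the handlers that are active right now
    $callback = fub { die "transient failure\n" };

  }, catch => sub {
    push @events, "inner saw: $@";
    die "giving up: $@";          # re-throw to the next handler up
  })->();

}, catch => sub {
  push @events, "outer saw: $@";  # top-level "bail-out" handler
})->();

# Simulate the event loop invoking the callback later, after both
# frames have already returned.
$callback->();

print @events;
```

    A handler that knows how to recover would simply return instead of
    calling "die", and the top-level handler would never run.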

    Callback::Frame is designed to be easily used with callback-based
    libraries that don't know about Callback::Frame. "fub" is a shortcut for
    "frame" with just the "code" argument. Instead of passing "sub { ... }"
    into libraries you can pass in "fub { ... }". When invoked, this wrapped
    callback will first re-establish any error handlers that you installed
    with "frame" and then run your provided code. Libraries that force
    in-band error signalling can be handled with callbacks such as "fub {
    die $@ if $@; ... }". Separate error callbacks should simply be "fub {
    die "failed because ..." }".
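
    As a sketch of the separate-error-callback case, assuming
    Callback::Frame is installed: legacy_fetch below is a hypothetical
    library function that knows nothing about Callback::Frame, and @queue
    is a stand-in for an event loop. The "fub" passed as the error
    callback turns the library's error report into an exception, which
    the enclosing frame's "catch" handler then receives.

```perl
use strict;
use warnings;
use Callback::Frame;

# Hypothetical callback-based library, unaware of Callback::Frame.
# It queues work and later calls on_done or on_error.
my @queue;
sub legacy_fetch {
  my (%arg) = @_;
  push @queue, sub { $arg{on_error}->('connection refused') };
}

my $handled = '';

frame(code => sub {

  my $on_done  = fub { print "fetched: $_[0]\n" };
  my $on_error = fub { die "fetch failed: $_[0]\n" };  # convert to exception

  legacy_fetch(on_done => $on_done, on_error => $on_error);

}, catch => sub {
  $handled = $@;   # error raised inside the fub arrives here
})->();

# Simulate the event loop; the frame itself returned long ago.
$_->() for splice @queue;

print "handled: $handled";
```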

    It's important that all callbacks be created with "fub" (or "frame")
    even if you don't expect them to fail so that the dynamic context is
    preserved for nested callbacks that may. An exception is the callbacks


