AnyEvent-Task

README  view on Meta::CPAN

NAME
    AnyEvent::Task - Client/server-based asynchronous worker pool

SYNOPSIS 1: PASSWORD HASHING
  Server
        use AnyEvent::Task::Server;
        use Authen::Passphrase::BlowfishCrypt;

        my $dev_urandom;
        my $server = AnyEvent::Task::Server->new(
                       name => 'passwd-hasher',
                       listen => ['unix/', '/tmp/anyevent-task.socket'],
                       setup => sub {
                         open($dev_urandom, "/dev/urandom") || die "open urandom: $!";
                       },
                       interface => {
                         hash => sub {
                           my ($plaintext) = @_;
                           read($dev_urandom, my $salt, 16) == 16 || die "bad read from urandom";
                           return Authen::Passphrase::BlowfishCrypt->new(cost => 10,
                                                                         salt => $salt,
                                                                         passphrase => $plaintext)
                                                                   ->as_crypt;

                         },
                         verify => sub {
                           my ($crypted, $plaintext) = @_;
                           return Authen::Passphrase::BlowfishCrypt->from_crypt($crypted)
                                                                   ->match($plaintext);
                         },
                       },
                     );

        $server->run; # or AE::cv->recv

  Client
        use AnyEvent::Task::Client;

        my $client = AnyEvent::Task::Client->new(
                       connect => ['unix/', '/tmp/anyevent-task.socket'],
                     );

        my $checkout = $client->checkout( timeout => 5, );

        my $cv = AE::cv;

        $checkout->hash('secret',
          sub {
            my ($checkout, $crypted) = @_;

            print "Hashed password is $crypted\n";

            $checkout->verify($crypted, 'secret',
              sub {
                my ($checkout, $result) = @_;
                print "Verify result is $result\n";
                $cv->send;
              });
          });

        $cv->recv;

  Output
        Hashed password is $2a$10$NwTOwxmTlG0Lk8YZMT29/uysC9RiZX4jtWCx.deBbb2evRjCq6ovi
        Verify result is 1

SYNOPSIS 2: DBI
  Server
        use AnyEvent::Task::Server;
        use DBI;

        my $dbh;

        AnyEvent::Task::Server->new(
          name => 'dbi',
          listen => ['unix/', '/tmp/anyevent-task.socket'],
          setup => sub {
            $dbh = DBI->connect("dbi:SQLite:dbname=/tmp/junk.sqlite3","","",{ RaiseError => 1, });
          },
          interface => sub {
            my ($method, @args) = @_;
            $dbh->$method(@args);
          },
        )->run;

  Client
        use AnyEvent::Task::Client;

        my $client = AnyEvent::Task::Client->new(
                       connect => ['unix/', '/tmp/anyevent-task.socket'],
                     );

        my $dbh = $client->checkout;

        my $cv = AE::cv;

        $dbh->do(q{ CREATE TABLE user(username TEXT PRIMARY KEY, email TEXT); },
          sub { });

        ## Requests will queue up on the checkout and execute in order:

        $dbh->do(q{ INSERT INTO user (username, email) VALUES (?, ?) },
                 undef, 'jimmy',
                        'jimmy@example.com',
          sub { });

        $dbh->selectrow_hashref(q{ SELECT * FROM user }, sub {
          my ($dbh, $user) = @_;
          print "username: $user->{username}, email: $user->{email}\n";
          $cv->send;
        });

        $cv->recv;

  Output
        username: jimmy, email: jimmy@example.com

DESCRIPTION
    The synopses make this module look much more complicated than it
    actually is. In a nutshell, a synchronous worker process is forked off
    by a server whenever a client asks for one. The client keeps as many of
    these workers around as it wants and delegates tasks to them
    asynchronously.

    Another way of saying that is that AnyEvent::Task is a
    pre-fork-on-demand server (AnyEvent::Task::Server) combined with a
    persistent worker-pooled client (AnyEvent::Task::Client).

    The examples in the synopses are complete stand-alone programs. Run the
    server in one window and the client in another. The server will remain
    running but the client will exit after printing its output. Typically
    the "client" programs would be embedded in a server program such as a
    web-server.

    Note that the client examples don't implement error checking (see the
    "ERROR HANDLING" section).

    A server is started with "AnyEvent::Task::Server->new". This constructor
    should be passed in at least the "listen" and "interface" arguments.
    Keep the returned server object around for as long as you want the
    server to be running. "listen" is an array ref containing the host and
    service options to be passed to AnyEvent::Socket's "tcp_server"
    function. "interface" is the code that should handle each request. See
    the INTERFACE section below for its specification. If a "name" parameter
    is set, it will be used to set the process name so you can see which
    processes are which when you run "ps". A "setup" coderef can be passed
    in to run some code after a new worker is forked. A "checkout_done"
    coderef can be passed in to run some code whenever a checkout is
    released in order to perform any required clean-up.
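    For instance, a server that prepares per-worker state in "setup" and
    cleans it up in "checkout_done" might look like the following sketch
    (modelled on the DBI synopsis above; the rollback-on-release behaviour
    is illustrative, not something the module does for you):

        use AnyEvent::Task::Server;
        use DBI;

        my $dbh;

        my $server = AnyEvent::Task::Server->new(
          name => 'dbi-txn',
          listen => ['unix/', '/tmp/anyevent-task.socket'],
          setup => sub {
            ## Runs once in each freshly forked worker
            $dbh = DBI->connect("dbi:SQLite:dbname=/tmp/junk.sqlite3", "", "",
                                { RaiseError => 1, AutoCommit => 0, });
          },
          checkout_done => sub {
            ## Runs in the worker every time a checkout is released:
            ## discard any transaction the previous checkout left open
            $dbh->rollback;
          },
          interface => sub {
            my ($method, @args) = @_;
            $dbh->$method(@args);
          },
        );

        $server->run;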

    process. If the client process is too overloaded, it may make sense to
    run multiple client processes.

    Each client maintains a "pool" of connections to worker processes. Every
    time a checkout is requested, the request is placed into a first-come,
    first-serve queue. Once a worker process becomes available, it is
    associated with that checkout until the checkout is garbage collected,
    which in perl means as soon as it is no longer referenced. Each checkout
    also maintains a queue of requested method-calls so that as soon as a
    worker process is allocated to a checkout, any queued method calls are
    filled in order.

    "timeout" can be passed as a keyword argument to "checkout". Once a
    request is queued up on that checkout, a timer of "timeout" seconds
    (default is 30, undef means infinity) is started. If the request
    completes during this timeframe, the timer is cancelled. If the timer
    expires, the worker connection is terminated and an exception is thrown
    in the dynamic context of the callback (see the "ERROR HANDLING"
    section).

    Note that since timeouts are associated with a checkout, checkouts can
    be created before the server is started. As long as the server is
    running within "timeout" seconds, no error will be thrown and no
    requests will be lost. The client will continually try to acquire worker
    processes until a server is available, and once one is available it will
    attempt to allocate all queued checkouts.

    Because of checkout queuing, the maximum number of worker processes a
    client will attempt to obtain can be limited with the "max_workers"
    argument when creating a client object. If there are more live checkouts
    than "max_workers", the remaining checkouts will have to wait until one
    of the other workers becomes available. Because of timeouts, some
    checkouts may never be serviced if the system can't handle the load (the
    timeout error should be handled to indicate the service is temporarily
    unavailable).

    The "min_workers" argument determines how many "hot-standby" workers
    should be pre-forked when creating the client. The default is 2 though
    note that this may change to 0 in the future.
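    Putting those knobs together, a client tuned for a bounded pool might be
    created like this (a sketch using the socket path from the synopses; it
    assumes a server is running there):

        use AnyEvent::Task::Client;

        my $client = AnyEvent::Task::Client->new(
          connect => ['unix/', '/tmp/anyevent-task.socket'],
          min_workers => 4,   ## pre-fork 4 hot-standby workers
          max_workers => 16,  ## never hold more than 16 workers at once
        );

        ## Checkouts beyond 16 queue until a worker frees up; each request
        ## queued on this checkout gets a 10-second timer.
        my $checkout = $client->checkout(timeout => 10);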

STARTING THE SERVER
    Typically you will want to start the client and server as completely
    separate processes as shown in the synopses.

    Running the server and the client in the same process is technically
    possible but is highly discouraged since the server will "fork()" when
    the client demands a new worker process. In this case, all descriptors
    in use by the client are duped into the worker process and the worker
    ought to close these extra descriptors. Also, forking a busy client may
    be memory-inefficient (and dangerous if it uses threads).

    Since it's more of a bother than it's worth to run the server and the
    client in the same process, there is an alternate server constructor,
    "AnyEvent::Task::Server::fork_task_server" for when you'd like to fork a
    dedicated server process. It can be passed the same arguments as the
    regular "new" constructor:

        ## my ($keepalive_pipe, $server_pid) =
        AnyEvent::Task::Server::fork_task_server(
          name => 'hello',
          listen => ['unix/', '/tmp/anyevent-task.socket'],
          interface => sub {
                             return "Hello from PID $$";
                           },
        );

    The only differences between this and the regular constructor are that
    "fork_task_server" will fork a process which becomes the server and will
    also install a "keep-alive" pipe between the server and the client. This
    keep-alive pipe will be used by the server to detect when its parent
    (the client process) exits.

    If "AnyEvent::Task::Server::fork_task_server" is called in a void
    context then the reference to this keep-alive pipe is pushed onto
    @AnyEvent::Task::Server::children_sockets. Otherwise, the keep-alive
    pipe and the server's PID are returned. Closing the pipe will terminate
    the server gracefully. "kill" the PID to terminate it immediately. Note
    that even when the server is shut down, existing worker processes and
    checkouts may still be active in the client. The client object and all
    checkout objects should be destroyed if you wish to ensure all workers
    are shut down.

    Since the "fork_task_server" constructor calls fork and requires using
    AnyEvent in both the parent and child processes, it is important that
    you not install any AnyEvent watchers before calling it. The usual
    caveats about forking AnyEvent processes apply (see the AnyEvent docs).

    You should also not call "fork_task_server" after having started threads
    since, again, this function calls fork. Forking a threaded process is
    dangerous because the threads might have userspace data-structures in
    inconsistent states at the time of the fork.

INTERFACE
    When creating a server, there are two possible formats for the
    "interface" option. The first and most general is a coderef. This
    coderef will be passed the list of arguments that were sent when the
    checkout was called in the client process (without the trailing callback
    of course).

    As described above, you can use a checkout object as a coderef or as an
    object with methods. If the checkout is invoked as an object, the method
    name is prepended to the arguments passed to "interface":

        interface => sub {
          my ($method, @args) = @_;
        },

    If the checkout is invoked as a coderef, method is omitted:

        interface => sub {
          my (@args) = @_;
        },

    The second format possible for "interface" is a hash ref. This is a
    simple method dispatch feature where the method invoked on the checkout
    object is the key used to lookup which coderef to run in the worker:

        interface => {
          method1 => sub {
            my (@args) = @_;
          },
          method2 => sub {
            my (@args) = @_;
          },
        },

    Note that since the protocol between the client and the worker process
    is currently JSON-based, all arguments and return values must be
    serializable to JSON. This includes most perl scalars like strings, a
    limited range of numerical types, and hash/list constructs with no
    cyclical references.
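    A quick way to check whether an argument structure will survive the trip
    is to round-trip it through the core JSON::PP module yourself (JSON::PP
    is used here only for illustration; the module's actual wire format is
    internal):

        use strict;
        use warnings;
        use JSON::PP;

        my $json = JSON::PP->new;

        ## Plain strings, numbers, and acyclic hash/array structures are fine:
        my $args = { username => 'jimmy', ids => [1, 2, 3] };
        my $copy = $json->decode($json->encode($args));
        print "ids: @{$copy->{ids}}\n";

        ## Coderefs (and other non-JSON things) are not:
        eval { $json->encode({ cb => sub {} }) };
        print "coderef rejected\n" if $@;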

    Because there isn't any way for the callback to indicate the context it
    desires, interface subs are always called in scalar context.

    A future backwards-compatible RPC protocol may use Sereal. Although it's
    inefficient, you can already serialise an object with Sereal manually,
    send the resulting string over the existing protocol, and then
    deserialise it in the worker.
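    A sketch of that manual workaround, assuming the Sereal CPAN modules are
    installed on both ends (the base64 step is just a cautious way to keep
    the binary payload a plain JSON-safe string):

        use Sereal::Encoder qw(encode_sereal);
        use Sereal::Decoder qw(decode_sereal);
        use MIME::Base64 qw(encode_base64 decode_base64);

        ## Client side: pack an arbitrary perl structure into a plain string
        my $payload = encode_base64(encode_sereal({ nested => { deeply => [1, 2] } }));

        ## ... send $payload as an ordinary string argument over the checkout ...

        ## Worker side: unpack it again
        my $data = decode_sereal(decode_base64($payload));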

LOGGING
    Because workers run in a separate process, they can't directly use
    logging contexts in the client process. That is why this module is
    integrated with Log::Defer.

    A Log::Defer object is created on demand in the worker process. Once the
    worker has finished an operation, any messages in the object will be
    extracted and sent back to the client. The client then merges this into
    its main Log::Defer object that was passed in when creating the
    checkout.

    In your server code, use AnyEvent::Task::Logger. It exports the function
    "logger" which returns a Log::Defer object:

        use AnyEvent::Task::Server;
        use AnyEvent::Task::Logger;

        AnyEvent::Task::Server->new(
          name => 'sleeper',
          listen => ['unix/', '/tmp/anyevent-task.socket'],
          interface => sub {
            logger->info('about to compute some operation');
            {
              my $timer = logger->timer('computing some operation');
              select undef,undef,undef, 1; ## sleep for 1 second
            }
          },
        )->run;

    Note: Portable server code should never call "sleep" because on some
    systems it will interfere with the recoverable worker timeout feature
    implemented with "SIGALRM".

    In your client code, pass a Log::Defer object in when you create a
    checkout:

        use AnyEvent::Task::Client;
        use Log::Defer;

        my $client = AnyEvent::Task::Client->new(
                       connect => ['unix/', '/tmp/anyevent-task.socket'],
                     );

        my $log_defer_object = Log::Defer->new(sub {
                                                 my $msg = shift;
                                                 use Data::Dumper; ## or whatever
                                                 print Dumper($msg);
                                               });

        $log_defer_object->info('going to compute some operation in a worker');

        my $checkout = $client->checkout(log_defer_object => $log_defer_object);

        my $cv = AE::cv;

        $checkout->(sub {
          $log_defer_object->info('finished some operation');
          $cv->send;
        });

        $cv->recv;

    When run, the above client will print something like this:

        $VAR1 = {
              'start' => '1363232705.96839',
              'end' => '1.027309',
              'logs' => [
                          [
                            '0.000179',
                            30,
                            'going to compute some operation in a worker'
                          ],
                          [
                            '0.023881061050415',
                            30,
                            'about to compute some operation'
                          ],
                          [
                            '1.025965',
                            30,
                            'finished some operation'
                          ]
                        ],
              'timers' => {
                            'computing some operation' => [
                                                            '0.024089061050415',
                                                            '1.02470206105041'
                                                          ]
                          }
            };

ERROR HANDLING
    In a synchronous program, if you expected some operation to throw an
    exception you might wrap it in "eval" like this:

        my $crypted;

        eval {
          $crypted = hash('secret');
        };
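    In the asynchronous case, an "eval" around the checkout call can't catch
    errors thrown later in the callback's dynamic context. The documented
    approach is Callback::Frame (see the "SEE ALSO" section); a sketch of how
    the password-hashing client from SYNOPSIS 1 might guard against worker
    errors and checkout timeouts:

        use Callback::Frame;

        frame_try {
          $checkout->hash('secret', sub {
            my ($checkout, $crypted) = @_;
            print "Hashed password is $crypted\n";
          });
        } frame_catch {
          my $err = $_[0];
          ## Reached if the worker died, the connection was lost,
          ## or the checkout's timeout expired
          print "Hashing failed: $err\n";
        };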

    It depends.

    AnyEvent::Task clients send discrete messages and receive ordered
    replies from workers, much like HTTP. The AnyEvent::Task protocol can be
    extended in a backwards-compatible manner like HTTP. AnyEvent::Task
    communication can be pipelined and possibly in the future even
    compressed like HTTP.

    The current AnyEvent::Task server obeys a very specific implementation
    policy: It is like a CGI server in that each process it forks is
    guaranteed to be handling only one connection at once so it can perform
    blocking operations without worrying about holding up other connections.

    But since a single process can handle many requests in a row without
    exiting, they are more like persistent FastCGI processes. The difference
    however is that while a client holds a checkout it is guaranteed an
    exclusive lock on that process (useful for supporting DB transactions
    for example). With a FastCGI server it is assumed that requests are
    stateless so you can't necessarily be sure you'll get the same process
    for two consecutive requests. In fact, if an error is thrown in the
    FastCGI handler you may never get the same process back again,
    preventing you from being able to recover from the error, retry, or at
    least collect process state for logging reasons.

    The fundamental difference between the AnyEvent::Task protocol and HTTP
    is that in AnyEvent::Task the client is the dominant protocol
    orchestrator whereas in HTTP it is the server.

    In AnyEvent::Task, the client manages the worker pool and the client
    decides if/when worker processes should terminate. In the normal case, a
    client will just return the worker to its worker pool. A worker is
    supposed to accept commands for as long as possible until the client
    dismisses it.

    The client decides the timeout for each checkout and different clients
    can have different timeouts while connecting to the same server.

    Client processes can be started and checkouts can be obtained before the
    server is even started. The client will continue trying to connect to
    the server to obtain worker processes until either the server starts or
    the checkout's timeout period lapses. As well as freeing you from having
    to start your services in the "right" order, this also means servers can
    be restarted without throwing any errors (aka "zero-downtime restarts").

    The client even decides how many minimum workers should be in the pool
    upon start-up and how many maximum workers to acquire before checkout
    creation requests are queued. The server is really just a dumb
    fork-on-demand server and most of the sophistication is in the
    asynchronous client.

SEE ALSO
    The AnyEvent::Task github repo
    <https://github.com/hoytech/AnyEvent-Task>

    In order to handle exceptions in a meaningful way with this module, you
    must use Callback::Frame. In order to maintain seamless request logging
    across clients and workers, you should use Log::Defer.

    There are many modules on CPAN similar to AnyEvent::Task.

    This module is designed to be used in a non-blocking, process-based unix
    program. Depending on your exact requirements you might find something
    else useful: Parallel::ForkManager, Thread::Pool, or an HTTP server of
    some kind.

    If you're into AnyEvent, AnyEvent::DBI and AnyEvent::Worker (based on
    AnyEvent::DBI), AnyEvent::ForkObject, and AnyEvent::Fork::RPC send and
    receive commands from worker processes similar to this module.
    AnyEvent::Worker::Pool also has an implementation of a worker pool.
    AnyEvent::Gearman can interface with Gearman services.

    If you're into POE there is POE::Component::Pool::DBI, POEx::WorkerPool,
    POE::Component::ResourcePool, POE::Component::PreforkDispatch,
    Cantella::Worker.

BUGS
    Although this module's interface is now stable and has been in
    production use for some time, there are a few remaining TODO items (see
    the bottom of Task.pm).

AUTHOR
    Doug Hoyte, "<doug@hcsw.org>"

COPYRIGHT & LICENSE
    Copyright 2012-2017 Doug Hoyte.

    This module is licensed under the same terms as perl itself.


