AnyEvent-Task


README  view on Meta::CPAN

    first-served queue. Once a worker process becomes available, it is
    associated with that checkout until that checkout is garbage collected,
    which in Perl means as soon as it is no longer referenced. Each checkout
    also maintains a queue of requested method-calls so that as soon as a
    worker process is allocated to a checkout, any queued method calls are
    filled in order.
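
    The queueing behaviour described above can be modelled with a small toy
    class. This is only a sketch in core Perl: "ToyCheckout", "request" and
    "attach_worker" are hypothetical names, not part of AnyEvent::Task, and
    the worker is assumed to be a plain code ref that answers synchronously.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a checkout: it only records requests until a
# worker is attached, then hands them over in the order they were queued.
package ToyCheckout;

sub new {
  my ($class) = @_;
  return bless { pending => [], worker => undef }, $class;
}

# Queue a method call; it is dispatched at once if a worker is attached.
sub request {
  my ($self, @call) = @_;
  push @{ $self->{pending} }, [@call];
  $self->_fill;
  return;
}

# Called when a worker process becomes available for this checkout.
sub attach_worker {
  my ($self, $worker) = @_;
  $self->{worker} = $worker;
  $self->_fill;
  return;
}

# Drain the queue in FIFO order, but only once a worker is attached.
sub _fill {
  my ($self) = @_;
  return unless $self->{worker};
  while (my $call = shift @{ $self->{pending} }) {
    $self->{worker}->(@$call);
  }
  return;
}

package main;

my @served;
my $checkout = ToyCheckout->new;

$checkout->request('hash', 'secret');      # queued: no worker yet
$checkout->request('verify', 'secret');    # queued behind the first call

$checkout->attach_worker(sub { push @served, join ' ', @_ });

$checkout->request('hash', 'other');       # worker attached: runs at once
```

    The two calls made before a worker was attached are served first and in
    their original order; the third call is served as soon as it is made.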

    "timeout" can be passed as a keyword argument to "checkout". Once a
    request is queued up on that checkout, a timer of "timeout" seconds
    (default is 30, undef means infinity) is started. If the request
    completes during this timeframe, the timer is cancelled. If the timer
    expires, the worker connection is terminated and an exception is thrown
    in the dynamic context of the callback (see the "ERROR HANDLING"
    section).

    Note that since timeouts are associated with a checkout, checkouts can
    be created before the server is started. As long as the server is
    running within "timeout" seconds, no error will be thrown and no
    requests will be lost. The client will continually try to acquire worker
    processes until a server is available, and once one is available it will
    attempt to allocate all queued checkouts.

README  view on Meta::CPAN

    But in an asynchronous program, typically "hash" would initiate some
    kind of asynchronous operation and then return immediately, allowing the
    program to go about other tasks while waiting for the result. Since the
    error might come back at any time in the future, the program needs a way
    to map the exception that is thrown back to the original context.
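
    To see why a plain "eval" is not enough, consider this core-Perl sketch.
    The "defer" and "run_loop" functions are hypothetical stand-ins for an
    event loop; they are assumptions made for illustration and are not
    AnyEvent APIs.

```perl
use strict;
use warnings;

# Toy "event loop": callbacks are stored now and run later.
my @later;

sub defer {
  my ($cb) = @_;
  push @later, $cb;
  return;
}

sub run_loop {
  while (my $cb = shift @later) {
    $cb->();
  }
  return;
}

my @log;

# The synchronous pattern: eval only guards code that runs inside it.
eval {
  defer(sub { die "hash failed\n" });   # returns immediately, no error yet
  1;
} or push @log, "caught in original context: $@";

# By the time the callback runs, the eval above has already been left,
# so the exception surfaces here, in the context of whoever runs the loop.
eval { run_loop(); 1 } or push @log, "caught in event loop: $@";
```

    Only the second "eval" sees the exception. The error reaches whoever
    runs the loop, not the code that initiated the operation, so some way
    of mapping it back to the original context is needed.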

    AnyEvent::Task accomplishes this mapping with Callback::Frame.

    Callback::Frame lets you preserve error handlers (and "local" variables)
    across asynchronous callbacks. Callback::Frame is not tied to
    AnyEvent::Task, AnyEvent or any other async framework and can be used
    with almost all callback-based libraries.

    However, when using AnyEvent::Task, libraries that you use in the client
    must be AnyEvent compatible. This restriction obviously does not apply
    to your server code, that being the main purpose of this module:
    accessing blocking resources from an asynchronous program. In your
    server code, when there is an error condition you should simply "die" or
    "croak" as in a synchronous program.

    As an example usage of Callback::Frame, here is how we would handle
    errors thrown from a worker process running the "hash" method in an
    asynchronous client program:

        use Callback::Frame;

        frame(code => sub {

          $client->checkout->hash('secret', sub {
            my ($checkout, $crypted) = @_;
            say "Hashed password is $crypted";
          });

        }, catch => sub {

          my $back_trace = shift;
          say "Error is: $@";
          say "Full back-trace: $back_trace";

        })->(); ## <-- frame is created and then immediately executed

    Of course if "hash" is something like a bcrypt hash function it is
    unlikely to raise an exception so maybe that's a bad example. On the
    other hand, maybe it's a really good example: In addition to errors that
    occur while running your callbacks, AnyEvent::Task uses Callback::Frame
    to throw errors if the worker process times out, so if the bcrypt "cost"
    is really cranked up it might hit the default 30 second time limit.

  Rationale for Callback::Frame
    Why not just call the callback but set $@ and indicate an error has

README  view on Meta::CPAN

    connection.

    Callback::Frame provides an error handler stack so you can have a
    top-level handler as well as nested handlers (similar to nested
    "eval"s). This is useful when you wish to have a top-level "bail-out"
    error handler and also nested error handlers that know how to retry or
    recover from an error in an async sub-operation.

    Callback::Frame is designed to be easily used with callback-based
    libraries that don't know about Callback::Frame. "fub" is a shortcut for
    "frame" with just the "code" argument. Instead of passing "sub { ... }"
    into libraries you can pass in "fub { ... }". When invoked, this wrapped
    callback will first re-establish any error handlers that you installed
    with "frame" and then run your provided code. Libraries that force
    in-band error signalling can be handled with callbacks such as "fub {
    die $@ if $@; ... }". Separate error callbacks should simply be "fub {
    die "failed because ..." }".

    It's important that all callbacks be created with "fub" (or "frame")
    even if you don't expect them to fail so that the dynamic context is
    preserved for nested callbacks that may. An exception is the callbacks
    provided to AnyEvent::Task checkouts: These are automatically wrapped in
    frames for you (although explicitly passing in fubs is fine too).
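
    The idea of a handler stack that is captured when a callback is created
    and re-established when it is invoked can be sketched in core Perl.
    This is a simplified model, not Callback::Frame's implementation:
    "toy_frame" and "toy_fub" are hypothetical names, and unlike the real
    module (which sets $@ and passes a back-trace to the handler) the toy
    handlers receive the error as their first argument.

```perl
use strict;
use warnings;

# Simplified model only: Callback::Frame's real implementation differs.
# @handlers is the currently active error handler stack (innermost last).
our @handlers;

# toy_frame(code => sub, catch => sub) returns a wrapped callback. The
# handler stack in effect at creation time (plus the new catch handler, if
# any) is captured and re-established whenever the callback is invoked.
sub toy_frame {
  my (%args) = @_;
  my @captured = (@handlers, $args{catch} ? $args{catch} : ());

  return sub {
    my @call_args = @_;
    local @handlers = @captured;
    return 1 if eval { $args{code}->(@call_args); 1 };

    # Offer the error to handlers from innermost to outermost. A handler
    # that dies passes the (possibly new) error to the next one out.
    my $err = $@;
    for my $handler (reverse @captured) {
      return if eval { $handler->($err); 1 };
      $err = $@;
    }
    die $err;    # nobody handled it
  };
}

# Shortcut in the spirit of "fub": a frame with only a code argument.
sub toy_fub (&) { return toy_frame(code => $_[0]) }

my (@later, @log);

toy_frame(
  code => sub {
    # A library that knows nothing about frames just stores the callback.
    push @later, toy_fub { die "worker failed\n" };

    # A nested handler that cannot recover hands the error outwards.
    push @later, toy_frame(
      code  => sub { die "sub-operation failed\n" },
      catch => sub { my ($err) = @_; die "cannot recover: $err" },
    );
  },
  catch => sub { my ($err) = @_; push @log, "top-level handler: $err" },
)->();

# Later, from the "event loop": the captured handlers are re-established.
$_->() for splice @later;
```

    Both stored callbacks run long after the outer frame has returned, yet
    both errors end up in the top-level handler, the second one after
    passing through the nested handler first.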

    The Callback::Frame documentation explains how this works in much more
    detail.

  Reforking of workers after errors
    If a worker throws an error, the client receives the error but the
    worker process stays running. As long as the client has a reference to
    the checkout (and as long as the exception wasn't "fatal" -- see below),
    it can still be used to communicate with that worker so you can access
    error states, rollback transactions, or do any sort of required

lib/AnyEvent/Task.pm  view on Meta::CPAN





=head1 DESIGN

Both client and server are of course built with L<AnyEvent>. However, workers can't use AnyEvent (yet). I've never found a need to do event processing in the worker since if the library you wish to use is already AnyEvent-compatible you can simply us...

Each client maintains a "pool" of connections to worker processes. Every time a checkout is requested, the request is placed into a first-come, first-served queue. Once a worker process becomes available, it is associated with that checkout until that...

C<timeout> can be passed as a keyword argument to C<checkout>. Once a request is queued up on that checkout, a timer of C<timeout> seconds (default is 30, undef means infinity) is started. If the request completes during this timeframe, the timer is c...

Note that since timeouts are associated with a checkout, checkouts can be created before the server is started. As long as the server is running within C<timeout> seconds, no error will be thrown and no requests will be lost. The client will continua...

Because of checkout queuing, the maximum number of worker processes a client will attempt to obtain can be limited with the C<max_workers> argument when creating a client object. If there are more live checkouts than C<max_workers>, the remaining che...

The C<min_workers> argument determines how many "hot-standby" workers should be pre-forked when creating the client. The default is 2 though note that this may change to 0 in the future.




lib/AnyEvent/Task.pm  view on Meta::CPAN

    if ($@) {
      say "hash failed: $@";
    } else {
      say "hashed password is $crypted";
    }

But in an asynchronous program, typically C<hash> would initiate some kind of asynchronous operation and then return immediately, allowing the program to go about other tasks while waiting for the result. Since the error might come back at any time i...

AnyEvent::Task accomplishes this mapping with L<Callback::Frame>.

Callback::Frame lets you preserve error handlers (and C<local> variables) across asynchronous callbacks. Callback::Frame is not tied to AnyEvent::Task, AnyEvent or any other async framework and can be used with almost all callback-based libraries.

However, when using AnyEvent::Task, libraries that you use in the client must be L<AnyEvent> compatible. This restriction obviously does not apply to your server code, that being the main purpose of this module: accessing blocking resources from an a...

As an example usage of Callback::Frame, here is how we would handle errors thrown from a worker process running the C<hash> method in an asynchronous client program:

    use Callback::Frame;

    frame(code => sub {

      $client->checkout->hash('secret', sub {
        my ($checkout, $crypted) = @_;
        say "Hashed password is $crypted";
      });

    }, catch => sub {

      my $back_trace = shift;
      say "Error is: $@";
      say "Full back-trace: $back_trace";

    })->(); ## <-- frame is created and then immediately executed

Of course if C<hash> is something like a bcrypt hash function it is unlikely to raise an exception so maybe that's a bad example. On the other hand, maybe it's a really good example: In addition to errors that occur while running your callbacks, L<An...



=head2 Rationale for Callback::Frame

Why not just call the callback but set C<$@> and indicate an error has occurred? This is the approach taken with L<AnyEvent::DBI> for example. I believe the L<Callback::Frame> interface is superior to this method. In a synchronous program, exceptions...

How about having AnyEvent::Task expose an error callback? This is the approach taken by L<AnyEvent::Handle> for example. I believe Callback::Frame is superior to this method also. Although separate callbacks are (sort of) out-of-band, you still have ...

In servers, Callback::Frame helps you maintain the "dynamic state" (error handlers and dynamic variables) installed for a single connection. In other words, any errors that occur while servicing that connection will be able to be caught by an error h...

Callback::Frame provides an error handler stack so you can have a top-level handler as well as nested handlers (similar to nested C<eval>s). This is useful when you wish to have a top-level "bail-out" error handler and also nested error handlers that...

Callback::Frame is designed to be easily used with callback-based libraries that don't know about Callback::Frame. C<fub> is a shortcut for C<frame> with just the C<code> argument. Instead of passing C<sub { ... }> into libraries you can pass in C<fu...

It's important that all callbacks be created with C<fub> (or C<frame>) even if you don't expect them to fail so that the dynamic context is preserved for nested callbacks that may. An exception is the callbacks provided to AnyEvent::Task checkouts: T...

The L<Callback::Frame> documentation explains how this works in much more detail.



=head2 Reforking of workers after errors

If a worker throws an error, the client receives the error but the worker process stays running. As long as the client has a reference to the checkout (and as long as the exception wasn't "fatal" -- see below), it can still be used to communicate wit...

However, once the checkout object is destroyed, by default the worker will be shut down instead of returning to the client's worker pool as in the normal case where no errors were thrown. This is a "safe-by-default" behaviour that may help in the even...

lib/AnyEvent/Task.pm  view on Meta::CPAN



TODO

! max checkout queue size
  - start delivering fatal errors to some (at front of queue or back of queue though?)
  - write test for this

! docs: write good error handling examples

Make names more consistent between Callback::Frame backtraces and auto-generated Log::Defer timers

make server not use AnyEvent so don't have to worry about workers unlinking unix socket in dtors

In a graceful shutdown scenario, servers wait() on all their children before terminating.
  - Support relinquishing accept() socket during this period?

Document hung_worker_timeout and SIGALRM stuff

Wire protocol:
  - Support something other than JSON? Sereal?

lib/AnyEvent/Task/Client/Checkout.pm  view on Meta::CPAN

  return sub {
    $self->{last_name} = undef;

    return $self->_queue_request([ undef, @_, ]);
  };
}

sub _queue_request {
  my ($self, $request) = @_;

  unless (Callback::Frame::is_frame($request->[-1])) {
    my $name = undef;

    if (defined $self->{client}->{name} || defined $self->{last_name}) {
      $name = defined $self->{client}->{name} ? $self->{client}->{name} : 'ANONYMOUS CLIENT';
      $name .= ' -> ';
      $name .= defined $self->{last_name} ? $self->{last_name} : 'NO METHOD';
    }

    my %args = (code => $request->[-1]);

    $args{name} = $name if defined $name;

    # The enclosing unless already established that this is not a frame
    $request->[-1] = frame(%args);
  }

  push @{$self->{pending_requests}}, $request;

  $self->_install_timeout_timer;

  $self->_try_to_fill_requests;

  return;
}

lib/AnyEvent/Task/Client/Checkout.pm  view on Meta::CPAN

    $current_cb = $self->{current_cb};
  } elsif (@{$self->{pending_requests}}) {
    $current_cb = $self->{pending_requests}->[0]->[-1];
  } else {
    die "_throw_error called but no callback installed. Error thrown was: $err";
  }

  $self->{pending_requests} = undef;

  if ($current_cb) {
    frame(existing_frame => $current_cb,
          code => sub {
      die $err;
    })->();
  }

  $self->{cmd_handler} = undef;
}

sub throw_fatal_error {
  my ($self, $err) = @_;

t/args.t  view on Meta::CPAN


  $client->checkout->some_method(1, sub {
    my ($checkout, $ret) = @_;

    ok(!$@, 'no error set 2');
    ok(@$ret == 2);
    ok($ret->[0] eq 'some_method');
    ok($ret->[1] == 1);
  });

  $client->checkout->error('die please', frame(code => sub {
    die "should never get here";
  }, catch => sub {
    ok($@, 'error set 3');
    ok($@ =~ /ERR: die please/);
    ok($@ !~ /setup exception/i);
  }));

  frame(code => sub {
    $client->checkout->error('again, plz die', sub {
      die "should never get here 2";
    });
  }, catch => sub {
    my $trace = shift;
    ok($@ =~ /ERR: again, plz die/, '$@ has the exception');
    ok($trace =~ /MY CLIENT NAME -> error/, 'argument to callback has stack trace');

    $cv->send;
  })->();

t/dont_refork_after_error.t  view on Meta::CPAN

  $checkout->get_pid(sub {
    my ($checkout, $ret) = @_;
    $pid = $ret;

    like($pid, qr/^\d+$/, "got PID");

    $checkout->get_pid(sub {
      my ($checkout, $ret) = @_;
      is($pid, $ret, "PID didn't change in same checkout");

      $checkout->throw("BLAH", frame(code => sub {
        die "throw method didn't return error";
      }, catch => sub {
        my $err = $@;
        like($err, qr/BLAH/, "caught BLAH error");

        $checkout->get_pid(sub {
          my ($checkout, $ret) = @_;
          is($pid, $ret, "PID didn't change even after error");

          $checkout->throw("OUCH", frame(code => sub {
            die "throw method didn't return error 2";
          }, catch => sub {
            my $err = $@;
            like($err, qr/OUCH/, "caught OUCH error");
 
            $checkout->get_pid(sub {
              my ($checkout, $ret) = @_;
              is($pid, $ret, "PID didn't change even after second error");
            });
          }));

t/error-clears-checkout-queue.t  view on Meta::CPAN



my $cv = AE::cv;

my $timeout_watcher = AE::timer 0.5, 0, sub {
  $cv->send;
};

my $num_exceptions_caught = 0;

frame(code => sub {
  my $checkout = $client->checkout;

  $checkout->success(sub { ok(1, "first in checkout queue") });
  $checkout->success(sub { ok(1, "second in checkout queue") });

  $checkout->die(sub { die "exception should have been caught instead of calling this" });

  $checkout->success(sub { die "this should have been removed from the queue" });
  $checkout->success(sub { die "should have been removed" });
}, catch => sub {

t/logger.t  view on Meta::CPAN

$log_defer_object->info("hello from", $$);

$client->checkout(log_defer_object => $log_defer_object)->normal(sub {
  my ($checkout, $ret) = @_;

  $log_defer_object->info("after");

  $checkout->sleep(0.1, sub {});
  $checkout->sleep(0.1, sub {});

  $checkout->error(frame(code => sub {
    die "error not thrown?";
  }, catch => sub {
    ok(1, 'error caught');
    $cv->send;
  }));
});


$cv->recv;


$cv = AE::cv;

$log_defer_object = Log::Defer->new(sub {
  my $msg = shift;

  is($msg->{timers}->[0]->[0], '->()', "didn't leak first arg when called as code ref");
});

$client->checkout(log_defer_object => $log_defer_object)->('first arg', frame(code => sub {
  die "error not thrown by calling interface as a sub?";
}, catch => sub {
  ok(1, 'error caught');
  $cv->send;
}));

$cv->recv;

t/manual-request-abort.t  view on Meta::CPAN


my $client = AnyEvent::Task::Client->new(
               connect => ['unix/', '/tmp/anyevent-task-test.socket'],
             );


my $cv = AE::cv;

my $checkout = $client->checkout( timeout => 1, );

$checkout->sleep_die(frame(code => sub {
  die "checkout was serviced?";
}, catch => sub {
  my $err = $@;
  ok(1, "error hit");
  like($err, qr/manual request abort/, "manual request abort err");
  ok($err !~ /timed out after/, "no timed out err");
  ok($err !~ /hung worker/, "no hung worker err");

  $checkout->get_pid(frame(code => sub {
    die "shouldn't get here";
  }, catch => sub {
    my $err = $@;

    like($err, qr/manual request abort/, "continue to get manual abort error because error was fatal");
    $cv->send;
  }));

}));

t/setup-errors.t  view on Meta::CPAN


my $client = AnyEvent::Task::Client->new(
               connect => ['unix/', '/tmp/anyevent-task-test.socket'],
               max_workers => 1,
             );


my $cv = AE::cv;

{
  $client->checkout->(frame(code => sub {
    die "should never get here";
  }, catch => sub {
    my $err = $@;

    like($err, qr/setup exception: SETUP EXCEPTION 1/);

    $cv->send;
  }));
}


$cv->recv;


$cv = AE::cv;

{
  $client->checkout->(frame(code => sub {
    die "should never get here";
  }, catch => sub {
    my $err = $@;

    like($err, qr/setup exception: SETUP EXCEPTION 1/);

    $cv->send;
  }));
}

t/timeout-connect.t  view on Meta::CPAN

my $client = AnyEvent::Task::Client->new(
               connect => ['unix/', '/tmp/anyevent-task-test-non-existent.socket'],
             );


my $cv = AE::cv;

{
  my $checkout = $client->checkout( timeout => 0.2, );

  $checkout->(frame(code => sub {
    ok(0, "checkout was serviced?");
  }, catch => sub {
    print "## error: $@\n";
    ok(1, "timeout hit");
    $cv->send;
  }));
}

$cv->recv;

t/timeout-hung-worker.t  view on Meta::CPAN

my $client = AnyEvent::Task::Client->new(
               connect => ['unix/', '/tmp/anyevent-task-test.socket'],
             );


my $cv = AE::cv;

{
  my $checkout = $client->checkout( timeout => 2, );

  $checkout->(frame(code => sub {
    die "checkout was serviced?";
  }, catch => sub {
    my $err = $@;
    diag("Hung worker error: $err");
    ok(1, "error hit");
    ok($err !~ /timed out after/, "no timed out err");
    ok($err =~ /worker connection suddenly/, "hung worker err");
    $cv->send;
  }));
}

t/timeout-in-progress.t  view on Meta::CPAN

my $client = AnyEvent::Task::Client->new(
               connect => ['unix/', '/tmp/anyevent-task-test.socket'],
             );


my $cv = AE::cv;

{
  my $checkout = $client->checkout( timeout => 0.2, );

  $checkout->(frame(code => sub {
    die "checkout was serviced?";
  }, catch => sub {
    my $err = $@;
    print "## error: $err\n";
    ok(1, "timeout hit");
    ok($err =~ /timed out after/, 'correct err msg');
    $cv->send;
  }));
}

t/timeout-log-defer.t  view on Meta::CPAN

my $error_thrown = 0;

my $cv = AE::cv;

{
  my $ld = Log::Defer->new(sub {
    ok($error_thrown, 'log defer obj destroyed after error handler ran');
    $cv->send;
  });

  frame_try {
    $client->checkout( timeout => 0.2, log_defer_object => $ld )->(sub {
      $ld->warn("keep alive 1");
      die "checkout was serviced?";
    });
  } frame_catch {
    $ld->warn("keep alive 2");
    my $err = $@;
    ok(1, "timeout hit");
    ok($err =~ /timed out after/, 'correct err msg');
    $error_thrown = 1;
  };
}

my $timer = AE::timer 1, 0, sub {
  fail("log defer object destroyed");

t/worker-_exits.t  view on Meta::CPAN

my $client = AnyEvent::Task::Client->new(
               connect => ['unix/', '/tmp/anyevent-task-test.socket'],
             );


my $cv = AE::cv;

{
  my $checkout = $client->checkout( timeout => 1, );

  $checkout->(frame(code => sub {
    die "checkout was serviced?";
  }, catch => sub {
    my $err = $@;
    ok(1, "error hit");
    ok($err !~ /timed out after/, "no timed out err");
    like($err, qr/worker connection suddenly died/, "sudden death err");

    $checkout->(frame(code => sub {
      die "shouldn't get here";
    }, catch => sub {
      my $err = $@;

      like($err, qr/worker connection suddenly died/, "got same fatal error after calling checkout again");

      $cv->send;
    }));
  }));
}


