Coro-MP

 view release on metacpan or  search on metacpan

MP.pm  view on Meta::CPAN


use base "Exporter";

our $VERSION = "0.1";

our @EXPORT = (@AnyEvent::MP::EXPORT, qw(
   port_async rcv_async get get_cond syncal peval_async
));
our @EXPORT_OK = (@AnyEvent::MP::EXPORT_OK);

sub _new_coro {
   my ($port, $threadcb) = @_;

   my $coro = async_pool {
      eval { $threadcb->() };
      kil $SELF, die => $@ if $@;
   };
   $coro->swap_sv (\$SELF, \$port);

   # killing the port cancels the coro
   # delaying kil messages inside aemp guarantees
   # (hopefully) that $coro != $Coro::current.
   mon $port, sub { $coro->cancel (@_) };

   # cancelling the coro kills the port
   $coro->on_destroy (sub { kil $port, @_ });

   $coro
}

=item NODE, $NODE, node_of, configure

=item $SELF, *SELF, SELF, %SELF, @SELF...

=item snd, mon, kil, psub

These variables and functions work exactly as in AnyEvent::MP, in fact,
they are exactly the same functions, and are used in much the same way.

MP.pm  view on Meta::CPAN

When C<$threadcb> returns or the thread is canceled, the return/cancel
values become the C<kil> reason.

It is not allowed to call C<rcv_async> more than once on a given port.

=cut

sub rcv_async($$) {
   my ($port, $threadcb) = @_;

   my (@queue, $coro);

   AnyEvent::MP::rcv $port, sub {
      push @queue, \@_; # TODO, take copy?
      $coro->ready; # TODO, maybe too many unwanted wake-ups?
   };

   $coro = _new_coro $port, $threadcb;
   $coro->{_coro_mp_queue} = \@queue;
}

=item @msg = get $tag

=item @msg = get $tag, $timeout

Find, dequeue and return the next message with the specified C<$tag>. If
no matching message is currently queued, wait up to C<$timeout> seconds
(or forever if no C<$timeout> has been specified or it is C<undef>) for
one to arrive.

MP.pm  view on Meta::CPAN

it. If no message was there, do not wait, but die.

   my @msg = get "now", 0
      or die "expected now emssage to be there, but it wasn't";

=cut

sub get($;$) {
   my ($tag, $timeout) = @_;

   my $queue = $Coro::current->{_coro_mp_queue}
      or Carp::croak "Coro::MP::get called from thread not attached to any port";

   my $i;

   while () {
      $queue->[$_][0] eq $tag
         and return @{ splice @$queue, $_, 1 }
         for $i..$#$queue;

      $i = @$queue;

MP.pm  view on Meta::CPAN

      }
   };

=cut

sub _true { 1 }

sub get_cond(;&$) {
   my ($cond, $timeout) = @_;

   my $queue = $Coro::current->{_coro_mp_queue}
      or Carp::croak "Coro::MP::get_cond called from thread not attached to any port";

   my ($i, $ok);

   $cond ||= \&_true;

   while () {
      do
         {
            local *_ = $queue->[$_];

MP.pm  view on Meta::CPAN

         my $fh = aio_open ...
            or die "open: $!";

         aio_close $fh;
      };
   };

=cut

sub peval_async($$) {
   _new_coro $_[0], $_[1]
}

=item @reply = syncal $port, @msg, $callback[, $timeout]

The synchronous form of C<cal>, a simple form of RPC - it sends a message
to the given C<$port> with the given contents (C<@msg>), but adds a reply
port to the message.

The reply port is created temporarily just for the purpose of receiving
the reply, and will be C<kil>ed when no longer needed.



( run in 0.232 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )