Coro-MP

MP.pm

use AnyEvent::MP::Kernel;
use AnyEvent::MP;
use Coro;
use Coro::AnyEvent ();

use AE ();

use base "Exporter";

our $VERSION = "0.1";

our @EXPORT = (@AnyEvent::MP::EXPORT, qw(
   port_async rcv_async get get_cond syncal peval_async
));
our @EXPORT_OK = (@AnyEvent::MP::EXPORT_OK);

sub _new_coro {
   my ($port, $threadcb) = @_;

   my $coro = async_pool {
      eval { $threadcb->() };
      kil $SELF, die => $@ if $@;
   };
   $coro->swap_sv (\$SELF, \$port);

   # killing the port cancels the coro
   # delaying kil messages inside aemp guarantees
   # (hopefully) that $coro != $Coro::current.
   mon $port, sub { $coro->cancel (@_) };

   # cancelling the coro kills the port
   $coro->on_destroy (sub { kil $port, @_ });

   $coro
}

=item NODE, $NODE, node_of, configure

=item $SELF, *SELF, SELF, %SELF, @SELF...

=item snd, mon, kil, psub

These variables and functions work exactly as in AnyEvent::MP. In fact,
they are the very same functions, and are used in much the same way.

=item rcv

This function works exactly as C<AnyEvent::MP::rcv>, and is in fact
compatible with Coro::MP ports. However, the canonical way to receive
messages with Coro::MP is to use C<get> or C<get_cond>.

=item port

This function is exactly the same as C<AnyEvent::MP::port> and creates new
ports. You can attach a thread to them by calling C<rcv_async> or you can
do a create and attach in one operation using C<port_async>.

=item peval

This function works exactly as C<AnyEvent::MP::peval> - you can use it to
run callbacks within a port context (good for monitoring), but you cannot
C<get> messages unless the callback executes within the thread attached to
the port.

Since creating a thread with port context requires somewhat annoying
syntax, there is a C<peval_async> function that handles that for you. Note
that within such a thread, you still cannot C<get> messages.

=item spawn

This function is identical to C<AnyEvent::MP::spawn>. This means that
it doesn't spawn a new thread as one would expect, but simply calls an
init function. The init function, however, can attach a new thread easily:

   sub initfun {
      my (@args) = @_;

      rcv_async $SELF, sub {
         # thread-code
      };
   }

=item cal

This function is identical to C<AnyEvent::MP::cal>. The easiest way to
make a synchronous call is to use Coro's rouse functionality:

   # send 1, 2, 3 to $port and wait up to 30s for reply
   cal $port, 1, 2, 3, rouse_cb, 30;
   my @reply = rouse_wait;

You can also use C<syncal> if you want, and are ok with learning yet
another function with a weird name:

   my @reply = syncal 30, $port, 1, 2, 3;

=item $local_port = port_async { ... }

Creates a new local port, and returns its ID. A new thread is created and
attached to the port (see C<rcv_async>, below, for details).
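
Example: a minimal sketch, assuming the program otherwise keeps its event
loop running, of a port whose attached thread prints every C<log> message
it receives. The C<log> tag is only an illustration.

   my $port = port_async {
      # $SELF is the new port while this thread runs
      while (1) {
         # blocks this thread (not the program) until a "log" message arrives
         my ($text) = get "log";
         print "log message: $text\n";
      }
   };

   snd $port, log => "hello";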

=cut

sub rcv_async($$);

sub port_async(;&) {
   my $id = "$UNIQ." . $ID++;
   my $port = "$NODE#$id";

   @_
      ? rcv_async $port, shift
      : AnyEvent::MP::rcv $port, undef;

   $port
}

=item rcv_async $port, $threadcb

This function creates and attaches a thread on a port. The thread is set
to execute C<$threadcb> and is put into the ready queue. The thread will
receive all messages not filtered away by tagged receive callbacks (as set
by C<AnyEvent::MP::rcv>) - it simply replaces the default callback of an
AnyEvent::MP port.

The special variable C<$SELF> will be set to C<$port> during thread
execution.

When C<$threadcb> returns or the thread is canceled, the return/cancel
values become the C<kil> reason.

It is not allowed to call C<rcv_async> more than once on a given port.
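
Example: a sketch that creates a port with C<port> first and attaches the
thread afterwards. The C<add> tag and its two-number payload are made up
for illustration.

   my $port = port;

   rcv_async $port, sub {
      # wait for a single "add" message, then finish
      my ($x, $y) = get "add";
      print "sum: ", $x + $y, "\n";

      # return values become the kil reason, so return nothing
      ()
   };

   snd $port, add => 1, 2;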

=cut

sub rcv_async($$) {
   my ($port, $threadcb) = @_;

   my (@queue, $coro);

   AnyEvent::MP::rcv $port, sub {
      push @queue, \@_; # TODO, take copy?
      $coro->ready; # TODO, maybe too many unwanted wake-ups?
   };

   $coro = _new_coro $port, $threadcb;
   $coro->{_coro_mp_queue} = \@queue;
}

=item @msg = get $tag

=item @msg = get $tag, $timeout

Find, dequeue and return the next message with the specified C<$tag>. If
no matching message is currently queued, wait up to C<$timeout> seconds
(or forever if no C<$timeout> has been specified or it is C<undef>) for
one to arrive.

Returns the message with the initial tag removed. In case of a timeout,
returns the empty list. The function I<must> be called in list context.

Note that empty messages cannot be distinguished from a timeout when using
C<get>.

Example: send a "log" message to C<$SELF> and then get and print it.

   snd $SELF, log => "text";
   my ($text) = get "log";
   print "log message: $text\n";

Example: receive C<p1> and C<p2> messages, regardless of the order they
arrive in on the port.

   my @p1 = get "p1";
   my @p2 = get "p2";

Example: assume a message with tag C<now> is already in the queue and fetch
it. If no message was there, do not wait, but die.

   my @msg = get "now", 0
      or die "expected now message to be there, but it wasn't";


