Coro-MP
use AnyEvent::MP::Kernel;
use AnyEvent::MP;
use Coro;
use Coro::AnyEvent ();
use AE ();
use base "Exporter";
our $VERSION = "0.1";
our @EXPORT = (@AnyEvent::MP::EXPORT, qw(
port_async rcv_async get get_cond syncal peval_async
));
our @EXPORT_OK = (@AnyEvent::MP::EXPORT_OK);
sub _new_coro {
   my ($port, $threadcb) = @_;
   my $coro = async_pool {
      eval { $threadcb->() };
      kil $SELF, die => $@ if $@;
   };
   $coro->swap_sv (\$SELF, \$port);
   # killing the port cancels the coro
   # delaying kil messages inside aemp guarantees
   # (hopefully) that $coro != $Coro::current.
   mon $port, sub { $coro->cancel (@_) };
   # cancelling the coro kills the port
   $coro->on_destroy (sub { kil $port, @_ });
   $coro
}
=item NODE, $NODE, node_of, configure
=item $SELF, *SELF, SELF, %SELF, @SELF...
=item snd, mon, kil, psub
These variables and functions work exactly as in AnyEvent::MP. In fact,
they are the very same functions, and are used in much the same way.
=item rcv
This function works exactly as C<AnyEvent::MP::rcv>, and is in fact
compatible with Coro::MP ports. However, the canonical way to receive
messages with Coro::MP is to use C<get> or C<get_cond>.
=item port
This function is exactly the same as C<AnyEvent::MP::port> and creates new
ports. You can attach a thread to them by calling C<rcv_async> or you can
do a create and attach in one operation using C<port_async>.
=item peval
This function works exactly as C<AnyEvent::MP::peval> - you could use it to
run callbacks within a port context (good for monitoring), but you cannot
C<get> messages unless the callback executes within the thread attached to
the port.
Since creating a thread with port context requires somewhat annoying
syntax, there is a C<peval_async> function that handles that for you - note
that within such a thread, you still cannot C<get> messages.
=item spawn
This function is identical to C<AnyEvent::MP::spawn>. This means that
it doesn't spawn a new thread as one would expect, but simply calls an
init function. The init function, however, can attach a new thread easily:
   sub initfun {
      my (@args) = @_;
      rcv_async $SELF, sub {
         # thread-code
      };
   }
=item cal
This function is identical to C<AnyEvent::MP::cal>. The easiest way to
make a synchronous call is to use Coro's rouse functionality:
   # send 1, 2, 3 to $port and wait up to 30s for reply
   cal $port, 1, 2, 3, rouse_cb, 30;
   my @reply = rouse_wait;
You can also use C<syncal> if you want, and are ok with learning yet
another function with a weird name:
   my @reply = syncal 30, $port, 1, 2, 3;
=item $local_port = port_async { ... }
Creates a new local port, and returns its ID. A new thread is created and
attached to the port (see C<rcv_async>, below, for details).
=cut
sub rcv_async($$);
sub port_async(;&) {
   my $id = "$UNIQ." . $ID++;
   my $port = "$NODE#$id";
   @_
      ? rcv_async $port, shift
      : AnyEvent::MP::rcv $port, undef;
   $port
}
=item rcv_async $port, $threadcb
This function creates and attaches a thread on a port. The thread is set
to execute C<$threadcb> and is put into the ready queue. The thread will
receive all messages not filtered away by tagged receive callbacks (as set
by C<AnyEvent::MP::rcv>) - it simply replaces the default callback of an
AnyEvent::MP port.
The special variable C<$SELF> will be set to C<$port> during thread
execution.
When C<$threadcb> returns or the thread is canceled, the return/cancel
values become the C<kil> reason.
It is not allowed to call C<rcv_async> more than once on a given port.
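The following is a minimal sketch of a complete program that attaches a
thread to a new port and feeds it messages. It assumes that Coro and
Coro::MP are installed and that purely local messaging works without
calling C<configure>. The C<num> tag, the C<@seen> array and the C<$done>
callback are made up for illustration.

```perl
use strict;
use warnings;
use Coro;        # rouse_cb, rouse_wait
use Coro::MP;    # port_async, get, snd, $SELF

my @seen;                 # payloads collected by the thread
my $done = rouse_cb;      # lets the main program wait for the thread

my $port = port_async {
   # runs in its own thread; $SELF is the ID of the new port
   while (my ($n) = get "num", 5) {   # wait at most 5s per message
      push @seen, $n;
      last if @seen == 3;
   }
   $done->();             # wake up the main program
   # returning from here kills the port
};

# assumption: these are delivered locally, no configure needed
snd $port, num => $_ for 1 .. 3;

rouse_wait $done;         # block until the thread is finished
print "received: @seen\n";
```

The main program has to block somewhere (here in C<rouse_wait>), as the
thread is only put into the ready queue and does not run before the
creating thread gives up the CPU.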
=cut
sub rcv_async($$) {
   my ($port, $threadcb) = @_;
   my (@queue, $coro);
   AnyEvent::MP::rcv $port, sub {
      push @queue, \@_; # TODO, take copy?
      $coro->ready; # TODO, maybe too many unwanted wake-ups?
   };
   $coro = _new_coro $port, $threadcb;
   $coro->{_coro_mp_queue} = \@queue;
}
=item @msg = get $tag
=item @msg = get $tag, $timeout
Find, dequeue and return the next message with the specified C<$tag>. If
no matching message is currently queued, wait up to C<$timeout> seconds
(or forever if no C<$timeout> has been specified or it is C<undef>) for
one to arrive.
Returns the message with the initial tag removed. In case of a timeout,
the empty list is returned. The function I<must> be called in list context.
Note that empty messages (messages that consist of nothing but the tag)
cannot be distinguished from a timeout when using C<get>.
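The matching rule can be modelled in plain Perl, without any event loop.
This is only a sketch of the behaviour described above, not the code
Coro::MP uses: C<dequeue_tagged> is a hypothetical helper, the queue is
assumed to be an array of array references whose first element is the tag,
and waiting and timeouts are left out entirely.

```perl
use strict;
use warnings;

# Hypothetical helper: find and remove the first queued message whose
# first element equals $tag, and return its remaining elements.
sub dequeue_tagged {
   my ($queue, $tag) = @_;

   for my $i (0 .. $#$queue) {
      my $msg = $queue->[$i];
      next unless @$msg && $msg->[0] eq $tag;

      splice @$queue, $i, 1;       # dequeue the matching message
      my (undef, @rest) = @$msg;   # drop the initial tag
      return @rest;
   }

   return;   # nothing matched: empty list
}

my @queue = (["p2", "second"], ["p1", "first"], ["ping"]);

my @p1   = dequeue_tagged \@queue, "p1";    # found behind the p2 message
my @p2   = dequeue_tagged \@queue, "p2";
my @ping = dequeue_tagged \@queue, "ping";  # tag-only message: empty list
my @none = dequeue_tagged \@queue, "pong";  # no such message: also empty
```

In this model, C<@ping> and C<@none> are both empty, which is the same
ambiguity as between an empty message and a timeout. Messages with other
tags stay queued in their original order until they are asked for.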
Example: send a "log" message to C<$SELF> and then get and print it.
   snd $SELF, log => "text";
   my ($text) = get "log";
   print "log message: $text\n";
Example: receive C<p1> and C<p2> messages, regardless of the order they
arrive in on the port.
   my @p1 = get "p1";
   my @p2 = get "p2";
Example: assume a message with tag C<now> is already in the queue and fetch
it. If no message was there, do not wait, but die.
   my @msg = get "now", 0
      or die "expected now message to be there, but it wasn't";