Coro-MP
use base "Exporter";
our $VERSION = "0.1";
our @EXPORT = (@AnyEvent::MP::EXPORT, qw(
port_async rcv_async get get_cond syncal peval_async
));
our @EXPORT_OK = (@AnyEvent::MP::EXPORT_OK);
sub _new_coro {
   my ($port, $threadcb) = @_;

   my $coro = async_pool {
      eval { $threadcb->() };
      kil $SELF, die => $@ if $@;
   };
   $coro->swap_sv (\$SELF, \$port);

   # killing the port cancels the coro
   # delaying kil messages inside aemp guarantees
   # (hopefully) that $coro != $Coro::current.
   mon $port, sub { $coro->cancel (@_) };

   # cancelling the coro kills the port
   $coro->on_destroy (sub { kil $port, @_ });

   $coro
}
=item NODE, $NODE, node_of, configure
=item $SELF, *SELF, SELF, %SELF, @SELF...
=item snd, mon, kil, psub
These variables and functions work exactly as in AnyEvent::MP. In fact,
they are the very same functions, and they are used in much the same way.
When C<$threadcb> returns or the thread is canceled, the return/cancel
values become the C<kil> reason.
It is not allowed to call C<rcv_async> more than once on a given port.
=cut
sub rcv_async($$) {
   my ($port, $threadcb) = @_;

   my (@queue, $coro);

   AnyEvent::MP::rcv $port, sub {
      push @queue, \@_; # TODO, take copy?
      $coro->ready; # TODO, maybe too many unwanted wake-ups?
   };

   $coro = _new_coro $port, $threadcb;
   $coro->{_coro_mp_queue} = \@queue;
}
=item @msg = get $tag
=item @msg = get $tag, $timeout
Find, dequeue and return the next message with the specified C<$tag>. If
no matching message is currently queued, wait up to C<$timeout> seconds
(or forever if no C<$timeout> has been specified or it is C<undef>) for
one to arrive.
Example: fetch a message with tag C<now> that is expected to be in the
queue already. If no such message is there, do not wait, but die.

   my @msg = get "now", 0
      or die "expected now message to be there, but it wasn't";
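
The lookup that C<get> performs on its queue can be illustrated without any
event loop. The following is only a minimal sketch in plain Perl:
C<dequeue_tagged> is a hypothetical helper, not part of Coro::MP, and it
models only the scan-and-splice step visible in the code of C<get>, not the
waiting or the timeout handling.

```perl
use strict;
use warnings;

# Hypothetical helper (NOT part of Coro::MP): remove and return the first
# queued message whose first element equals $tag. Returns the empty list
# when no queued message matches.
sub dequeue_tagged {
   my ($queue, $tag) = @_;

   for my $i (0 .. $#$queue) {
      next unless $queue->[$i][0] eq $tag;

      # splice removes exactly one array reference from the queue
      my ($msg) = splice @$queue, $i, 1;
      return @$msg;
   }

   ()
}

# Each queued message is an array reference whose first element is the tag.
my @queue = (
   [log => "first"],
   [now => 1, 2],
   [log => "second"],
);

my @msg = dequeue_tagged \@queue, "now";

print "got: @msg\n";                  # prints "got: now 1 2"
print "left: ", scalar @queue, "\n";  # prints "left: 2"
```

Messages with other tags stay queued in their original order, which is why
messages with different tags can be fetched regardless of the order in which
they arrived.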
=cut
sub get($;$) {
   my ($tag, $timeout) = @_;

   my $queue = $Coro::current->{_coro_mp_queue}
      or Carp::croak "Coro::MP::get called from thread not attached to any port";

   my $i;

   while () {
      $queue->[$_][0] eq $tag
         and return @{ splice @$queue, $_, 1 }
            for $i..$#$queue;

      $i = @$queue;
   }
};
=cut
sub _true { 1 }
sub get_cond(;&$) {
my ($cond, $timeout) = @_;
my $queue = $Coro::current->{_coro_mp_queue}
or Carp::croak "Coro::MP::get_cond called from thread not attached to any port";
my ($i, $ok);
$cond ||= \&_true;
while () {
do
{
local *_ = $queue->[$_];
my $fh = aio_open ...
or die "open: $!";
aio_close $fh;
};
};
=cut
sub peval_async($$) {
_new_coro $_[0], $_[1]
}
=item @reply = syncal $port, @msg, $callback[, $timeout]
The synchronous form of C<cal>, a simple form of RPC - it sends a message
to the given C<$port> with the given contents (C<@msg>), but adds a reply
port to the message.
The reply port is created temporarily just for the purpose of receiving
the reply, and will be C<kil>ed when no longer needed.