Coro-MP
my @reply = syncal 30, $port, 1, 2, 3;
=item $local_port = port_async { ... }
Creates a new local port, and returns its ID. A new thread is created and
attached to the port (see C<rcv_async>, below, for details).
=cut
sub rcv_async($$);
sub port_async(;&) {
my $id = "$UNIQ." . $ID++;
my $port = "$NODE#$id";
@_
? rcv_async $port, shift
: AnyEvent::MP::rcv $port, undef;
$port
}
The special variable C<$SELF> will be set to C<$port> during thread
execution.
When C<$threadcb> returns or the thread is canceled, the return/cancel
values become the C<kil> reason.
It is not allowed to call C<rcv_async> more than once on a given port.
=cut
sub rcv_async($$) {
my ($port, $threadcb) = @_;
my (@queue, $coro);
AnyEvent::MP::rcv $port, sub {
push @queue, \@_; # TODO, take copy?
$coro->ready; # TODO, maybe too many unwanted wake-ups?
};
$coro = _new_coro $port, $threadcb;
my @p2 = get "p2";
Example: assume a message with tag C<now> is already in the queue and fetch
it. If no message was there, do not wait, but die.
my @msg = get "now", 0
or die "expected now message to be there, but it wasn't";
=cut
sub get($;$) {
my ($tag, $timeout) = @_;
my $queue = $Coro::current->{_coro_mp_queue}
or Carp::croak "Coro::MP::get called from thread not attached to any port";
my $i;
while () {
$queue->[$_][0] eq $tag
and return @{ splice @$queue, $_, 1 }
die "unexpected message $_[0] received";
} 30
or last;
}
};
=cut
sub _true { 1 }
sub get_cond(;&$) {
my ($cond, $timeout) = @_;
my $queue = $Coro::current->{_coro_mp_queue}
or Carp::croak "Coro::MP::get_cond called from thread not attached to any port";
my ($i, $ok);
$cond ||= \&_true;
while () {
my $fh = aio_open ...
or die "open: $!";
aio_close $fh;
};
};
=cut
sub peval_async($$) {
_new_coro $_[0], $_[1]
}
=item @reply = syncal $timeout, $port, @msg
The synchronous form of C<cal>, a simple form of RPC - it sends a message
to the given C<$port> with the given contents (C<@msg>), but adds a reply
port to the message.
The reply port is created temporarily just for the purpose of receiving
If the C<$timeout> is undef, then the local port will monitor the remote
port instead, so it eventually gets cleaned up.
Example: call the string reverse example from C<get_cond>.
my $reversed = syncal 1, $reverse, reverse => "Rotator";
=cut
sub syncal($@) {
my ($timeout, @msg) = @_;
cal @msg, Coro::rouse_cb, $timeout;
Coro::rouse_wait
}
=back
=head1 SEE ALSO