Coro-MP


MP.pm  view on Meta::CPAN


   my @reply = syncal 30, $port, 1, 2, 3;

=item $local_port = port_async { ... }

Creates a new local port, and returns its ID. A new thread is created and
attached to the port (see C<rcv_async>, below, for details).

=cut

sub rcv_async($$);

sub port_async(;&) {
   my $id = "$UNIQ." . $ID++;
   my $port = "$NODE#$id";

   @_
      ? rcv_async $port, shift
      : AnyEvent::MP::rcv $port, undef;

   $port
}

MP.pm  view on Meta::CPAN

The special variable C<$SELF> will be set to C<$port> during thread
execution.

When C<$threadcb> returns or the thread is canceled, the return/cancel
values become the C<kil> reason.

It is not allowed to call C<rcv_async> more than once on a given port.

=cut

sub rcv_async($$) {
   my ($port, $threadcb) = @_;

   my (@queue, $coro);

   AnyEvent::MP::rcv $port, sub {
      push @queue, \@_; # TODO, take copy?
      $coro->ready; # TODO, maybe too many unwanted wake-ups?
   };

   $coro = _new_coro $port, $threadcb;

MP.pm  view on Meta::CPAN

   my @p2 = get "p2";

Example: assume a message with tag C<now> is already in the queue and fetch
it. If no such message is queued, do not wait, but die.

   my @msg = get "now", 0
      or die "expected now message to be there, but it wasn't";
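
The tag matching performed by C<get> can be illustrated without any ports.
The following is only a minimal sketch in plain Perl, not the module's
implementation: C<fetch_tagged> and the hand-built C<@queue> are hypothetical
stand-ins, and the sketch assumes that each queued message is an array
reference whose first element is the tag. It mimics a zero timeout by
returning an empty list when nothing matches.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the per-thread message queue: each entry is
# an array reference whose first element is the message tag.
my @queue = (
   [ p1  => "first" ],
   [ now => 1, 2 ],
   [ p1  => "second" ],
);

# Remove and return the first queued message whose tag matches, or an
# empty list if no such message is queued (no waiting takes place).
sub fetch_tagged {
   my ($queue, $tag) = @_;

   for my $i (0 .. $#$queue) {
      next unless $queue->[$i][0] eq $tag;

      my ($msg) = splice @$queue, $i, 1;
      return @$msg;
   }

   return;
}

my @msg = fetch_tagged \@queue, "now"
   or die "expected now message to be there, but it wasn't";
```

In this sketch the returned list includes the tag itself, and the other
queued messages keep their relative order.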

=cut

sub get($;$) {
   my ($tag, $timeout) = @_;

   my $queue = $Coro::current->{_coro_mp_queue}
      or Carp::croak "Coro::MP::get called from thread not attached to any port";

   my $i;

   while () {
      $queue->[$_][0] eq $tag
         and return @{ splice @$queue, $_, 1 }

MP.pm  view on Meta::CPAN

            die "unexpected message $_[0] received";
         } 30
            or last;
      }
   };
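
Condition-based matching can be sketched in the same port-free way. This is
an illustration under stated assumptions rather than the code used by
C<get_cond>: C<fetch_if> and C<_accept_all> are hypothetical names, the
condition is assumed to see the message contents in C<@_>, and a missing
condition is assumed to accept every message.

```perl
use strict;
use warnings;

# Default condition: accept any message.
sub _accept_all { 1 }

# Hypothetical helper: remove and return the first message accepted by
# $cond, which receives the message contents in @_. Returns an empty
# list when no queued message is accepted.
sub fetch_if {
   my ($queue, $cond) = @_;

   $cond ||= \&_accept_all;

   for my $i (0 .. $#$queue) {
      next unless $cond->(@{ $queue->[$i] });

      my ($msg) = splice @$queue, $i, 1;
      return @$msg;
   }

   return;
}

my @queue = ([ print => "hello" ], [ reverse => "Rotator" ]);

# Skips the "print" message and takes the "reverse" one.
my @hit = fetch_if \@queue, sub { $_[0] eq "reverse" };

# No condition given: takes the oldest remaining message.
my @first = fetch_if \@queue;
```

Messages that the condition rejects stay queued, so a later call with a
different condition can still pick them up.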

=cut

sub _true { 1 }

sub get_cond(;&$) {
   my ($cond, $timeout) = @_;

   my $queue = $Coro::current->{_coro_mp_queue}
      or Carp::croak "Coro::MP::get_cond called from thread not attached to any port";

   my ($i, $ok);

   $cond ||= \&_true;

   while () {

MP.pm  view on Meta::CPAN


         my $fh = aio_open ...
            or die "open: $!";

         aio_close $fh;
      };
   };

=cut

sub peval_async($$) {
   _new_coro $_[0], $_[1]
}

=item @reply = syncal $timeout, $port, @msg

The synchronous form of C<cal>, a simple form of RPC - it sends a message
to the given C<$port> with the given contents (C<@msg>), but adds a reply
port to the message.

The reply port is created temporarily just for the purpose of receiving

MP.pm  view on Meta::CPAN


If C<$timeout> is undef, then the local port will monitor the remote
port instead, so it eventually gets cleaned up.

Example: call the string reverse example from C<get_cond>.

   my $reversed = syncal 1, $reverse, reverse => "Rotator";

=cut

sub syncal($@) {
   my ($timeout, @msg) = @_;

   cal @msg, Coro::rouse_cb, $timeout;
   Coro::rouse_wait
}

=back

=head1 SEE ALSO


