AnyEvent-MP

 view release on metacpan or  search on metacpan

MP.pm  view on Meta::CPAN

commandline clients.

   configure nodeid => "myscript/%n/%u";

Example: configure a node using a profile called seed, which is suitable
for a seed node as it binds on all local addresses on a fixed port (4040,
customary for aemp).

   # use the aemp commandline utility
   # aemp profile seed binds '*:4040'

   # then use it
   configure profile => "seed";

   # or simply use aemp from the shell again:
   # aemp run profile seed

   # or provide a nicer-to-remember nodeid
   # aemp run profile seed nodeid "$(hostname)"

=item $SELF

Contains the current port id while executing C<rcv> callbacks or C<psub>
blocks.

=item *SELF, SELF, %SELF, @SELF...

Due to some quirks in how perl exports variables, it is impossible to
just export C<$SELF>, all the symbols named C<SELF> are exported by this
module, but only C<$SELF> is currently used.

=item snd $port, type => @data

=item snd $port, @msg

Send the given message to the given port, which can identify either a
local or a remote port, and must be a port ID.

While the message can be almost anything, it is highly recommended to
use a string as first element (a port ID, or some word that indicates a
request type etc.) and to consist if only simple perl values (scalars,
arrays, hashes) - if you think you need to pass an object, think again.

The message data logically becomes read-only after a call to this
function: modifying any argument (or values referenced by them) is
forbidden, as there can be considerable time between the call to C<snd>
and the time the message is actually being serialised - in fact, it might
never be copied as within the same process it is simply handed to the
receiving port.

The type of data you can transfer depends on the transport protocol: when
JSON is used, then only strings, numbers and arrays and hashes consisting
of those are allowed (no objects). When Storable is used, then anything
that Storable can serialise and deserialise is allowed, and for the local
node, anything can be passed. Best rely only on the common denominator of
these.

=item $local_port = port

Create a new local port object and returns its port ID. Initially it has
no callbacks set and will throw an error when it receives messages.

=item $local_port = port { my @msg = @_ }

Creates a new local port, and returns its ID. Semantically the same as
creating a port and calling C<rcv $port, $callback> on it.

The block will be called for every message received on the port, with the
global variable C<$SELF> set to the port ID. Runtime errors will cause the
port to be C<kil>ed. The message will be passed as-is, no extra argument
(i.e. no port ID) will be passed to the callback.

If you want to stop/destroy the port, simply C<kil> it:

   my $port = port {
      my @msg = @_;
      ...
      kil $SELF;
   };

=cut

sub rcv($@);

my $KILME = sub {
   (my $tag = substr $_[0], 0, 30) =~ s/([^\x20-\x7e])/./g;
   kil $SELF, unhandled_message => "no callback found for message '$tag'";
};

sub port(;&) {
   my $id = $UNIQ . ++$ID;
   my $port = "$NODE#$id";

   rcv $port, shift || $KILME;

   $port
}

=item rcv $local_port, $callback->(@msg)

Replaces the default callback on the specified port. There is no way to
remove the default callback: use C<sub { }> to disable it, or better
C<kil> the port when it is no longer needed.

The global C<$SELF> (exported by this module) contains C<$port> while
executing the callback. Runtime errors during callback execution will
result in the port being C<kil>ed.

The default callback receives all messages not matched by a more specific
C<tag> match.

=item rcv $local_port, tag => $callback->(@msg_without_tag), ...

Register (or replace) callbacks to be called on messages starting with the
given tag on the given port (and return the port), or unregister it (when
C<$callback> is C<$undef> or missing). There can only be one callback
registered for each tag.

The original message will be passed to the callback, after the first
element (the tag) has been removed. The callback will use the same
environment as the default callback (see above).

MP.pm  view on Meta::CPAN

=cut

sub rcv($@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   $nodeid eq $NODE
      or Carp::croak "$port: rcv can only be called on local ports, caught";

   while (@_) {
      if (ref $_[0]) {
         if (my $self = $PORT_DATA{$portid}) {
            "AnyEvent::MP::Port" eq ref $self
               or Carp::croak "$port: rcv can only be called on message matching ports, caught";

            $self->[0] = shift;
         } else {
            my $cb = shift;
            $PORT{$portid} = sub {
               local $SELF = $port;
               eval { &$cb }; _self_die if $@;
            };
         }
      } elsif (defined $_[0]) {
         my $self = $PORT_DATA{$portid} ||= do {
            my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";

            $PORT{$portid} = sub {
               local $SELF = $port;

               if (my $cb = $self->[1]{$_[0]}) {
                  shift;
                  eval { &$cb }; _self_die if $@;
               } else {
                  &{ $self->[0] };
               }
            };

            $self
         };

         "AnyEvent::MP::Port" eq ref $self
            or Carp::croak "$port: rcv can only be called on message matching ports, caught";

         my ($tag, $cb) = splice @_, 0, 2;

         if (defined $cb) {
            $self->[1]{$tag} = $cb;
         } else {
            delete $self->[1]{$tag};
         }
      }
   }

   $port
}

=item peval $port, $coderef[, @args]

Evaluates the given C<$codref> within the context of C<$port>, that is,
when the code throws an exception the C<$port> will be killed.

Any remaining args will be passed to the callback. Any return values will
be returned to the caller.

This is useful when you temporarily want to execute code in the context of
a port.

Example: create a port and run some initialisation code in it's context.

   my $port = port { ... };

   peval $port, sub {
      init
         or die "unable to init";
   };

=cut

sub peval($$) {
   local $SELF = shift;
   my $cb = shift;

   if (wantarray) {
      my @res = eval { &$cb };
      _self_die if $@;
      @res
   } else {
      my $res = eval { &$cb };
      _self_die if $@;
      $res
   }
}

=item $closure = psub { BLOCK }

Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.

The effect is basically as if it returned C<< sub { peval $SELF, sub {
BLOCK }, @_ } >>.

This is useful when you register callbacks from C<rcv> callbacks:

   rcv delayed_reply => sub {
      my ($delay, @reply) = @_;
      my $timer = AE::timer $delay, 0, psub {
         snd @reply, $SELF;
      };
   };

=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {



( run in 0.690 second using v1.01-cache-2.11-cpan-71847e10f99 )