AnyEvent-MP

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

TODO: maybe disbale warnings by default?
TODO: listener-scopes (10.0.0.1:4040@vpn) and connect-scopes ("vpn,public")
TODO: document env-variable usage
TODO: make node objects responsible for keepalive?

faq: can't se anything
faq: all is asynch
faq: how to interface to non-perl nodes?

TODO: check gproto, nproto, on connect
TODO: limiting reconnecting speed when unreachable? somehow use same interval timers as for seeding and keepalive?
TODO: multiple profiles? also some default profiles?
TODO: export keepalive?
TODO: $guard = con $cb->($up)
TODO: aemp readline support
TODO: gleeco re: AE::MP::DataConn -
TODO: version both in MP.pm and MP/Config.pm because of cpan indexer

2.02 Sun Jul 29 04:22:53 CEST 2018
	- hardcode version in MP.pm to help the CPAN indexer.

MP.pm  view on Meta::CPAN


   # monitoring
   mon $port, $cb->(@msg)      # callback is invoked on death
   mon $port, $localport       # kill localport on abnormal death
   mon $port, $localport, @msg # send message on death

   # temporarily execute code in port context
   peval $port, sub { die "kill the port!" };

   # execute callbacks in $SELF port context
   my $timer = AE::timer 1, 0, psub {
      die "kill the port, delayed";
   };

   # distributed database - modification
   db_set $family => $subkey [=> $value]  # add a subkey
   db_del $family => $subkey...           # delete one or more subkeys
   db_reg $family => $port [=> $value]    # register a port

   # distributed database - queries
   db_family $family => $cb->(\%familyhash)

MP.pm  view on Meta::CPAN

closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.

The effect is basically as if it returned C<< sub { peval $SELF, sub {
BLOCK }, @_ } >>.

This is useful when you register callbacks from C<rcv> callbacks:

   rcv delayed_reply => sub {
      my ($delay, @reply) = @_;
      my $timer = AE::timer $delay, 0, psub {
         snd @reply, $SELF;
      };
   };

=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF

MP.pm  view on Meta::CPAN

      and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) })
}

=item $guard = mon_guard $port, $ref, $ref...

Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.

Optionally returns a guard that will stop the monitoring.

This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed (note the use of C<psub>):

  $port->rcv (start => sub {
     my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
        undef $timer if 0.9 < rand;
     });
  });

=cut

sub mon_guard {
   my ($port, @refs) = @_;

   #TODO: mon-less form?

MP.pm  view on Meta::CPAN


This is simply a utility function that comes in handy at times - the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.

=cut

sub after($@) {
   my ($timeout, @action) = @_;

   my $t; $t = AE::timer $timeout, 0, sub {
      undef $t;
      ref $action[0]
         ? $action[0]()
         : snd @action;
   };
}

#=item $cb2 = timeout $seconds, $cb[, @args]

=item cal $port, @msg, $callback[, $timeout]

MP.pm  view on Meta::CPAN

   my $timeout = ref $_[-1] ? undef : pop;
   my $cb = pop;

   my $port = port {
      undef $timeout;
      kil $SELF;
      &$cb;
   };

   if (defined $timeout) {
      $timeout = AE::timer $timeout, 0, sub {
         undef $timeout;
         kil $port;
         $cb->();
      };
   } else {
      mon $_[0], sub {
         kil $port;
         $cb->();
      };
   }

MP/DataConn.pm  view on Meta::CPAN


our $ID = "a";
our %STATE;

# another node tells us to await a connection
sub _expect {
   my ($id, $port, $timeout, $initfunc, @initdata) = @_;

   $STATE{$id} = {
      id   => $id,
      to   => (AE::timer $timeout, 0, sub {
         $STATE{$id}{done}(undef);
      }),
      done => sub {
         my ($hdl, $error) = @_;

         %{delete $STATE{$id}} = ();

         if (defined $hdl) {
            (AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl);
         } else {

MP/DataConn.pm  view on Meta::CPAN


   my $state = $STATE{$id}
      or return;

   my $addr = $AnyEvent::MP::Global::addr{$node};

   @$addr
      or return $state->{done}(undef, "$node: no listeners found");

   # I love hardcoded constants  !
   $state->{next} = AE::timer 0, 2, sub {
      my $endpoint = shift @$addr
         or return delete $state->{next};

      my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
         or return;

      my $transport; $transport = AnyEvent::MP::Transport::mp_connect
         $host, $port,
         protocol => "aemp-dataconn",
         local_greeting => { dataconn_id => $id },

MP/DataConn.pm  view on Meta::CPAN

   my $port = $SELF
      or Carp::croak "AnyEvent::MP::DataConn::connect_to must be called in port context";

   $node = node_of $node;

   my $id = (++$ID) . "\@$NODE";

   # damn, why do my simple state hashes resemble objects so quickly
   my $state = $STATE{$id} = {
      id   => (++$ID) . "\@$NODE",
      to   => (AE::timer $timeout, 0, sub {
         $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         delete $AnyEvent::MP::Global::ON_SETUP{$id};
         %{delete $STATE{$id}} = ();

         if (defined $hdl) {
            $cb->($hdl);

MP/Kernel.pm  view on Meta::CPAN


our $DELAY_TIMER;
our @DELAY_QUEUE;

our $delay_run = sub {
   (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
};

sub delay($) {
   push @DELAY_QUEUE, shift;
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}

=item $AnyEvent::MP::Kernel::SRCNODE

During execution of a message callback, this variable contains the node ID
of the origin node.

The main use of this variable is for debugging output - there are probably
very few other cases where you need to know the source node ID.

MP/Kernel.pm  view on Meta::CPAN


   if (@seeds) {
      # start connection attempt for every seed we are not connected to yet
      seed_connect $_
         for grep !exists $SEED_CONNECT{$_}, @seeds;

      $SEED_RETRY = $SEED_RETRY * 2;
      $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
         if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;

   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}

sub seed_again {
   $SEED_RETRY = (1 + rand) * 0.6;
   $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
}

# sets new seed list, starts connecting
sub set_seeds(@) {
   %SEED_NODE     = ();
   %NODE_SEED     = ();
   %SEED_CONNECT  = ();

   @SEED_NODE{@_} = ();

MP/Kernel.pm  view on Meta::CPAN

                . (join " ", keys %KEEPALIVE_DOWN)
                . ".";

   (add_node $_)->connect
      for keys %KEEPALIVE_DOWN;

   $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
   $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
      if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}

sub keepalive_again {
   $KEEPALIVE_RETRY = (1 + rand) * 0.3;
   keepalive_all;
}

sub keepalive_add {
   return if $KEEPALIVE{$_[0]}++;

MP/Node.pm  view on Meta::CPAN


   return if $self->{transport};
   return if $self->{connect_w};

   # we unweaken the node reference, in case it was weakened before
   $AnyEvent::MP::Kernel::NODE{$self->{id}}
      = $AnyEvent::MP::Kernel::NODE{$self->{id}};

   Scalar::Util::weaken $self;

   $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
      $self->transport_error (transport_error => $self->{id}, "connect timeout");
   };

   # maybe @$addresses?
   my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};

   if ($addresses) {
      $self->connect_to ($addresses);
   } else {
      # on global nodes, all bets are off now - we either know the node, or we don't

MP/Node.pm  view on Meta::CPAN

   my $monitor  = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
   my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};

   $interval = ($monitor - $interval) / @$addresses
      if ($monitor - $interval) / @$addresses < $interval;

   $interval = 0.4 if $interval < 0.4;

   my @endpoints = reverse @$addresses;

   $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
      my $endpoint = pop @endpoints
         or return;

      AE::log 9 => "connecting to $self->{id} at $endpoint";

      $self->{trial}{$endpoint} ||= do {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
            or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";

         AnyEvent::MP::Transport::mp_connect

MP/Node.pm  view on Meta::CPAN


sub transport_reset {
   my ($self) = @_;

   Scalar::Util::weaken $self;

   $self->{send} = sub {
      if (++$DELAY > 0) {
         my $msg = $_[0];
         push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
         $DELAY_W ||= AE::timer 0, 0, $send_delayed;
         return;
      }

      local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
      AnyEvent::MP::Kernel::_inject (@{ $_[0] });
   };
}

sub transport_connect {
   my ($self, $tp) = @_;

MP/Node.pm  view on Meta::CPAN

   AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
}

sub kill {
   my (undef, @args) = @_;

   # we _always_ delay kil's, to avoid calling mon callbacks
   # from anything but the event loop context.
   $DELAY = 1;
   push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
   $DELAY_W ||= AE::timer 0, 0, $send_delayed;
}

sub monitor {
   # maybe always delay, too?
   if ($DELAY_W) {
      my @args = @_;
      push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
      return;
   }
   &AnyEvent::MP::Kernel::_monitor;

README  view on Meta::CPAN


       # monitoring
       mon $port, $cb->(@msg)      # callback is invoked on death
       mon $port, $localport       # kill localport on abnormal death
       mon $port, $localport, @msg # send message on death

       # temporarily execute code in port context
       peval $port, sub { die "kill the port!" };

       # execute callbacks in $SELF port context
       my $timer = AE::timer 1, 0, psub {
          die "kill the port, delayed";
       };

       # distributed database - modification
       db_set $family => $subkey [=> $value]  # add a subkey
       db_del $family => $subkey...           # delete one or more subkeys
       db_reg $family => $port [=> $value]    # register a port

       # distributed database - queries
       db_family $family => $cb->(\%familyhash)

README  view on Meta::CPAN

        "rcv" callbacks, i.e. runtime errors will cause the port to get
        "kil"ed.

        The effect is basically as if it returned "sub { peval $SELF, sub {
        BLOCK }, @_ }".

        This is useful when you register callbacks from "rcv" callbacks:

           rcv delayed_reply => sub {
              my ($delay, @reply) = @_;
              my $timer = AE::timer $delay, 0, psub {
                 snd @reply, $SELF;
              };
           };

    $guard = mon $port, $rcvport # kill $rcvport when $port dies
    $guard = mon $port # kill $SELF when $port dies
    $guard = mon $port, $cb->(@reason) # call $cb when $port dies
    $guard = mon $port, $rcvport, @msg # send a message when $port dies
        Monitor the given port and do something when the port is killed or
        messages to it were lost, and optionally return a guard that can be

README  view on Meta::CPAN

        Example: send us a restart message when another $port is killed.

           mon $port, $self => "restart";

    $guard = mon_guard $port, $ref, $ref...
        Monitors the given $port and keeps the passed references. When the
        port is killed, the references will be freed.

        Optionally returns a guard that will stop the monitoring.

        This function is useful when you create e.g. timers or other
        watchers and want to free them when the port gets killed (note the
        use of "psub"):

          $port->rcv (start => sub {
             my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
                undef $timer if 0.9 < rand;
             });
          });

    kil $port[, @reason]
        Kill the specified port with the given @reason.

        If no @reason is specified, then the port is killed "normally" -
        monitor callback will be invoked, but the kil will not cause linked
        ports ("mon $mport, $lport" form) to get killed.

bin/aemp  view on Meta::CPAN


      delete $to{$node};

      @neigh = grep $_ ne $NODE, @neigh;

      print $node, " -> ", (join " ", @neigh), "\n";

      for my $neigh (@neigh) {
         unless ($seen{$neigh}++) {
            $cv->begin;
            $to{$neigh} = AE::timer 15, 0, sub {
               print "$neigh (timeout)\n";
               $exit = 1;
               $cv->end;
            };
            AnyEvent::MP::Kernel::eval_on $neigh, "AnyEvent::MP::Kernel::up_nodes" => $SELF => $neigh;
         }
      }

      $cv->end;
   };

bin/aemp  view on Meta::CPAN

   };
   $cv->recv;
}

sub node_eval {
   my ($node, $expr) = @_;

   init;

   my $cv = AE::cv;
   my $to = AE::timer 5, 0, sub { exit 1 };
   AnyEvent::MP::Kernel::eval_on $node, $expr, port { &$cv };
   mon $node, $cv;

   my ($err, @res) = $cv->recv;

   die "$err @res" if length $err;

   print +(substr JSON::XS->new->encode (\@res), 1, -1), "\n";
}

sub docmd;

our %CMD = (
   snd => sub {
      my $port = shift @ARGV;
      init;

      snd $port, @ARGV; @ARGV = ();

      my $cv = AE::cv;
      my $to = AE::timer 5, 0, sub { exit 1 };
      mon $port, $cv;
      my $reply = port sub { &$cv };
      snd node_of $port, snd => $reply, "message sent successfully";

      print join " ", $cv->recv, "\n";
   },

   cal => sub {
      my $port = shift @ARGV;
      init;



( run in 1.163 second using v1.01-cache-2.11-cpan-49f99fa48dc )