AnyEvent-MP

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

Revision history for AnyEvent::MP

TODO: testsuite
TODO: intro: maybe simple job pool example?
TODO: intro: mention watchdog...
TODO: maybe disbale warnings by default?
TODO: listener-scopes (10.0.0.1:4040@vpn) and connect-scopes ("vpn,public")
TODO: document env-variable usage
TODO: make node objects responsible for keepalive?

faq: can't se anything
faq: all is asynch
faq: how to interface to non-perl nodes?

TODO: check gproto, nproto, on connect
TODO: limiting reconnecting speed when unreachable? somehow use same interval timers as for seeding and keepalive?
TODO: multiple profiles? also some default profiles?
TODO: export keepalive?
TODO: $guard = con $cb->($up)
TODO: aemp readline support
TODO: gleeco re: AE::MP::DataConn -
TODO: version both in MP.pm and MP/Config.pm because of cpan indexer

2.02 Sun Jul 29 04:22:53 CEST 2018
	- hardcode version in MP.pm to help the CPAN indexer.

2.01 Tue Jul 24 09:02:34 CEST 2018
	- try to work around a race condition that we can't identify

Changes  view on Meta::CPAN

          hopefully improve error messages.

1.30 Thu Jun 30 11:30:39 CEST 2011
	- connection errors at the right time would kill node
          connections in a bad way (patch by Malcolm Studd).

1.29 Fri May  7 20:13:39 CEST 2010
	- codename "Sadrak".
        - fix error in callback at AnyEvent/MP/Global.pm line 339
          (found by Sadrak).
	- listener-less nodes had trouble sending keepalive
          messages on write timeouts (found by Sadrak).
        - the monitor guard could cause a memleak due
          to autovivification if a mon was cleared after
          the port was gone (analysed by Sadrak).
        - do not overwrite the config file if we couldn't read it
          for some reason (lesson demonstrated by Sadrak).

1.28 Thu Apr  1 21:23:54 CEST 2010
	- accepted connections didn't correctly set up the SRCNODE
          in some cases, leading to intra-node messages (such as monitoring)

Changes  view on Meta::CPAN

        - the mon_guard return value no longer keeps an additional
          reference to the passed refs.

1.1  Fri Sep 11 04:34:03 CEST 2009
	- bumped the transport protocol to version 1 - sorry, but there
          were too many bugfixes/changes.
	- new function AnyEvent::MP::cal for simple rpc.
	- renamed AnyEvent::MP::Global functions to grp_reg|get|mon.
        - implemented monitoring for Global groups.
	- removed all userspace time-outs from the transport, instead
          rely on tcp retransmit timeouts and tcp keepalive.
	- spawn now delays spawns on the local node artificially, so
          it can return before invoking the init function.
        - break endless recursion between ping-pong ports on the local
          node after ~50 iterations.
        - support JSON-encoded aemp arguments.
        - added aemp restart.
        - support for starting init functions with parameters added.
        - data_format, auth_offer and auth_accept are now configurable, albeit
          not documented.
        - new service: AnyEvent::MP::LogCatcher.

MP.pm  view on Meta::CPAN

In the third form (callback), the callback is simply called with any
number of C<@reason> elements (empty @reason means that the port was deleted
"normally"). Note also that I<< the callback B<must> never die >>, so use
C<eval> if unsure.

In the last form (message), a message of the form C<$rcvport, @msg,
@reason> will be C<snd>.

Monitoring-actions are one-shot: once messages are lost (and a monitoring
alert was raised), they are removed and will not trigger again, even if it
turns out that the port is still alive.

As a rule of thumb, monitoring requests should always monitor a remote
port locally (using a local C<$rcvport> or a callback). The reason is that
kill messages might get lost, just like any other message. Another less
obvious reason is that even monitoring requests can get lost (for example,
when the connection to the other node goes down permanently). When
monitoring a port locally these problems do not exist.

C<mon> effectively guarantees that, in the absence of hardware failures,
after starting the monitor, either all messages sent to the port will

MP/Global.pm  view on Meta::CPAN


      # if the node is global, connect
      g_global_connect $_
         if exists $GLOBAL_NODE{$_};
   }

   # from here on we should be able to act "normally"

   # maintain connections to all global nodes that we know of
   db_mon "'g" => sub {
      keepalive_add $_ for @{ $_[1] };
      keepalive_del $_ for @{ $_[3] };
   };
};

#############################################################################
# compatibility functions for aemp 1.0

package AnyEvent::MP::Global;

use base "Exporter";
our @EXPORT = qw(grp_reg grp_get grp_mon);

MP/Intro.pod  view on Meta::CPAN

  };

The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
and then executes the given sub in a port context.

=head3 Network Errors and the AEMP Guarantee

Earlier we mentioned another important source of monitoring failures:
network problems. When a node loses connection to another node, it will
invoke all monitoring actions, just as if the port was killed, I<even if
it is possible that the port is still happily alive on another node> (not
being able to talk to a node means we have no clue what's going on with
it, it could be crashed, but also still running without knowing we lost
the connection).

So another way to view monitors is: "notify me when some of my messages
couldn't be delivered". AEMP has a guarantee about message delivery to a
port:  After starting a monitor, any message sent to a port will either
be delivered, or, when it is lost, any further messages will also be lost
until the monitoring action is invoked. After that, further messages
I<might> get delivered again.

MP/Kernel.pm  view on Meta::CPAN

      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::now;
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);

# the node port
new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE

$PORT{""} = sub {
   my $tag = shift;
   eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
   AE::log die => "error processing node message from $SRCNODE: $@" if $@;

MP/Kernel.pm  view on Meta::CPAN

      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd $_[0], "g_slave";
   } else {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again;
   }
};

#############################################################################
# keepalive code - used to kepe conenctions to certain nodes alive
# only used by global code atm., but ought to be exposed somehow.
#TODO: should probbaly be done directly by node objects

our $KEEPALIVE_RETRY;
our $KEEPALIVE_WATCHER;
our %KEEPALIVE; # we want to keep these nodes alive
our %KEEPALIVE_DOWN; # nodes that are down currently

sub keepalive_all {
   AE::log 9 => "keepalive: trying to establish connections with: "
                . (join " ", keys %KEEPALIVE_DOWN)
                . ".";

   (add_node $_)->connect
      for keys %KEEPALIVE_DOWN;

   $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
   $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
      if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}

sub keepalive_again {
   $KEEPALIVE_RETRY = (1 + rand) * 0.3;
   keepalive_all;
}

sub keepalive_add {
   return if $KEEPALIVE{$_[0]}++;

   return if node_is_up $_[0];
   undef $KEEPALIVE_DOWN{$_[0]};
   keepalive_again;
}

sub keepalive_del {
   return if --$KEEPALIVE{$_[0]};

   delete $KEEPALIVE     {$_[0]};
   delete $KEEPALIVE_DOWN{$_[0]};

   undef $KEEPALIVE_WATCHER
      unless %KEEPALIVE_DOWN;
}

mon_nodes sub {
   return unless exists $KEEPALIVE{$_[0]};

   if ($_[1]) {
      delete $KEEPALIVE_DOWN{$_[0]};

      undef $KEEPALIVE_WATCHER
         unless %KEEPALIVE_DOWN;
   } else {
      # lost the conenction, try to connect again
      undef $KEEPALIVE_DOWN{$_[0]};
      keepalive_again;
   }
};

#############################################################################
# talk with/to global nodes

# protocol messages:
#
# sent by global nodes
# g_global                  - global nodes send this to all others

MP/Transport.pm  view on Meta::CPAN


   AnyEvent::Socket::tcp_server $host, $port, sub {
      my ($fh, $host, $port) = @_;

      my $tp = new AnyEvent::MP::Transport
         fh       => $fh,
         peerhost => $host,
         peerport => $port,
         %arg,
      ;
      $tp->{keepalive} = $tp;
   }, delete $arg{prepare}
}

=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)

=cut

sub mp_connect {
   my $release = pop;
   my ($host, $port, @args) = @_;

MP/Transport.pm  view on Meta::CPAN

            cert    => $config->{cert},
            ca_cert => $config->{cert},
            verify_require_client_cert => 1,
         };
      }

      $self->{hdl} = new AnyEvent::Handle
         +($self->{fh} ? (fh => $self->{fh}) : (connect => $self->{connect})),
         autocork  => $config->{autocork},
         no_delay  => exists $config->{nodelay} ? $config->{nodelay} : 1,
         keepalive => 1,
         on_error  => sub {
            $self->error ($_[2]);
         },
         rtimeout  => $timeout,
      ;

      my $greeting_kv = $self->{local_greeting} ||= {};

      $greeting_kv->{tls}      = "1.0" if $self->{tls_ctx};
      $greeting_kv->{provider} = "AE-$AnyEvent::MP::Config::VERSION";

MP/Transport.pm  view on Meta::CPAN


               if ($rauth2 ne $rauth) {
                  return $self->error ("authentication failure/shared secret mismatch");
               }

               $self->{r_framing} = $r_framing;
               $self->{s_framing} = $s_framing;

               $hdl->rbuf_max (undef);

               # we rely on TCP retransmit timeouts and keepalives
               $self->{hdl}->rtimeout (undef);

               $self->{remote_greeting}{untrusted} = 1
                  if $auth_method eq "tls_anon";

               if ($protocol eq "aemp" and $self->{hdl}) {
                  # listener-less nodes need to continuously probe
#                  unless (@$AnyEvent::MP::Kernel::BINDS) {
#                     $self->{hdl}->wtimeout ($timeout);
#                     $self->{hdl}->on_wtimeout (sub { $self->{send}->([]) });

MP/Transport.pm  view on Meta::CPAN

      };
      Scalar::Util::weaken $rmsg;
      return $self->error ("$framing: unusable remote framing")
         if $@;
   }
}

sub error {
   my ($self, $msg) = @_;

   delete $self->{keepalive};

   if ($self->{protocol}) {
      $HOOK_PROTOCOL{$self->{protocol}}->($self, $msg);
   } else {
      AE::log 9 => "$self->{peerhost}:$self->{peerport} disconnected - $msg.";

      $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg)
         if $self->{node} && $self->{node}{transport} == $self;
   }

   (delete $self->{release})->()
      if exists $self->{release};
   
   $self->destroy;
}

sub connected {
   my ($self) = @_;

   delete $self->{keepalive};

   if ($self->{protocol}) {
      $self->{hdl}->on_error (undef);
      $HOOK_PROTOCOL{$self->{protocol}}->($self, undef);
   } else {
      AE::log 9 => "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}.";

      my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
      Scalar::Util::weaken ($self->{node} = $node);
      $node->transport_connect ($self);

MP/Transport.pm  view on Meta::CPAN


   ["", "Some::Function::name", "myownport", 1, 2, 3]

This would call the function C<Some::Function::name> with the string
C<myownport> and some additional arguments.

=head2 MONITORING

Monitoring the connection itself is transport-specific. For TCP, all
connection monitoring is currently left to TCP retransmit time-outs
on a busy link, and TCP keepalive (which should be enabled) for idle
connections.

This is not sufficient for listener-less nodes, however: they need
to regularly send data (30 seconds, or the monitoring interval, is
recommended), so TCP actively probes.

Future implementations of AnyEvent::MP::Transport might query the kernel TCP
buffer after a write timeout occurs, and if it is non-empty, shut down the
connections, but this is an area of future research :)

README  view on Meta::CPAN

        In the third form (callback), the callback is simply called with any
        number of @reason elements (empty @reason means that the port was
        deleted "normally"). Note also that *the callback must never die*,
        so use "eval" if unsure.

        In the last form (message), a message of the form "$rcvport, @msg,
        @reason" will be "snd".

        Monitoring-actions are one-shot: once messages are lost (and a
        monitoring alert was raised), they are removed and will not trigger
        again, even if it turns out that the port is still alive.

        As a rule of thumb, monitoring requests should always monitor a
        remote port locally (using a local $rcvport or a callback). The
        reason is that kill messages might get lost, just like any other
        message. Another less obvious reason is that even monitoring
        requests can get lost (for example, when the connection to the other
        node goes down permanently). When monitoring a port locally these
        problems do not exist.

        "mon" effectively guarantees that, in the absence of hardware



( run in 2.670 seconds using v1.01-cache-2.11-cpan-df04353d9ac )