AnyEvent-MP

 view release on metacpan or  search on metacpan

MP.pm  view on Meta::CPAN


   port rcv mon mon_guard psub peval spawn cal
   db_set db_del db_reg
   db_mon db_family db_keys db_values

   after
);

our $SELF;

# Kills the current port ($SELF) with a "die" reason taken from $@.
sub _self_die() {
   my $error = $@;

   # plain string errors lose their trailing newlines, references
   # (exception objects) are passed on untouched
   ref $error
      or $error =~ s/\n+$//;

   kil ($SELF, die => $error);
}

=item $thisnode = NODE / $NODE

The C<NODE> function returns, and the C<$NODE> variable contains, the node
ID of the node running in the current process. This value is initialised by
a call to C<configure>.

MP.pm  view on Meta::CPAN

If you want to stop/destroy the port, simply C<kil> it:

   my $port = port {
      my @msg = @_;
      ...
      kil $SELF;
   };

=cut

sub rcv($@);

# default callback for ports created without one: any message that
# arrives is unhandled, so the port is killed
my $KILME = sub {
   # at most 30 characters of the first message element, with everything
   # outside printable ASCII replaced by a dot
   my $tag = substr $_[0], 0, 30;
   $tag =~ s/([^\x20-\x7e])/./g;

   kil ($SELF, unhandled_message => "no callback found for message '$tag'");
};

# Creates a new local port, registers the given callback (or the default
# "kill on any message" callback) for it via rcv, and returns the port ID.
sub port(;&) {
   # port IDs are "<node>#<uniq><counter>"
   my $port     = "$NODE#" . $UNIQ . ++$ID;
   my $callback = shift || $KILME;

   rcv ($port, $callback);

   $port
}

=item rcv $local_port, $callback->(@msg)

MP.pm  view on Meta::CPAN

(e.g. for an rpc reply) and unregister it after a message was received.

   rcv $port, $otherport => sub {
      my @reply = @_;

      rcv $SELF, $otherport;
   };

=cut

sub rcv($@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   $nodeid eq $NODE
      or Carp::croak "$port: rcv can only be called on local ports, caught";

   while (@_) {
      if (ref $_[0]) {
         if (my $self = $PORT_DATA{$portid}) {
            "AnyEvent::MP::Port" eq ref $self

MP.pm  view on Meta::CPAN


   my $port = port { ... };

   peval $port, sub {
      init
         or die "unable to init";
   };

=cut

sub peval($$) {
   local $SELF = shift;
   my $cb = shift;

   if (wantarray) {
      my @res = eval { &$cb };
      _self_die if $@;
      @res
   } else {
      my $res = eval { &$cb };
      _self_die if $@;

MP.pm  view on Meta::CPAN


   rcv delayed_reply => sub {
      my ($delay, @reply) = @_;
      my $timer = AE::timer $delay, 0, psub {
         snd @reply, $SELF;
      };
   };

=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      local $SELF = $port;

      if (wantarray) {
         my @res = eval { &$cb };

MP.pm  view on Meta::CPAN

   my $init = shift;

   # rcv will create the actual port
   local $SELF = "$NODE#$port";
   eval {
      &{ load_func $init }
   };
   _self_die if $@;
}

sub spawn(@) {
   my ($nodeid, undef) = split /#/, shift, 2;

   my $id = $RUNIQ . ++$ID;

   $_[0] =~ /::/
      or Carp::croak "spawn init function must be a fully-qualified name, caught";

   snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;

   "$nodeid#$id"

MP.pm  view on Meta::CPAN


Either sends the given message, or calls the given callback, after the
specified number of seconds.

This is simply a utility function that comes in handy at times - the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.

=cut

# Runs @action once after $timeout seconds: if the first element is a
# reference it is called (without arguments), otherwise @action is sent
# as a message with snd.
sub after($@) {
   my $timeout = shift;
   my @action  = @_;

   my $timer;

   my $fire = sub {
      # the timer is only referenced from this closure, so dropping it
      # here releases it after its single run
      undef $timer;

      if (ref $action[0]) {
         $action[0]()
      } else {
         snd (@action)
      }
   };

   $timer = AE::timer ($timeout, 0, $fire);
}

MP.pm  view on Meta::CPAN


If no time-out is given (or it is C<undef>), then the local port will
monitor the remote port instead, so it eventually gets cleaned-up.

Currently this function returns the temporary port, but this "feature"
might go away in future versions unless you can make a convincing case
that this is indeed useful for something.

=cut

sub cal(@) {
   my $timeout = ref $_[-1] ? undef : pop;
   my $cb = pop;

   my $port = port {
      undef $timeout;
      kil $SELF;
      &$cb;
   };

   if (defined $timeout) {

MP.pm  view on Meta::CPAN


If C<$value> is missing, C<undef> is used. If C<$port> is missing, then
C<$SELF> is used.

This function is most useful to register a port in some port group (which
is just another name for a database family), and have it removed when the
port is gone. This works best when the port is a local port.

=cut

sub db_reg($$;$) {
   my $family = shift;
   my $port = @_ ? shift : $SELF;

   my $clr = sub { db_del $family => $port };
   mon $port, $clr;

   db_set $family => $port => $_[0];

   defined wantarray
      and &Guard::guard ($clr)

MP/Config.pm  view on Meta::CPAN

   unlink "$CONFIG_FILE~";
   link $CONFIG_FILE, "$CONFIG_FILE~";
   rename "$CONFIG_FILE~new~", $CONFIG_FILE
      or Carp::croak "$CONFIG_FILE: $!";
}

# Returns a reference to the %CFG hash itself (not a copy).
sub config {
   return \%CFG;
}

sub _find_profile($);

# Flattens the named profile and its chain of parents into one key/value
# list. Without a name (end of the chain) the contents of %CFG are returned.
sub _find_profile($) {
   my $name = shift;

   defined $name
      or return %CFG;

   my $profile = $CFG{profile}{$name};

   # the parent's settings come first in the list, followed by this
   # profile's own key/value pairs
   return (_find_profile ($profile->{parent}), %$profile);
}

sub find_profile($;%) {
   my ($name, %kv) = @_;

   my $norc  = delete $kv{norc};
   my $force = delete $kv{force};

   %kv = (
      monitor_timeout  => 30,
      connect_interval => 2,
      framing_format   => [qw(cbor json storable)], # framing types we offer and accept, in order of preference
      auth_offer       => [qw(tls_sha3_512 hmac_sha3_512)], # what we will send

MP/DataConn.pm  view on Meta::CPAN

      warn "connection established, wait for a line...\n"

      $hdl->push_read (line => sub {
         warn "received a line: $_[1]\n";
         undef $hdl;
      });
   }

=cut

sub connect_to($$$$@) {
   my $cb = pop;
   my ($node, $timeout, $initfunc, @initdata) = @_;

   my $port = $SELF
      or Carp::croak "AnyEvent::MP::DataConn::connect_to must be called in port context";

   $node = node_of $node;

   my $id = (++$ID) . "\@$NODE";

MP/Global.pm  view on Meta::CPAN

   # tell subscribers we have changed the family
   if (%$set || %local_set || @del_local) {
      @$set{keys %local_set} = values %local_set;

      snd $_ => g_chg2 => $family, $set, \@del_local
         for keys %{ $GLOBAL_MON{$family} };
   }
}

# set the whole (node-local) database - previous value must be empty
# Feeds every family of the database $db to g_upd as an update for $node.
sub g_set($$) {
   my ($node, $db) = @_;

   # one g_upd call per (family, value) pair of the database
   while (my ($family, $set) = each %$db) {
      g_upd ($node, $family, $set);
   }
}

# delete all keys from a database
# Removes everything stored for $node in %LOCAL_DBS.
sub g_clr($) {
   my $node = shift;

   my $families = $LOCAL_DBS{$node};

   # per family: an update without new values that lists all current
   # keys in the deletion argument
   while (my ($family, $entries) = each %$families) {
      g_upd ($node, $family, undef, [keys %$entries]);
   }

   delete $LOCAL_DBS{$node};
}

MP/Global.pm  view on Meta::CPAN

   snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
};

# Node request handler: replies to the requesting node ($SRCNODE) with an
# array reference holding all values of the given family in %GLOBAL_DB
# (an empty array for unknown families).
$NODE_REQ{g_db_values} = sub {
   my ($family, $id) = @_;

   # fetch the family hash first: "values %{ $GLOBAL_DB{$family} }" would
   # autovivify an empty entry for every unknown family a node asks about
   my $db = $GLOBAL_DB{$family};

   snd ($SRCNODE, g_reply => $id, [$db ? values %$db : ()]);
};

# monitoring

sub g_disconnect($) {
   my ($node) = @_;

   delete $GLOBAL_NODE{$node}; # also done in Kernel.pm, but doing it here avoids overhead

   db_del "'g" => $node;
   db_del "'l" => $node;
   g_clr $node;

   if (my $mon = delete $GLOBAL_SLAVE{$node}) {
      while (my ($f, $fv) = each %$mon) {

MP/Global.pm  view on Meta::CPAN

};

#############################################################################
# compatibility functions for aemp 1.0

package AnyEvent::MP::Global;

use base "Exporter";
our @EXPORT = qw(grp_reg grp_get grp_mon);

# aemp 1.0 compatibility name for db_reg.
# The "&db_reg" form (ampersand, no parentheses) calls db_reg with the
# current @_ and without applying db_reg's prototype, so the arguments
# are passed through unchanged.
sub grp_reg($$) {
   &db_reg
}

# aemp 1.0 compatibility function: returns an array reference with the keys
# stored for the given group in %AnyEvent::MP::Kernel::GLOBAL_DB, or undef
# when the group is unknown or empty.
sub grp_get($) {
   # fetch the family hash first: "keys %{ $GLOBAL_DB{$grp} }" would
   # autovivify an empty entry for every unknown group that is looked up
   my $family = $AnyEvent::MP::Kernel::GLOBAL_DB{$_[0]};
   my @ports  = $family ? keys %$family : ();

   @ports ? \@ports : undef
}

# aemp 1.0 compatibility wrapper around db_mon: $cb is called with an array
# reference of the current keys plus the first and fourth db_mon callback
# arguments (named "add" and "del" in the original code); the third
# ("chg") is not passed on.
sub grp_mon($$) {
   my ($group, $callback) = @_;

   my $adapter = sub {
      my ($family, $added, undef, $deleted) = @_;

      $callback->([keys %$family], $added, $deleted);
   };

   db_mon ($group, $adapter);
}

=head1 SEE ALSO

MP/Kernel.pm  view on Meta::CPAN

);

our @EXPORT = qw(
   snd_to_func snd_on eval_on
   port_is_local
   up_nodes mon_nodes node_is_up
);

our @CARP_NOT = (AnyEvent::MP::);

sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {

MP/Kernel.pm  view on Meta::CPAN

               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}

# the 62 characters nonce62 picks from: digits, uppercase, lowercase
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# Returns a string of $_[0] random octets (each character is chr of a
# rand value below 256).
sub nonce($) {
   my $octets = "";
   $octets .= chr rand 256 for 1 .. $_[0];

   $octets
}

# Returns a string of $_[0] random characters taken from @alnum.
sub nonce62($) {
   my $chars = "";
   $chars .= $alnum[rand 62] for 1 .. $_[0];

   $chars
}

our $CONFIG; # this node's configuration
our $SECURE;

our $RUNIQ; # remote uniq value
our $UNIQ;  # per-process/node unique cookie
our $NODE;
our $ID = "a";

MP/Kernel.pm  view on Meta::CPAN

      ) . nonce62 4
   ;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 10;
   $RUNIQ =~ s/(.)$/\U$1/;

   $NODE = "";
}

# function form of $NODE: returns the variable's current value
sub NODE() {
   return $NODE;
}

# Returns the part of a port ID in front of the first "#" (the node ID);
# a string without "#" is returned as a whole.
sub node_of($) {
   # limit 2: everything after the first "#" stays in one (ignored) field
   my $node = (split /#/, $_[0], 2)[0];

   $node
}

# Defines TRACE at compile time as a constant sub: 1 when the environment
# variable PERL_ANYEVENT_MP_TRACE holds a true value, 0 otherwise. The
# empty prototype makes these subs eligible for constant inlining, so
# "... if TRACE" code can be folded away when tracing is off.
BEGIN {
   *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
      ? sub () { 1 }
      : sub () { 0 };
}

our $DELAY_TIMER;
our @DELAY_QUEUE;

# Timer callback installed by delay: calls the queued callbacks in the
# order they were queued, including ones added while it runs. Once the
# queue is empty, shift yields a false value, so the "or" branch clears
# $DELAY_TIMER and returns, which ends the endless loop.
our $delay_run = sub {
   (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
};

# Queues a callback for later execution by $delay_run.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   # a single zero-delay timer serves the whole queue, so only create one
   # when none is pending
   $DELAY_TIMER ||= AE::timer (0, 0, $delay_run);
}

=item $AnyEvent::MP::Kernel::SRCNODE

During execution of a message callback, this variable contains the node ID
of the origin node.

The main use of this variable is for debugging output - there are probably

MP/Kernel.pm  view on Meta::CPAN

      my ($node) = @_;

      length $node
         or Carp::croak "'undef' or the empty string are not valid node/port IDs";

      # registers itself in %NODE
      new AnyEvent::MP::Node::Remote $node
   }
}

# Sends a message: the first argument is the destination port ID
# ("node#port"), the remaining arguments form the message.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";
   }

   # unknown nodes get a node object created on the fly
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}

# True if the node part of the given port ID equals $NODE.
sub port_is_local($) {
   # limit 2: only the first "#" separates node ID and port name
   my $nodeid = (split /#/, $_[0], 2)[0];

   $nodeid eq $NODE
}

=item snd_to_func $node, $func, @args

Expects a node ID and a name of a function. Asynchronously tries to call
this function with the given arguments on that node.

This function can be used to implement C<spawn>-like interfaces.

=cut

# Sends the remaining arguments (function name followed by its arguments)
# to the node $nodeid, using the empty string as port name.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment above talks about delaying on $NODE, while
   # the condition below sets DELAY when the destination is NOT $NODE -
   # confirm the intent against AnyEvent::MP::Node::Self's use of $DELAY.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   # unknown nodes get a node object created via add_node
   ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
}

=item snd_on $node, @msg

Executes C<snd> with the given C<@msg> (which must include the destination
port) on the given node.

=cut

# Sends the message (snd => @msg) to $node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}

=item eval_on $node, $string[, @reply]

Evaluates the given string as Perl expression on the given node. When
@reply is specified, then it is used to construct a reply message with
C<"$@"> and any results from the eval appended.

=cut

# Sends the message (eval => $string, @reply) to $node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}

# Kills the given port: the first argument is the port ID, the remaining
# arguments (the kill reason) are handed to the node object's kill method.
sub kil(@) {
   my $target = shift;
   my ($nodeid, $portid) = split /#/, $target, 2;

   # an ID without a port name addresses the node itself
   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   # unknown nodes get a node object created on the fly
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}

#############################################################################

MP/Kernel.pm  view on Meta::CPAN


Returns true if the given node is "up", that is, the kernel thinks it has
a working connection to it.

More precisely, if the node is up, returns C<1>. If the node is currently
connecting or otherwise known but not connected, returns C<0>. If nothing
is known about the node, returns C<undef>.

=cut

# Parses as (is-this-node || known-node's {transport}) ? 1 : 0, because
# "||" binds tighter than "?:". For a node ID that is neither $NODE nor a
# key of %NODE, the inner "return" leaves the sub with an empty result
# (undef in scalar context) before the ternary is evaluated.
sub node_is_up($) {
   ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport}
      ? 1 : 0
}

=item @nodes = up_nodes

Return the node IDs of all nodes that are currently connected (excluding
the node itself).

=cut

# Returns the {id} of every node object in %NODE with a true {transport}.
sub up_nodes() {
   my @connected = grep { $_->{transport} } values %NODE;

   map { $_->{id} } @connected
}

=item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)

Registers a callback that is called each time a node goes up (a connection
is established) or down (the connection is lost).

Node up messages can only be followed by node down messages for the same
node, and vice versa.

MP/Kernel.pm  view on Meta::CPAN

Example: make sure you call function C<newnode> for all nodes that are up
or go up (and down).

   newnode $_, 1 for up_nodes;
   mon_nodes \&newnode;

=cut

our %MON_NODES;

# Registers $cb in %MON_NODES. In non-void context a guard object is
# returned that removes the registration again when it is destroyed.
sub mon_nodes($) {
   my $cb = shift;

   # the numeric value of the code reference serves as registry key
   $MON_NODES{$cb+0} = $cb;

   # void context: nobody could hold a guard, so none is created
   return unless defined wantarray;

   Guard::guard { delete $MON_NODES{$cb+0} }
}

# Logs an up/down event for $node and calls every callback registered in
# %MON_NODES with ($node->{id}, $up, @reason).
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   my $state = $up ? "up." : "down (@reason).";
   AE::log (7 => "$node->{id} is $state");

   for my $watcher (values %MON_NODES) {
      # the trailing 1 tells a callback that died apart from one that
      # merely returned a false value
      my $ok = eval { $watcher->($node->{id}, $up, @reason); 1 };

      AE::log (die => $@)
         unless $ok;
   }
}

MP/Kernel.pm  view on Meta::CPAN

      undef $SEED_WATCHER;
   }
}

# Resets $SEED_RETRY to a random value in [0.6, 1.2) and makes sure a
# zero-delay timer for seed_all is pending.
sub seed_again {
   $SEED_RETRY = 0.6 * (1 + rand);

   # keep an already pending watcher instead of creating a second one
   $SEED_WATCHER ||= AE::timer (0, 0, \&seed_all);
}

# sets new seed list, starts connecting
# Replaces the seed list with the given seeds and calls seed_again.
sub set_seeds(@) {
   # forget all previous seed state
   %$_ = ()
      for \%SEED_NODE, \%NODE_SEED, \%SEED_CONNECT;

   # every new seed becomes a key with an undefined value
   $SEED_NODE{$_} = undef
      for @_;

   seed_again ();
}

# normal nodes only record global node connections

MP/Kernel.pm  view on Meta::CPAN

   &{ $NODE_REQ{g_slave} }
};

#############################################################################
# local database operations

# canonical probably not needed
our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;

# are the two scalars equal? very very ugly and slow, need better way
# If either argument is a reference, both are encoded with $sv_eq_coder and
# the resulting JSON texts are compared. Otherwise the arguments are
# compared as strings and must also agree in definedness ("&&" binds
# tighter than "?:", so both tests form the else branch) - this keeps undef
# and the empty string apart.
sub sv_eq($$) {
   ref $_[0] || ref $_[1]
      ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1])
      : $_[0] eq $_[1]
        && defined $_[0] == defined $_[1]
}

# local database management

# Deletes the given subkeys from family $family of %LOCAL_DB and, when
# $MASTER is defined, sends it a g_upd message listing the deleted keys.
sub db_del($@) {
   my ($family, @keys) = @_;

   # only keys that actually exist are deleted and reported
   my @present = grep { exists $LOCAL_DB{$family}{$_} } @keys;

   @present
      or return;

   delete @{ $LOCAL_DB{$family} }{@present};

   if (defined $MASTER) {
      snd ($MASTER, g_upd => $family => undef, \@present);
   }
}

sub db_set($$;$) {
   my ($family, $subkey) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key

MP/Kernel.pm  view on Meta::CPAN

# Issues a g_db_values global call for $family, passing on the callback $cb.
sub db_values {
   my $family   = shift;
   my $callback = shift;

   global_call (g_db_values => $family, $callback);
}

# database monitoring

our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

sub db_mon($@) {
   my ($family, $cb) = @_;

   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$cb+0} = sub { };
      AE::postpone {
         return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already

MP/Kernel.pm  view on Meta::CPAN

};

#############################################################################
# configure

# Returns the second element of POSIX::uname (the nodename field).
sub nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}

sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res;

   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++

MP/Kernel.pm  view on Meta::CPAN

   }

   $cv->end;

   $cv
}

our @POST_CONFIGURE;

# not yet documented
# Queues a code block in @POST_CONFIGURE and, as long as $NODE is true,
# runs the queued blocks in order. Must be called in void context.
sub post_configure(&) {
   defined wantarray
      and die "AnyEvent::MP::Kernel::post_configure must be called in void context";

   push @POST_CONFIGURE, @_;

   # $NODE is re-checked before every hook; while it is false the hooks
   # simply stay queued
   while ($NODE && @POST_CONFIGURE) {
      my $hook = shift @POST_CONFIGURE;
      $hook->();
   }
}

sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   $profile = nodename
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

MP/Transport.pm  view on Meta::CPAN


Creates a listener on the given host/port using
C<AnyEvent::Socket::tcp_server>.

See C<new>, below, for constructor arguments.

Defaults for peerhost, peerport and fh are provided.

=cut

sub mp_server($$;%) {
   my ($host, $port, %arg) = @_;

   AnyEvent::Socket::tcp_server $host, $port, sub {
      my ($fh, $host, $port) = @_;

      my $tp = new AnyEvent::MP::Transport
         fh       => $fh,
         peerhost => $host,
         peerport => $port,
         %arg,

MP/Transport.pm  view on Meta::CPAN

      on_connect => sub { successful-connect-callback },
      greeting   => { key => value },

      # tls support
      tls_ctx    => AnyEvent::TLS,
      peername   => $peername, # for verification
   ;

=cut

# Hex-encoded HMAC using Digest::SHA3::sha3_512 as hash function.
# Note the argument order: this function takes (key, data), while
# Digest::HMAC::hmac_hex expects (data, key, hash function, block size).
sub hmac_sha3_512_hex($$) {
   my ($key, $data) = @_;

   # 72 is passed as the block size (presumably the SHA3-512 rate in bytes)
   Digest::HMAC::hmac_hex ($data, $key, \&Digest::SHA3::sha3_512, 72)
}

sub new {
   my ($class, %arg) = @_;

   my $self = bless \%arg, $class;

   {
      Scalar::Util::weaken (my $self = $self);



( run in 0.273 second using v1.01-cache-2.11-cpan-a5abf4f5562 )