AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/Kernel.pm  view on Meta::CPAN

   # monitoring
   mon0 => sub { # another node asks us to stop monitoring one of our ports
      my $portid = shift @_;

      # This existence check should not be necessary, but there is apparently
      # a bug elsewhere; when the source node has no entry in %NODE we silently
      # do nothing instead of tripping over it.
      if (exists $NODE{$SRCNODE}) {
         # forget the callback registered for this node/port pair and
         # hand it to _unmonitor so it can be removed from the port
         my $cb = delete $NODE{$SRCNODE}{rmon}{$portid};
         _unmonitor (undef, $portid, $cb);
      }
   },
   mon1 => sub { # another node asks us to start monitoring one of our ports
      my $portid = shift @_;

      # The callback below is stored inside the node and also closes over it,
      # so only a weak reference to the node is kept here.
      my $node = $NODE{$SRCNODE};
      Scalar::Util::weaken ($node);

      # Invoked with the kill reason: drops its own registration, then
      # forwards a kil0 message to the node if it still exists and currently
      # has a transport.
      my $on_kill = sub {
         delete $node->{rmon}{$portid};

         if ($node && $node->{transport}) {
            $node->send (["", kil0 => $portid, @_]);
         }
      };

      # remember the callback per port so that mon0 can find it again
      $node->{rmon}{$portid} = $on_kill;

      _monitor (undef, $portid, $node->{rmon}{$portid});
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
         or return;

MP/Node.pm  view on Meta::CPAN

package AnyEvent::MP::Node::Remote; # a remote node

use base "AnyEvent::MP::Node";

# called at init time, mostly sets {send}
sub transport_reset {
   my $self = $_[0];

   # forget any transport we might still have
   delete $self->{transport};

   # the closure installed below lives inside $self, so it must only
   # capture a weak reference to it
   Scalar::Util::weaken ($self);

   # without a transport, "sending" means: queue the message and
   # try to (re-)establish a connection
   $self->{send} = sub {
      my ($msg) = @_;

      push @{ $self->{queue} }, $msg;
      $self->connect;
   };
}

# called each time we fail to establish a connection,
# or the existing connection failed
sub transport_error {

MP/Node.pm  view on Meta::CPAN

   delete $self->{queue};
   $self->transport_reset;

   if (my $mon = delete $self->{lmon}) {
      $_->(@reason) for map @$_, values %$mon;
   }

   AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
      unless $no_transport;

   # we weaken the node reference, so it can go away if unused
   Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
      unless $self->{connect_to};

   AE::log 9 => "@reason";
}

# called after handshake was successful
sub transport_connect {
   my ($self, $transport) = @_;

   delete $self->{trial};

MP/Node.pm  view on Meta::CPAN

   $transport_send->($_)
      for @{ delete $self->{queue} || [] };
}

sub connect {
   my ($self) = @_;

   return if $self->{transport};
   return if $self->{connect_w};

   # we unweaken the node reference, in case it was weakened before
   $AnyEvent::MP::Kernel::NODE{$self->{id}}
      = $AnyEvent::MP::Kernel::NODE{$self->{id}};

   Scalar::Util::weaken $self;

   $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
      $self->transport_error (transport_error => $self->{id}, "connect timeout");
   };

   # maybe @$addresses?
   my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};

   if ($addresses) {
      $self->connect_to ($addresses);

MP/Node.pm  view on Meta::CPAN

   $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
   (shift @DELAY)->()
      while @DELAY;
   undef $DELAY_W;
   $DELAY = -50;
};

sub transport_reset {
   my ($self) = @_;

   Scalar::Util::weaken $self;

   $self->{send} = sub {
      if (++$DELAY > 0) {
         my $msg = $_[0];
         push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
         $DELAY_W ||= AE::timer 0, 0, $send_delayed;
         return;
      }

      local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;

MP/Transport.pm  view on Meta::CPAN

# Returns the HMAC of the second argument keyed with the first argument,
# using SHA3-512 as hash function with a block size of 72 bytes, as
# computed by Digest::HMAC::hmac_hex.
sub hmac_sha3_512_hex($$) {
   my ($key, $data) = @_;

   # Digest::HMAC wants the data first and the key second
   Digest::HMAC::hmac_hex ($data, $key, \&Digest::SHA3::sha3_512, 72)
}

sub new {
   my ($class, %arg) = @_;

   my $self = bless \%arg, $class;

   {
      Scalar::Util::weaken (my $self = $self);

      my $config = $AnyEvent::MP::Kernel::CONFIG;

      my $timeout  = $config->{monitor_timeout};
      my $lframing = $config->{framing_format};
      my $auth_snd = $config->{auth_offer};
      my $auth_rcv = $config->{auth_accept};

      $self->{secret} = $config->{secret}
         unless exists $self->{secret};

MP/Transport.pm  view on Meta::CPAN

   } else {
      my $rmsg; $rmsg = $self->{rmsg} = sub {
         $push_read->($_[0], $framing => $rmsg);

         $AnyEvent::MP::Kernel::SRCNODE = $node;
         AnyEvent::MP::Kernel::_inject (@{ $_[1] });
      };
      eval {
         $push_read->($hdl, $framing => $rmsg);
      };
      Scalar::Util::weaken $rmsg;
      return $self->error ("$framing: unusable remote framing")
         if $@;
   }
}

sub error {
   my ($self, $msg) = @_;

   delete $self->{keepalive};

MP/Transport.pm  view on Meta::CPAN


   delete $self->{keepalive};

   if ($self->{protocol}) {
      $self->{hdl}->on_error (undef);
      $HOOK_PROTOCOL{$self->{protocol}}->($self, undef);
   } else {
      AE::log 9 => "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}.";

      my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
      Scalar::Util::weaken ($self->{node} = $node);
      $node->transport_connect ($self);

      $_->($self) for @HOOK_CONNECT;
   }

   (delete $self->{release})->()
      if exists $self->{release};

   (delete $self->{on_connect})->($self)
      if exists $self->{on_connect};



( run in 0.277 second using v1.01-cache-2.11-cpan-65fba6d93b7 )