AnyEvent-MP
view release on metacpan or search on metacpan
MP/Kernel.pm view on Meta::CPAN
# monitoring
mon0 => sub {
   # Stop monitoring a port on behalf of another node: the callback that was
   # remembered for the sending node under {rmon} is removed and handed to
   # _unmonitor together with the port id.
   my ($portid) = @_;

   # The existence test should be redundant, but a bug elsewhere apparently
   # gets us here without a node entry for the sender; this silently works
   # around it.
   if (exists $NODE{$SRCNODE}) {
      _unmonitor (undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid});
   }
},
mon1 => sub {
   # Start monitoring a port on behalf of another node.
   my ($portid) = @_;

   # Hold the sending node only weakly, so the callback below does not keep
   # the node object alive.
   my $node = $NODE{$SRCNODE};
   Scalar::Util::weaken ($node);

   # Invoked with the kill reason: forgets itself, then relays the reason to
   # the remote node as a kil0 message, provided the node still has a
   # transport.
   my $on_kill = sub {
      delete $node->{rmon}{$portid};

      if ($node && $node->{transport}) {
         $node->send (["", kil0 => $portid, @_]);
      }
   };

   # The callback is remembered under {rmon} so that mon0 can find it again.
   _monitor (undef, $portid, $node->{rmon}{$portid} = $on_kill);
},
# another node has killed a monitored port
kil0 => sub {
my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
or return;
package AnyEvent::MP::Node::Remote; # a remote node
use base "AnyEvent::MP::Node";
# called at init time, mostly sets {send}
sub transport_reset {
   # Puts the node into its "no transport" state: the transport is dropped
   # and {send} is replaced by a handler that queues messages and asks for a
   # connection.
   my ($self) = @_;

   delete $self->{transport};

   # The handler below is stored inside $self, so it must only see a weak
   # reference to $self, otherwise the two would keep each other alive.
   Scalar::Util::weaken ($self);

   $self->{send} = sub {
      my ($msg) = @_;

      push @{ $self->{queue} }, $msg;
      $self->connect;
   };
}
# called each time we fail to establish a connection,
# or the existing connection failed
sub transport_error {
# Cleans up after a failed connection attempt or a failed connection:
# discards queued messages, resets the transport state, runs the local
# monitor callbacks and reports the event to the kernel.
#
# NOTE(review): $self, @reason and $no_transport are used below but are not
# declared anywhere in this excerpt - presumably the listing omits the lines
# that unpack @_ and compute $no_transport; confirm against the full file.

# messages still waiting for a connection are dropped
delete $self->{queue};
$self->transport_reset;
# {lmon} is removed as a whole; its values are array references of
# callbacks, each of which is called with the failure reason
if (my $mon = delete $self->{lmon}) {
$_->(@reason) for map @$_, values %$mon;
}
# the kernel is told about the event unless $no_transport is true
# (presumably the 0 stands for "node down" - TODO confirm)
AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
unless $no_transport;
# we weaken the node reference, so it can go away if unused
# (skipped while {connect_to} is set on the node)
Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
unless $self->{connect_to};
# the reason is logged at level 9
AE::log 9 => "@reason";
}
# called after handshake was successful
sub transport_connect {
# Called once the handshake has succeeded; receives the node and the
# transport that is now connected.
my ($self, $transport) = @_;
# {trial} is not set anywhere in this excerpt - presumably it tracks
# connection attempts; TODO confirm
delete $self->{trial};
# NOTE(review): $transport_send is not declared in this excerpt and
# $transport is not used - the listing presumably omits the lines that
# derive the former from the latter; confirm against the full file.
# The queue is taken off the node and every queued message is passed to
# $transport_send in order; a missing queue is treated as empty.
$transport_send->($_)
for @{ delete $self->{queue} || [] };
}
sub connect {
my ($self) = @_;
return if $self->{transport};
return if $self->{connect_w};
# we unweaken the node reference, in case it was weakened before
$AnyEvent::MP::Kernel::NODE{$self->{id}}
= $AnyEvent::MP::Kernel::NODE{$self->{id}};
Scalar::Util::weaken $self;
$self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
$self->transport_error (transport_error => $self->{id}, "connect timeout");
};
# maybe @$addresses?
my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
if ($addresses) {
$self->connect_to ($addresses);
$AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
(shift @DELAY)->()
while @DELAY;
undef $DELAY_W;
$DELAY = -50;
};
sub transport_reset {
my ($self) = @_;
Scalar::Util::weaken $self;
$self->{send} = sub {
if (++$DELAY > 0) {
my $msg = $_[0];
push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
$DELAY_W ||= AE::timer 0, 0, $send_delayed;
return;
}
local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
MP/Transport.pm view on Meta::CPAN
sub hmac_sha3_512_hex($$) {
   # Hex-encoded HMAC using SHA3-512 as the hash function.
   # The first argument ends up as the second argument of
   # Digest::HMAC::hmac_hex and vice versa, i.e. callers pass (key, data).
   my ($key, $data) = @_;

   # 72 is the block size handed to Digest::HMAC for this hash function
   Digest::HMAC::hmac_hex ($data, $key, \&Digest::SHA3::sha3_512, 72)
}
sub new {
my ($class, %arg) = @_;
my $self = bless \%arg, $class;
{
Scalar::Util::weaken (my $self = $self);
my $config = $AnyEvent::MP::Kernel::CONFIG;
my $timeout = $config->{monitor_timeout};
my $lframing = $config->{framing_format};
my $auth_snd = $config->{auth_offer};
my $auth_rcv = $config->{auth_accept};
$self->{secret} = $config->{secret}
unless exists $self->{secret};
MP/Transport.pm view on Meta::CPAN
} else {
my $rmsg; $rmsg = $self->{rmsg} = sub {
$push_read->($_[0], $framing => $rmsg);
$AnyEvent::MP::Kernel::SRCNODE = $node;
AnyEvent::MP::Kernel::_inject (@{ $_[1] });
};
eval {
$push_read->($hdl, $framing => $rmsg);
};
Scalar::Util::weaken $rmsg;
return $self->error ("$framing: unusable remote framing")
if $@;
}
}
sub error {
my ($self, $msg) = @_;
delete $self->{keepalive};
MP/Transport.pm view on Meta::CPAN
delete $self->{keepalive};
if ($self->{protocol}) {
$self->{hdl}->on_error (undef);
$HOOK_PROTOCOL{$self->{protocol}}->($self, undef);
} else {
AE::log 9 => "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}.";
my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
Scalar::Util::weaken ($self->{node} = $node);
$node->transport_connect ($self);
$_->($self) for @HOOK_CONNECT;
}
(delete $self->{release})->()
if exists $self->{release};
(delete $self->{on_connect})->($self)
if exists $self->{on_connect};
( run in 0.277 second using v1.01-cache-2.11-cpan-65fba6d93b7 )