AnyEvent-MP
view release on metacpan or search on metacpan
if (my $mon = delete $self->{lmon}) {
$_->(@reason) for map @$_, values %$mon;
}
AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
unless $no_transport;
# we weaken the node reference, so it can go away if unused
Scalar::Util::weaken $AnyEvent::MP::Kernel::NODE{$self->{id}}
unless $self->{connect_to};
AE::log 9 => "@reason";
}
# called after handshake was successful
sub transport_connect {
my ($self, $transport) = @_;
delete $self->{trial};
$self->transport_error (transport_error => $self->{id}, "switched connections")
if $self->{transport};
delete $self->{connect_w};
delete $self->{connect_to};
$self->{transport} = $transport;
my $transport_send = $transport->{send};
AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
$self->{send} = $transport_send;
$transport_send->($_)
for @{ delete $self->{queue} || [] };
}
sub connect {
my ($self) = @_;
return if $self->{transport};
return if $self->{connect_w};
# we unweaken the node reference, in case it was weakened before
$AnyEvent::MP::Kernel::NODE{$self->{id}}
= $AnyEvent::MP::Kernel::NODE{$self->{id}};
Scalar::Util::weaken $self;
$self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
$self->transport_error (transport_error => $self->{id}, "connect timeout");
};
# maybe @$addresses?
my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
if ($addresses) {
$self->connect_to ($addresses);
} else {
# on global nodes, all bets are off now - we either know the node, or we don't
if ($AnyEvent::MP::Kernel::GLOBAL) {
$self->transport_error (transport_error => $self->{id}, "no known address");
} else {
AnyEvent::MP::Kernel::g_find ($self->{id});
}
}
}
sub connect_to {
my ($self, $addresses) = @_;
return if $self->{transport};
return if $self->{connect_w};
unless (@$addresses) {
$self->transport_error (transport_error => $self->{id}, "no known address");
return;
}
AE::log 9 => "connecting to $self->{id} with [@$addresses]";
my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
$interval = ($monitor - $interval) / @$addresses
if ($monitor - $interval) / @$addresses < $interval;
$interval = 0.4 if $interval < 0.4;
my @endpoints = reverse @$addresses;
$self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
my $endpoint = pop @endpoints
or return;
AE::log 9 => "connecting to $self->{id} at $endpoint";
$self->{trial}{$endpoint} ||= do {
my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
AnyEvent::MP::Transport::mp_connect
$host, $port,
sub { delete $self->{trial}{$endpoint} },
};
};
}
sub kill {
my ($self, $port, @reason) = @_;
$self->{send} (["", kil1 => $port, @reason]);
}
sub monitor {
my ($self, $portid, $cb) = @_;
my $list = $self->{lmon}{$portid} ||= [];
$self->send (["", mon1 => $portid])
( run in 0.894 second using v1.01-cache-2.11-cpan-39bf76dae61 )