AnyEvent-MP
view release on metacpan or search on metacpan
AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
$self->{send} = $transport_send;
$transport_send->($_)
for @{ delete $self->{queue} || [] };
}
sub connect {
my ($self) = @_;
return if $self->{transport};
return if $self->{connect_w};
# we unweaken the node reference, in case it was weakened before
$AnyEvent::MP::Kernel::NODE{$self->{id}}
= $AnyEvent::MP::Kernel::NODE{$self->{id}};
Scalar::Util::weaken $self;
$self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
$self->transport_error (transport_error => $self->{id}, "connect timeout");
};
# maybe @$addresses?
my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
if ($addresses) {
$self->connect_to ($addresses);
} else {
# on global nodes, all bets are off now - we either know the node, or we don't
if ($AnyEvent::MP::Kernel::GLOBAL) {
$self->transport_error (transport_error => $self->{id}, "no known address");
} else {
AnyEvent::MP::Kernel::g_find ($self->{id});
}
}
}
sub connect_to {
my ($self, $addresses) = @_;
return if $self->{transport};
return if $self->{connect_w};
unless (@$addresses) {
$self->transport_error (transport_error => $self->{id}, "no known address");
return;
}
AE::log 9 => "connecting to $self->{id} with [@$addresses]";
my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
$interval = ($monitor - $interval) / @$addresses
if ($monitor - $interval) / @$addresses < $interval;
$interval = 0.4 if $interval < 0.4;
my @endpoints = reverse @$addresses;
$self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
my $endpoint = pop @endpoints
or return;
AE::log 9 => "connecting to $self->{id} at $endpoint";
$self->{trial}{$endpoint} ||= do {
my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
AnyEvent::MP::Transport::mp_connect
$host, $port,
sub { delete $self->{trial}{$endpoint} },
};
};
}
sub kill {
my ($self, $port, @reason) = @_;
$self->{send} (["", kil1 => $port, @reason]);
}
sub monitor {
my ($self, $portid, $cb) = @_;
my $list = $self->{lmon}{$portid} ||= [];
$self->send (["", mon1 => $portid])
unless @$list || !length $portid;
push @$list, $cb;
}
sub unmonitor {
my ($self, $portid, $cb) = @_;
my $list = $self->{lmon}{$portid}
or return;
@$list = grep $_ != $cb, @$list;
unless (@$list) {
$self->send (["", mon0 => $portid]);
delete $self->{monitor}{$portid};
}
}
package AnyEvent::MP::Node::Self; # the local node
use base "AnyEvent::MP::Node";
sub connect {
# we are trivially connected
}
# delay every so often to avoid recursion, also used to delay after spawn
our $DELAY = -50;
our @DELAY;
our $DELAY_W;
our $send_delayed = sub {
$AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
(shift @DELAY)->()
while @DELAY;
undef $DELAY_W;
$DELAY = -50;
};
sub transport_reset {
my ($self) = @_;
Scalar::Util::weaken $self;
( run in 0.647 second using v1.01-cache-2.11-cpan-39bf76dae61 )