AnyEvent-MP
view release on metacpan or search on metacpan
MP/DataConn.pm view on Meta::CPAN
use AnyEvent ();
use AnyEvent::Util ();
use AnyEvent::MP;
use AnyEvent::MP::Kernel ();
our $ID = "a";
our %STATE;
# another node tells us to await a connection
sub _expect {
my ($id, $port, $timeout, $initfunc, @initdata) = @_;
$STATE{$id} = {
id => $id,
to => (AE::timer $timeout, 0, sub {
$STATE{$id}{done}(undef);
}),
done => sub {
my ($hdl, $error) = @_;
%{delete $STATE{$id}} = ();
if (defined $hdl) {
(AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl);
} else {
kil $port, AnyEvent::MP::DataConn:: => $error;
}
},
};
}
# AEMP::Transport call for dataconn-connections
sub _inject {
my ($conn, $error) = @_;
my $hdl = defined $error ? undef : delete $conn->{hdl};
my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id}
or return;
$conn->destroy;
($STATE{$id} or return)->{done}($hdl, $error);
}
# actively connect to some other node
sub _connect {
my ($id, $node) = @_;
my $state = $STATE{$id}
or return;
my $addr = $AnyEvent::MP::Global::addr{$node};
@$addr
or return $state->{done}(undef, "$node: no listeners found");
# I love hardcoded constants !
$state->{next} = AE::timer 0, 2, sub {
my $endpoint = shift @$addr
or return delete $state->{next};
my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
or return;
my $transport; $transport = AnyEvent::MP::Transport::mp_connect
$host, $port,
protocol => "aemp-dataconn",
local_greeting => { dataconn_id => $id },
sub { $transport->destroy }, #TODO: destroys handshaked connections too early
;
};
}
=item AnyEvent::MP::DataConn::connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle)
Creates a socket connection between the local node and the node C<$node>
(which can also be specified as a port). One of the nodes must have
listeners ("binds").
When the connection could be successfully created, the C<$initfunc>
will be called with the given C<@initdata> on the remote node (similar
to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
representing the remote connection end as additional argument.
Also, the callback given as last argument will be called with the
AnyEvent::Handle object for the local side.
The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
out the file handle and forget about it, but it is recommended to use it,
as the security settings might have called for a TLS connection. If you
opt to use it, you at least have to set an C<on_error> callback.
In case of any error (timeout etc.), nothing will be called on
the remote side, and the local port will be C<kil>'ed with an C<<
AnyEvent::MP::DataConn => "error message" >> kill reason.
The timeout should be large enough to cover at least four network
round-trips and one message round-trip.
Example: on node1, establish a connection to node2 and send a line of text,
on node2, provide a receiver function.
# node1, code executes in some port context
AnyEvent::MP::DataConn::connect_to "node2", 5, "pkg::receiver", 1, sub {
my ($hdl) = @_;
warn "connection established, sending line.\n"
$hdl->push_write ("blabla\n")
};
# node2
sub pkg::receiver {
my ($one, $hdl) = @_;
warn "connection established, wait for a line...\n"
$hdl->push_read (line => sub {
warn "received a line: $_[1]\n";
undef $hdl;
});
}
=cut
( run in 0.533 second using v1.01-cache-2.11-cpan-39bf76dae61 )