AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/DataConn.pm  view on Meta::CPAN


use AnyEvent ();
use AnyEvent::Util ();

use AnyEvent::MP;
use AnyEvent::MP::Kernel ();

# NOTE(review): presumably the seed for generating connection ids - it is not
# used by the code visible here, confirm against connect_to.
our $ID = "a";
# pending data connections, keyed by connection id. each value is the state
# hash built by _expect (keys: id, to, done - plus next once _connect runs).
our %STATE;

# another node tells us to await a connection
#
#    $id        connection id, used as key into %STATE
#    $port      port that is kil'ed when the connection cannot be established
#    $timeout   passed to AE::timer as the delay before we give up
#    $initfunc  function name, resolved via AnyEvent::MP::Kernel::load_func and
#               called as $initfunc->(@initdata, $hdl) on success
#
# registers a state hash in %STATE containing the timeout timer ("to") and
# the completion callback ("done"), which is called as done->($hdl, $error).
sub _expect {
   my ($id, $port, $timeout, $initfunc, @initdata) = @_;

   $STATE{$id} = {
      id   => $id,
      to   => (AE::timer $timeout, 0, sub {
         # always pass a reason, so the port is not killed with an
         # undefined error message
         $STATE{$id}{done}(undef, "timeout");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         # unregister the state and empty it, which also drops the
         # timers stored inside it
         %{delete $STATE{$id}} = ();

         if (defined $hdl) {
            # success: hand the handle to the init function
            (AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl);
         } else {
            # failure: report the error via the kill reason of the port
            kil $port, AnyEvent::MP::DataConn:: => $error;
         }
      },
   };
}

# AEMP::Transport call for dataconn-connections
#
# called with the transport object and, on failure, an error. looks up the
# data connection id in the local or remote greeting, destroys the transport
# and passes the handle (undef on error) and the error on to the "done"
# callback of the matching %STATE entry, if there still is one.
sub _inject {
   my ($conn, $error) = @_;

   # the handle is only taken out of the transport when there was no error
   my $hdl;
   $hdl = delete $conn->{hdl}
      unless defined $error;

   my $id = $conn->{local_greeting}{dataconn_id}
         || $conn->{remote_greeting}{dataconn_id};

   # not a data connection we know anything about
   return
      unless $id;

   $conn->destroy;

   # nobody is waiting for this connection (anymore)
   my $state = $STATE{$id};

   return
      unless $state;

   $state->{done}($hdl, $error);
}

# actively connect to some other node
#
#    $id     connection id, must have an entry in %STATE (otherwise nothing is done)
#    $node   node id, looked up in %AnyEvent::MP::Global::addr
#
# when no addresses are known for $node, the "done" callback of the state is
# called with an error. otherwise a repeating timer (stored as "next" in the
# state) starts one connection attempt per tick, one address at a time, and
# removes itself once all addresses have been tried.
sub _connect {
   my ($id, $node) = @_;

   my $state = $STATE{$id}
      or return;

   # work on a private copy - shifting entries off the array stored in
   # %AnyEvent::MP::Global::addr would remove them for every later connect.
   # a missing entry is treated like an empty address list.
   my @endpoints = @{ $AnyEvent::MP::Global::addr{$node} || [] };

   @endpoints
      or return $state->{done}(undef, "$node: no listeners found");

   # I love hardcoded constants  !
   $state->{next} = AE::timer 0, 2, sub {
      # all addresses tried: stop the timer
      my $endpoint = shift @endpoints
         or return delete $state->{next};

      # skip unparsable addresses, the next tick tries the next one
      my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
         or return;

      my $transport; $transport = AnyEvent::MP::Transport::mp_connect
         $host, $port,
         protocol => "aemp-dataconn",
         local_greeting => { dataconn_id => $id },
         sub { $transport->destroy }, #TODO: destroys handshaked connections too early
      ;
   };
}

=item AnyEvent::MP::DataConn::connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle)

Creates a socket connection between the local node and the node C<$node>
(which can also be specified as a port). One of the nodes must have
listeners ("binds").

When the connection could be successfully created, the C<$initfunc>
will be called with the given C<@initdata> on the remote node (similar
to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
representing the remote connection end as additional argument.

Also, the callback given as last argument will be called with the
AnyEvent::Handle object for the local side.

The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
out the file handle and forget about it, but it is recommended to use it,
as the security settings might have called for a TLS connection. If you
opt to use it, you at least have to set an C<on_error> callback.

In case of any error (timeout etc.), nothing will be called on
the remote side, and the local port will be C<kil>'ed with an C<<
AnyEvent::MP::DataConn => "error message" >> kill reason.

The timeout should be large enough to cover at least four network
round-trips and one message round-trip.

Example: on node1, establish a connection to node2 and send a line of text,
on node2, provide a receiver function.

   # node1, code executes in some port context
   AnyEvent::MP::DataConn::connect_to "node2", 5, "pkg::receiver", 1, sub {
      my ($hdl) = @_;
      warn "connection established, sending line.\n";
      $hdl->push_write ("blabla\n")
   };

   # node2
   sub pkg::receiver {
      my ($one, $hdl) = @_;
      warn "connection established, wait for a line...\n";

      $hdl->push_read (line => sub {
         warn "received a line: $_[1]\n";
         undef $hdl;
      });
   }

=cut



( run in 0.533 second using v1.01-cache-2.11-cpan-39bf76dae61 )