AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/DataConn.pm  view on Meta::CPAN

   my ($node, $timeout, $initfunc, @initdata) = @_;

   my $port = $SELF
      or Carp::croak "AnyEvent::MP::DataConn::connect_to must be called in port context";

   $node = node_of $node;

   my $id = (++$ID) . "\@$NODE";

   # damn, why do my simple state hashes resemble objects so quickly
   my $state = $STATE{$id} = {
      id   => (++$ID) . "\@$NODE",
      to   => (AE::timer $timeout, 0, sub {
         $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         delete $AnyEvent::MP::Global::ON_SETUP{$id};
         %{delete $STATE{$id}} = ();

         if (defined $hdl) {
            $cb->($hdl);
         } else {
            kil $port, AnyEvent::MP::DataConn:: => $error;
         }
      },
   };

   if (AnyEvent::MP::Kernel::port_is_local $node) {
      # this sucks: the target is local, so wire up both ends of a socketpair by hand

      require AnyEvent::Util;
      my ($fh1, $fh2) = AnyEvent::Util::portable_socketpair ()
         or return kil $port, AnyEvent::MP::DataConn:: => "cannot create local socketpair: $!";

      use AnyEvent::Handle;
      my $hdl1 = new AnyEvent::Handle fh => $fh1;
      my $hdl2 = new AnyEvent::Handle fh => $fh2;

      (AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl2);
      $cb->($hdl1);

   } else {
      AnyEvent::MP::Kernel::snd_to_func $node,
         AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initfunc, @initdata;

      $state->{wait} = sub {
         if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
            delete $AnyEvent::MP::Global::ON_SETUP{$id};

            # continue connect
            if (@$addr) {
               # node has listeners, so connect
               _connect $id, $node;
            } else {
               # no listeners, ask it to connect to us
               AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE;
            }
         } else {
            # wait for the next global setup handshake
            # due to the round-trip at the beginning, this should never be necessary
            $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
         };
      };

      # we actually have to make sure that the connection arrives after the expect message, and
      # the easiest way to do this is to use an rpc call.
      AnyEvent::MP::Kernel::snd_on $node, port { $state->{wait}() };
   }
}

=back

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1



( run in 0.691 second using v1.01-cache-2.11-cpan-39bf76dae61 )