AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/Transport.pm  view on Meta::CPAN

               my ($hdl, $rline) = @_;

               my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

               my $rauth =
                  $auth_method eq "hmac_sha3_512" ? hmac_sha3_512_hex $secret, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012"
                : $auth_method eq "cleartext"     ? unpack "H*", $secret
                : $auth_method eq "tls_anon"      ? ($tls ? "" : "\012\012") # \012\012 never matches
                : $auth_method eq "tls_sha3_512"  ? ($tls ? Digest::SHA3::sha3_512_hex "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012" : "\012\012")
                : return $self->error ("$auth_method: fatal, selected unsupported rcv auth method");

               if ($rauth2 ne $rauth) {
                  return $self->error ("authentication failure/shared secret mismatch");
               }

               $self->{r_framing} = $r_framing;
               $self->{s_framing} = $s_framing;

               $hdl->rbuf_max (undef);

               # we rely on TCP retransmit timeouts and keepalives
               $self->{hdl}->rtimeout (undef);

               $self->{remote_greeting}{untrusted} = 1
                  if $auth_method eq "tls_anon";

               if ($protocol eq "aemp" and $self->{hdl}) {
                  # listener-less nodes need to continuously probe
#                  unless (@$AnyEvent::MP::Kernel::BINDS) {
#                     $self->{hdl}->wtimeout ($timeout);
#                     $self->{hdl}->on_wtimeout (sub { $self->{send}->([]) });
#                  }

                  # receive handling
                  $self->set_snd_framing;
                  $self->set_rcv_framing;
               }

               $self->connected;
            });
         });
      });
   }

   $self
}

# Install the outgoing-message callback in $self->{send}, chosen by the
# negotiated send framing ($self->{s_framing}).
#
#   cbor  - the message is encoded with CBOR::XS::encode_cbor and the result
#           is handed to the handle's push_write
#   json  - likewise, using JSON::XS::encode_json
#   other - the framing name is passed to push_write as its first argument,
#           followed by the unencoded message (presumably an AnyEvent::Handle
#           write type - confirm against the handle class in use)
#
# The codec modules are only loaded when their framing is actually selected.
sub set_snd_framing {
   my ($self) = @_;

   my $handle = $self->{hdl};
   my $format = $self->{s_framing};

   # look the method up once; the closures below call the code ref directly
   my $write = $handle->can ("push_write");

   my $sender;

   if ("cbor" eq $format) {
      require CBOR::XS;
      $sender = sub { $write->($handle, CBOR::XS::encode_cbor ($_[0])) };
   } elsif ("json" eq $format) {
      require JSON::XS;
      $sender = sub { $write->($handle, JSON::XS::encode_json ($_[0])) };
   } else {
      $sender = sub { $write->($handle, $format => $_[0]) };
   }

   $self->{send} = $sender
}

# Start receiving messages from the peer using the negotiated receive
# framing ($self->{r_framing}).
#
# Every decoded message is dereferenced as an array and its elements are
# passed to AnyEvent::MP::Kernel::_inject, after
# $AnyEvent::MP::Kernel::SRCNODE has been set to $self->{remote_node}.
#
#   cbor  - an on_read callback passes the handle's read buffer to
#           CBOR::XS's incr_parse_multiple
#   json  - an on_read callback removes the read buffer from the handle and
#           feeds it to JSON::XS's incr_parse
#   other - the framing name is passed to the handle's push_read together
#           with a self-rearming callback (also kept in $self->{rmsg}); if
#           that first push_read throws, the transport is failed through
#           $self->error and its result is returned
sub set_rcv_framing {
   my ($self) = @_;

   my $node       = $self->{remote_node};
   my $framing    = $self->{r_framing};
   my $hdl        = $self->{hdl};
   my $push_read  = $hdl->can ("push_read"); # resolved once, called directly below

   if ($framing eq "cbor") {
      require CBOR::XS;
      my $coder = CBOR::XS->new;

      $hdl->on_read (sub {
         $AnyEvent::MP::Kernel::SRCNODE = $node;

         AnyEvent::MP::Kernel::_inject (@$_)
            for $coder->incr_parse_multiple ($_[0]{rbuf});

         # the callback itself returns nothing
         ()
      });
   } elsif ($framing eq "json") {
      require JSON::XS;
      my $coder = JSON::XS->new->utf8;

      $hdl->on_read (sub {
         $AnyEvent::MP::Kernel::SRCNODE = $node;

         AnyEvent::MP::Kernel::_inject (@$_)
            for $coder->incr_parse (delete $_[0]{rbuf});

         # the callback itself returns nothing
         ()
      });
   } else {
      # queue the next read before handling the current message, so a read
      # request is always pending
      my $rmsg; $rmsg = $self->{rmsg} = sub {
         $push_read->($_[0], $framing => $rmsg);

         $AnyEvent::MP::Kernel::SRCNODE = $node;
         AnyEvent::MP::Kernel::_inject (@{ $_[1] });
      };

      # use eval's own result to detect failure: $@ may evaluate false even
      # though the block died (e.g. an exception object overloading bool)
      my $queued = eval {
         $push_read->($hdl, $framing => $rmsg);
         1
      };

      # the closure refers to itself through $rmsg - weaken that reference so
      # the closure is not kept alive by its own captured variable
      Scalar::Util::weaken ($rmsg);

      return $self->error ("$framing: unusable remote framing")
         unless $queued;
   }
}

# Fail this transport with the reason $msg and tear it down.
#
# The keepalive entry is dropped first. If the transport speaks a custom
# protocol ($self->{protocol} is true) the matching %HOOK_PROTOCOL handler is
# called with ($self, $msg); otherwise the disconnect is logged and the node
# is told about the transport error - but only when this transport is the one
# the node currently has stored as its transport. Finally a pending release
# callback, if any, is removed and invoked, and the transport is destroyed.
#
# Returns whatever $self->destroy returns.
sub error {
   my ($self, $msg) = @_;

   delete $self->{keepalive};

   if (my $protocol = $self->{protocol}) {
      $HOOK_PROTOCOL{$protocol}->($self, $msg);
   } else {
      my $peer = "$self->{peerhost}:$self->{peerport}";
      AE::log 9 => "$peer disconnected - $msg.";

      # do not report to a node that has moved on to another transport
      if ($self->{node} && $self->{node}{transport} == $self) {
         $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg);
      }
   }

   # remove the release callback before calling it, so it runs at most once
   if (exists $self->{release}) {
      (delete $self->{release})->();
   }

   $self->destroy;
}

sub connected {
   my ($self) = @_;

   delete $self->{keepalive};

   if ($self->{protocol}) {
      $self->{hdl}->on_error (undef);
      $HOOK_PROTOCOL{$self->{protocol}}->($self, undef);
   } else {
      AE::log 9 => "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}.";

      my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
      Scalar::Util::weaken ($self->{node} = $node);
      $node->transport_connect ($self);



( run in 0.464 second using v1.01-cache-2.11-cpan-39bf76dae61 )