AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/Transport.pm  view on Meta::CPAN

         peerport => $port,
         %arg,
      ;
      $tp->{keepalive} = $tp;
   }, delete $arg{prepare}
}

=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)

=cut

sub mp_connect {
   my $release = pop;
   my ($host, $port, @args) = @_;

   new AnyEvent::MP::Transport
      connect  => [$host, $port],
      peerhost => $host,
      peerport => $port,
      release  => $release,
      @args,
   ;
}

=item new AnyEvent::MP::Transport

Create a new transport - usually used via C<mp_server> or C<mp_connect>
instead.

   # immediately starts negotiation
   my $transport = new AnyEvent::MP::Transport
      # mandatory
      fh         => $filehandle,
      local_id   => $identifier,
      on_recv    => sub { receive-callback },
      on_error   => sub { error-callback },

      # optional
      on_greet   => sub { before sending greeting },
      on_greeted => sub { after receiving greeting },
      on_connect => sub { successful-connect-callback },
      greeting   => { key => value },

      # tls support
      tls_ctx    => AnyEvent::TLS,
      peername   => $peername, # for verification
   ;

=cut

sub hmac_sha3_512_hex($$) {
   Digest::HMAC::hmac_hex $_[1], $_[0], \&Digest::SHA3::sha3_512, 72
}

sub new {
   my ($class, %arg) = @_;

   my $self = bless \%arg, $class;

   {
      Scalar::Util::weaken (my $self = $self);

      my $config = $AnyEvent::MP::Kernel::CONFIG;

      my $timeout  = $config->{monitor_timeout};
      my $lframing = $config->{framing_format};
      my $auth_snd = $config->{auth_offer};
      my $auth_rcv = $config->{auth_accept};

      $self->{secret} = $config->{secret}
         unless exists $self->{secret};

      my $secret = $self->{secret};

      if (exists $config->{cert}) {
         $self->{tls_ctx} = {
            sslv2   => 0,
            sslv3   => 0,
            tlsv1   => 1,
            verify  => 1,
            cert    => $config->{cert},
            ca_cert => $config->{cert},
            verify_require_client_cert => 1,
         };
      }

      $self->{hdl} = new AnyEvent::Handle
         +($self->{fh} ? (fh => $self->{fh}) : (connect => $self->{connect})),
         autocork  => $config->{autocork},
         no_delay  => exists $config->{nodelay} ? $config->{nodelay} : 1,
         keepalive => 1,
         on_error  => sub {
            $self->error ($_[2]);
         },
         rtimeout  => $timeout,
      ;

      my $greeting_kv = $self->{local_greeting} ||= {};

      $greeting_kv->{tls}      = "1.0" if $self->{tls_ctx};
      $greeting_kv->{provider} = "AE-$AnyEvent::MP::Config::VERSION";
      $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};

      my $protocol = $self->{protocol} || "aemp";

      # can modify greeting_kv
      $_->($self) for $protocol eq "aemp" ? @HOOK_GREET : ();
      (delete $self->{on_greet})->($self)
         if exists $self->{on_greet};

      # send greeting
      my $lgreeting1 = "$protocol;$PROTOCOL_VERSION"
                     . ";$AnyEvent::MP::Kernel::NODE"
                     . ";" . (join ",", @$auth_rcv)
                     . ";" . (join ",", @$lframing)
                     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);

      my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), "";

      $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");
      return unless $self;

MP/Transport.pm  view on Meta::CPAN


   if ($framing eq "cbor") {
      require CBOR::XS;
      $self->{send} = sub {
         $push_write->($hdl, CBOR::XS::encode_cbor ($_[0]));
      };
   } elsif ($framing eq "json") {
      require JSON::XS;
      $self->{send} = sub {
         $push_write->($hdl, JSON::XS::encode_json ($_[0]));
      };
   } else {
      $self->{send} = sub {
         $push_write->($hdl, $framing => $_[0]);
      };
   }
}

sub set_rcv_framing {
   my ($self) = @_;

   my $node       = $self->{remote_node};
   my $framing    = $self->{r_framing};
   my $hdl        = $self->{hdl};
   my $push_read  = $hdl->can ("push_read");

   if ($framing eq "cbor") {
      require CBOR::XS;
      my $coder = CBOR::XS->new;

      $hdl->on_read (sub {
         $AnyEvent::MP::Kernel::SRCNODE = $node;

         AnyEvent::MP::Kernel::_inject (@$_)
            for $coder->incr_parse_multiple ($_[0]{rbuf});

         ()
      });
   } elsif ($framing eq "json") {
      require JSON::XS;
      my $coder = JSON::XS->new->utf8;

      $hdl->on_read (sub {
         $AnyEvent::MP::Kernel::SRCNODE = $node;

         AnyEvent::MP::Kernel::_inject (@$_)
            for $coder->incr_parse (delete $_[0]{rbuf});

         ()
      });
   } else {
      my $rmsg; $rmsg = $self->{rmsg} = sub {
         $push_read->($_[0], $framing => $rmsg);

         $AnyEvent::MP::Kernel::SRCNODE = $node;
         AnyEvent::MP::Kernel::_inject (@{ $_[1] });
      };
      eval {
         $push_read->($hdl, $framing => $rmsg);
      };
      Scalar::Util::weaken $rmsg;
      return $self->error ("$framing: unusable remote framing")
         if $@;
   }
}

sub error {
   my ($self, $msg) = @_;

   delete $self->{keepalive};

   if ($self->{protocol}) {
      $HOOK_PROTOCOL{$self->{protocol}}->($self, $msg);
   } else {
      AE::log 9 => "$self->{peerhost}:$self->{peerport} disconnected - $msg.";

      $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg)
         if $self->{node} && $self->{node}{transport} == $self;
   }

   (delete $self->{release})->()
      if exists $self->{release};
   
   $self->destroy;
}

sub connected {
   my ($self) = @_;

   delete $self->{keepalive};

   if ($self->{protocol}) {
      $self->{hdl}->on_error (undef);
      $HOOK_PROTOCOL{$self->{protocol}}->($self, undef);
   } else {
      AE::log 9 => "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}.";

      my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
      Scalar::Util::weaken ($self->{node} = $node);
      $node->transport_connect ($self);

      $_->($self) for @HOOK_CONNECT;
   }

   (delete $self->{release})->()
      if exists $self->{release};

   (delete $self->{on_connect})->($self)
      if exists $self->{on_connect};
}

sub destroy {
   my ($self) = @_;

   (delete $self->{release})->()
      if exists $self->{release};

   $self->{hdl}->destroy
      if $self->{hdl};

   (delete $self->{on_destroy})->($self)
      if exists $self->{on_destroy};
   $_->($self) for $self->{protocol} ? () : @HOOK_DESTROY;

   $self->{protocol} = "destroyed"; # to keep hooks from invoked twice.
}

sub DESTROY {
   my ($self) = @_;

   $self->destroy;
}

=back

=head1 PROTOCOL

The AEMP protocol is comparatively simple, and consists of three phases
which are symmetrical for both sides: greeting (followed by optionally
switching to TLS mode), authentication and packet exchange.

The protocol is designed to allow both full-text and binary streams.

The greeting consists of two text lines that are ended by either an ASCII
CR LF pair, or a single ASCII LF (recommended).

=head2 GREETING

All the lines until after authentication must not exceed 4kb in length,
including line delimiter. Afterwards there is no limit on the packet size
that can be received.

=head3 First Greeting Line

Example:

   aemp;0;rain;tls_sha3_512,hmac_sha3_512,tls_anon,cleartext;cbor,json,storable;timeout=12;peeraddr=10.0.0.1:48082

The first line contains strings separated (not ended) by C<;>



( run in 2.094 seconds using v1.01-cache-2.11-cpan-99c4e6809bf )