AnyEvent-MP
view release on metacpan or search on metacpan
MP/Transport.pm view on Meta::CPAN
peerport => $port,
%arg,
;
$tp->{keepalive} = $tp;
}, delete $arg{prepare}
}
=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
=cut
# Open an outgoing transport to $host:$port.
#
# Arguments: $host, $port, any number of extra constructor arguments, and -
# always last - a code reference. That trailing code reference is handed to
# the constructor under the "release" key; error, connected and destroy each
# remove that key and call the stored code reference without arguments.
#
# The extra constructor arguments are appended after the fixed keys.
#
# Returns whatever the AnyEvent::MP::Transport constructor returns.
sub mp_connect {
   my $release = pop @_;
   my ($host, $port, @ctor_args) = @_;

   return AnyEvent::MP::Transport->new (
      connect  => [$host, $port],
      peerhost => $host,
      peerport => $port,
      release  => $release,
      @ctor_args,
   );
}
=item new AnyEvent::MP::Transport
Create a new transport - usually used via C<mp_server> or C<mp_connect>
instead.
# immediately starts negotiation
my $transport = new AnyEvent::MP::Transport
# mandatory
fh => $filehandle,
local_id => $identifier,
on_recv => sub { receive-callback },
on_error => sub { error-callback },
# optional
on_greet => sub { before sending greeting },
on_greeted => sub { after receiving greeting },
on_connect => sub { successful-connect-callback },
greeting => { key => value },
# tls support
tls_ctx => AnyEvent::TLS,
peername => $peername, # for verification
;
=cut
# Hex-encoded HMAC built on SHA3-512.
#
# Takes exactly two scalars (enforced by the ($$) prototype). The second
# argument is passed to Digest::HMAC::hmac_hex first and the first argument
# second, i.e. the call is hmac_sha3_512_hex ($key, $data) given
# Digest::HMAC's documented ($data, $key, $hasher, $blocksize) order.
# 72 is the block size handed to the HMAC construction.
sub hmac_sha3_512_hex($$) {
   my ($key, $data) = @_;

   Digest::HMAC::hmac_hex ($data, $key, \&Digest::SHA3::sha3_512, 72)
}
sub new {
my ($class, %arg) = @_;
my $self = bless \%arg, $class;
{
Scalar::Util::weaken (my $self = $self);
my $config = $AnyEvent::MP::Kernel::CONFIG;
my $timeout = $config->{monitor_timeout};
my $lframing = $config->{framing_format};
my $auth_snd = $config->{auth_offer};
my $auth_rcv = $config->{auth_accept};
$self->{secret} = $config->{secret}
unless exists $self->{secret};
my $secret = $self->{secret};
if (exists $config->{cert}) {
$self->{tls_ctx} = {
sslv2 => 0,
sslv3 => 0,
tlsv1 => 1,
verify => 1,
cert => $config->{cert},
ca_cert => $config->{cert},
verify_require_client_cert => 1,
};
}
$self->{hdl} = new AnyEvent::Handle
+($self->{fh} ? (fh => $self->{fh}) : (connect => $self->{connect})),
autocork => $config->{autocork},
no_delay => exists $config->{nodelay} ? $config->{nodelay} : 1,
keepalive => 1,
on_error => sub {
$self->error ($_[2]);
},
rtimeout => $timeout,
;
my $greeting_kv = $self->{local_greeting} ||= {};
$greeting_kv->{tls} = "1.0" if $self->{tls_ctx};
$greeting_kv->{provider} = "AE-$AnyEvent::MP::Config::VERSION";
$greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
my $protocol = $self->{protocol} || "aemp";
# can modify greeting_kv
$_->($self) for $protocol eq "aemp" ? @HOOK_GREET : ();
(delete $self->{on_greet})->($self)
if exists $self->{on_greet};
# send greeting
my $lgreeting1 = "$protocol;$PROTOCOL_VERSION"
. ";$AnyEvent::MP::Kernel::NODE"
. ";" . (join ",", @$auth_rcv)
. ";" . (join ",", @$lframing)
. (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), "";
$self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");
return unless $self;
MP/Transport.pm view on Meta::CPAN
if ($framing eq "cbor") {
require CBOR::XS;
$self->{send} = sub {
$push_write->($hdl, CBOR::XS::encode_cbor ($_[0]));
};
} elsif ($framing eq "json") {
require JSON::XS;
$self->{send} = sub {
$push_write->($hdl, JSON::XS::encode_json ($_[0]));
};
} else {
$self->{send} = sub {
$push_write->($hdl, $framing => $_[0]);
};
}
}
# Install the receive side of the framing named in $self->{r_framing} on the
# transport's handle ($self->{hdl}).
#
# Every decoded message is passed to AnyEvent::MP::Kernel::_inject after
# $AnyEvent::MP::Kernel::SRCNODE has been set to $self->{remote_node}.
#
# "cbor" and "json" get a dedicated on_read callback with an incremental
# decoder; any other framing name is passed to the handle's push_read as a
# read type. If that first push_read dies, the failure is reported through
# $self->error and that call's result is returned.
sub set_rcv_framing {
my ($self) = @_;
my $node = $self->{remote_node};
my $framing = $self->{r_framing};
my $hdl = $self->{hdl};
# Resolve the method once; it is invoked below as a plain code reference
# with the handle passed explicitly as first argument.
my $push_read = $hdl->can ("push_read");
if ($framing eq "cbor") {
require CBOR::XS;
my $coder = CBOR::XS->new;
$hdl->on_read (sub {
$AnyEvent::MP::Kernel::SRCNODE = $node;
# The decoder is given the handle's read buffer itself (not a copy).
# NOTE(review): each item yielded by incr_parse_multiple is dereferenced
# as one message array - confirm against the CBOR::XS documentation that
# it yields one element per message rather than a single array reference.
AnyEvent::MP::Kernel::_inject (@$_)
for $coder->incr_parse_multiple ($_[0]{rbuf});
# The callback's own return value is an empty list.
()
});
} elsif ($framing eq "json") {
require JSON::XS;
my $coder = JSON::XS->new->utf8;
$hdl->on_read (sub {
$AnyEvent::MP::Kernel::SRCNODE = $node;
# delete removes the read buffer from the handle and passes its contents
# to the incremental parser, which is called in list context by the loop.
AnyEvent::MP::Kernel::_inject (@$_)
for $coder->incr_parse (delete $_[0]{rbuf});
# The callback's own return value is an empty list.
()
});
} else {
# Generic path: $rmsg is a self-re-arming read callback. It queues itself
# for the next message before injecting the current one ($_[1]).
# A reference to it is also kept in $self->{rmsg}.
my $rmsg; $rmsg = $self->{rmsg} = sub {
$push_read->($_[0], $framing => $rmsg);
$AnyEvent::MP::Kernel::SRCNODE = $node;
AnyEvent::MP::Kernel::_inject (@{ $_[1] });
};
# Queue the first read; eval traps a die from push_read so it can be
# reported below instead of propagating.
eval {
$push_read->($hdl, $framing => $rmsg);
};
# Weaken the lexical that the closure captures, so the closure does not
# keep itself alive through its own captured variable.
# NOTE(review): $@ is inspected only after this call - assumes weaken
# leaves $@ untouched.
Scalar::Util::weaken $rmsg;
return $self->error ("$framing: unusable remote framing")
if $@;
}
}
# Report a transport failure and tear the transport down.
#
#   $self->error ($msg)
#
# Drops the keepalive entry, then either
#   - calls the hook registered in %HOOK_PROTOCOL for $self->{protocol}
#     with ($self, $msg), when a protocol is set, or
#   - logs the disconnect and, if this transport is the one currently
#     attached to its node, tells the node via transport_error.
# Afterwards a pending release callback is removed and called once, and the
# transport is destroyed. Returns the result of $self->destroy.
sub error {
   my ($self, $msg) = @_;

   delete $self->{keepalive};

   if ($self->{protocol}) {
      my $hook = $HOOK_PROTOCOL{$self->{protocol}};
      $hook->($self, $msg);
   } else {
      AE::log (9 => "$self->{peerhost}:$self->{peerport} disconnected - $msg.");

      # Only notify the node when it still points back at this transport.
      if ($self->{node} && $self->{node}{transport} == $self) {
         $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg);
      }
   }

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   $self->destroy;
}
# Finish connection setup once the transport is usable.
#
#   $self->connected
#
# Drops the keepalive entry. When $self->{protocol} is set, the handle's
# on_error callback is cleared and the hook from %HOOK_PROTOCOL is called
# with ($self, undef). Otherwise the connection is logged, the remote node
# is obtained from AnyEvent::MP::Kernel::add_node, stored (weakly) in
# $self->{node}, informed via transport_connect, and every entry of
# @HOOK_CONNECT is called with $self.
#
# Finally the release and on_connect callbacks, if present, are each removed
# and called once: release without arguments, on_connect with $self.
sub connected {
   my ($self) = @_;

   delete $self->{keepalive};

   if ($self->{protocol}) {
      $self->{hdl}->on_error (undef);

      my $hook = $HOOK_PROTOCOL{$self->{protocol}};
      $hook->($self, undef);
   } else {
      AE::log (9 => "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}.");

      my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
      # The transport keeps only a weak reference to its node.
      Scalar::Util::weaken ($self->{node} = $node);
      $node->transport_connect ($self);

      foreach (@HOOK_CONNECT) {
         $_->($self);
      }
   }

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   if (exists $self->{on_connect}) {
      (delete $self->{on_connect})->($self);
   }
}
# Shut the transport down.
#
#   $self->destroy
#
# In order: removes and calls a pending release callback (no arguments),
# destroys the handle if there is one, removes and calls a pending
# on_destroy callback with $self, and - only when no protocol is set -
# calls every entry of @HOOK_DESTROY with $self.
#
# The protocol field is then set to "destroyed", which makes any later call
# skip the @HOOK_DESTROY entries.
sub destroy {
   my ($self) = @_;

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   if ($self->{hdl}) {
      $self->{hdl}->destroy;
   }

   if (exists $self->{on_destroy}) {
      my $on_destroy = delete $self->{on_destroy};
      $on_destroy->($self);
   }

   unless ($self->{protocol}) {
      $_->($self) foreach @HOOK_DESTROY;
   }

   # Keeps the hooks from being invoked twice.
   $self->{protocol} = "destroyed";
}
# Object destructor: runs the same teardown as an explicit destroy call.
sub DESTROY {
   $_[0]->destroy;
}
=back
=head1 PROTOCOL
The AEMP protocol is comparatively simple, and consists of three phases
which are symmetrical for both sides: greeting (followed by optionally
switching to TLS mode), authentication and packet exchange.
The protocol is designed to allow both full-text and binary streams.
The greeting consists of two text lines that are ended by either an ASCII
CR LF pair, or a single ASCII LF (recommended).
=head2 GREETING
All the lines until after authentication must not exceed 4kb in length,
including line delimiter. Afterwards there is no limit on the packet size
that can be received.
=head3 First Greeting Line
Example:
aemp;0;rain;tls_sha3_512,hmac_sha3_512,tls_anon,cleartext;cbor,json,storable;timeout=12;peeraddr=10.0.0.1:48082
The first line contains strings separated (not ended) by C<;>
( run in 2.094 seconds using v1.01-cache-2.11-cpan-99c4e6809bf )