AnyEvent-MP
view release on metacpan or search on metacpan
MP/Transport.pm view on Meta::CPAN
my ($hdl, $rline) = @_;
my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
my $rauth =
$auth_method eq "hmac_sha3_512" ? hmac_sha3_512_hex $secret, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012"
: $auth_method eq "cleartext" ? unpack "H*", $secret
: $auth_method eq "tls_anon" ? ($tls ? "" : "\012\012") # \012\012 never matches
: $auth_method eq "tls_sha3_512" ? ($tls ? Digest::SHA3::sha3_512_hex "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012" : "\012\012")
: return $self->error ("$auth_method: fatal, selected unsupported rcv auth method");
if ($rauth2 ne $rauth) {
return $self->error ("authentication failure/shared secret mismatch");
}
$self->{r_framing} = $r_framing;
$self->{s_framing} = $s_framing;
$hdl->rbuf_max (undef);
# we rely on TCP retransmit timeouts and keepalives
$self->{hdl}->rtimeout (undef);
$self->{remote_greeting}{untrusted} = 1
if $auth_method eq "tls_anon";
if ($protocol eq "aemp" and $self->{hdl}) {
# listener-less nodes need to continuously probe
# unless (@$AnyEvent::MP::Kernel::BINDS) {
# $self->{hdl}->wtimeout ($timeout);
# $self->{hdl}->on_wtimeout (sub { $self->{send}->([]) });
# }
# receive handling
$self->set_snd_framing;
$self->set_rcv_framing;
}
$self->connected;
});
});
});
}
$self
}
# Installs $self->{send}, the callback used to transmit one message over
# the connection handle ($self->{hdl}), according to the negotiated send
# framing ($self->{s_framing}):
#
# - "json" and "cbor" serialise the message themselves and write the
#   resulting string to the handle;
# - any other framing name is passed through to the handle's push_write
#   as the write type, together with the unserialised message.
sub set_snd_framing {
   my ($self) = @_;

   my $handle = $self->{hdl};
   my $format = $self->{s_framing};

   # resolve the method once, so the per-message closures call it directly
   my $write = $handle->can ("push_write");

   my $sender;

   if ($format eq "json") {
      require JSON::XS;

      $sender = sub { $write->($handle, JSON::XS::encode_json ($_[0])) };
   } elsif ($format eq "cbor") {
      require CBOR::XS;

      $sender = sub { $write->($handle, CBOR::XS::encode_cbor ($_[0])) };
   } else {
      # let the handle do the framing
      $sender = sub { $write->($handle, $format => $_[0]) };
   }

   $self->{send} = $sender
}
# Installs the receive path for the negotiated remote framing
# ($self->{r_framing}) on the connection handle ($self->{hdl}).
#
# - "cbor" and "json" register an on_read callback that hands the handle's
#   read buffer to an incremental decoder and passes every decoded message
#   (an array reference) to AnyEvent::MP::Kernel::_inject.
# - any other framing name is passed to the handle's push_read as the read
#   type, with a callback that re-queues itself before injecting each
#   message.
#
# In every case $AnyEvent::MP::Kernel::SRCNODE is set to the remote node
# before messages are injected.
#
# If queueing the first typed read fails, the transport is shut down via
# $self->error ("<framing>: unusable remote framing") and the result of
# that call is returned. Otherwise the return value carries no meaning.
sub set_rcv_framing {
   my ($self) = @_;

   my $node      = $self->{remote_node};
   my $framing   = $self->{r_framing};
   my $hdl       = $self->{hdl};
   my $push_read = $hdl->can ("push_read");

   if ($framing eq "cbor") {
      require CBOR::XS;
      my $coder = CBOR::XS->new;

      $hdl->on_read (sub {
         $AnyEvent::MP::Kernel::SRCNODE = $node;

         AnyEvent::MP::Kernel::_inject (@$_)
            for $coder->incr_parse_multiple ($_[0]{rbuf});

         ()
      });
   } elsif ($framing eq "json") {
      require JSON::XS;
      my $coder = JSON::XS->new->utf8;

      $hdl->on_read (sub {
         $AnyEvent::MP::Kernel::SRCNODE = $node;

         # the whole read buffer is removed from the handle and given to
         # the incremental parser
         AnyEvent::MP::Kernel::_inject (@$_)
            for $coder->incr_parse (delete $_[0]{rbuf});

         ()
      });
   } else {
      my $rmsg; $rmsg = $self->{rmsg} = sub {
         # queue the read for the next message before handling this one
         $push_read->($_[0], $framing => $rmsg);

         $AnyEvent::MP::Kernel::SRCNODE = $node;
         AnyEvent::MP::Kernel::_inject (@{ $_[1] });
      };

      # Detect failure through the eval's own result instead of the
      # truthiness of $@: $@ can be empty or false although the block died
      # (false-valued exception object, $@ clobbered while unwinding), which
      # would leave the transport without a working receive path.
      my $armed = eval {
         $push_read->($hdl, $framing => $rmsg);
         1
      };

      # the closure refers to $rmsg itself; $self->{rmsg} holds the strong
      # reference, so weaken the lexical one
      Scalar::Util::weaken ($rmsg);

      return $self->error ("$framing: unusable remote framing")
         unless $armed;
   }
}
# Reports a fatal transport error described by $msg and tears the
# transport down.
#
# Transports with a protocol hand the message to the hook registered for
# that protocol in %HOOK_PROTOCOL. Other transports log the disconnect and
# notify their node through transport_error, but only while this object is
# still the node's current transport. Afterwards a pending release
# callback, if any, is removed and invoked, and the object is destroyed.
sub error {
   my ($self, $msg) = @_;

   delete $self->{keepalive};

   if (my $protocol = $self->{protocol}) {
      $HOOK_PROTOCOL{$protocol}->($self, $msg);
   } else {
      AE::log 9 => "$self->{peerhost}:$self->{peerport} disconnected - $msg.";

      my $node = $self->{node};

      # skip the notification when the node has moved on to another transport
      if ($node && $node->{transport} == $self) {
         $node->transport_error (transport_error => $node->{id}, $msg);
      }
   }

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   $self->destroy;
}
sub connected {
my ($self) = @_;
delete $self->{keepalive};
if ($self->{protocol}) {
$self->{hdl}->on_error (undef);
$HOOK_PROTOCOL{$self->{protocol}}->($self, undef);
} else {
AE::log 9 => "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}.";
my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
Scalar::Util::weaken ($self->{node} = $node);
$node->transport_connect ($self);
( run in 0.464 second using v1.01-cache-2.11-cpan-39bf76dae61 )