AnyEvent-MP
view release on metacpan or search on metacpan
MP/Transport.pm view on Meta::CPAN
# receive handling
$self->set_snd_framing;
$self->set_rcv_framing;
}
$self->connected;
});
});
});
}
$self
}
# Install the outgoing-message sender for this transport.
#
# Looks at the send framing stored in $self->{s_framing} and stores a closure
# in $self->{send}.  That closure takes the message as its first argument and
# queues it on $self->{hdl} through the handle's push_write method:
#
#   "cbor" - the message is encoded with CBOR::XS::encode_cbor
#   "json" - the message is encoded with JSON::XS::encode_json
#   other  - push_write receives ($framing => $message), i.e. the framing
#            name followed by the unencoded message
#
# The encoder modules are loaded on demand, only for the framing in use.
sub set_snd_framing {
    my ($self) = @_;

    my $handle = $self->{hdl};
    my $format = $self->{s_framing};

    # resolve the method once; the closures below call the code ref directly
    my $write = $handle->can ("push_write");

    if ($format eq "cbor") {
        require CBOR::XS;

        return $self->{send} = sub {
            $write->($handle, CBOR::XS::encode_cbor ($_[0]));
        };
    }

    if ($format eq "json") {
        require JSON::XS;

        return $self->{send} = sub {
            $write->($handle, JSON::XS::encode_json ($_[0]));
        };
    }

    # any other framing: pass the framing name through to push_write
    $self->{send} = sub {
        $write->($handle, $format => $_[0]);
    };
}
# Install the incoming-message reader for this transport.
#
# Looks at the receive framing stored in $self->{r_framing} and hooks a reader
# onto $self->{hdl}.  Every decoded message is expected to be an array
# reference; it is flattened and passed to AnyEvent::MP::Kernel::_inject after
# $AnyEvent::MP::Kernel::SRCNODE has been set to $self->{remote_node}:
#
#   "cbor" - an on_read callback passes the handle's read buffer to a
#            CBOR::XS decoder (incr_parse_multiple)
#   "json" - an on_read callback removes the read buffer from the handle and
#            passes it to a JSON::XS decoder (incr_parse)
#   other  - a push_read request of the form ($framing => callback) is queued;
#            the callback re-queues itself for every message
#
# If queueing the first push_read request throws, the transport is failed via
# $self->error ("$framing: unusable remote framing") and the result of that
# call is returned.
sub set_rcv_framing {
    my ($self) = @_;

    my $node      = $self->{remote_node};
    my $framing   = $self->{r_framing};
    my $hdl       = $self->{hdl};
    my $push_read = $hdl->can ("push_read");

    if ($framing eq "cbor") {
        require CBOR::XS;
        my $coder = CBOR::XS->new;

        $hdl->on_read (sub {
            $AnyEvent::MP::Kernel::SRCNODE = $node;

            AnyEvent::MP::Kernel::_inject (@$_)
                for $coder->incr_parse_multiple ($_[0]{rbuf});

            ()
        });
    } elsif ($framing eq "json") {
        require JSON::XS;
        my $coder = JSON::XS->new->utf8;

        $hdl->on_read (sub {
            $AnyEvent::MP::Kernel::SRCNODE = $node;

            AnyEvent::MP::Kernel::_inject (@$_)
                for $coder->incr_parse (delete $_[0]{rbuf});

            ()
        });
    } else {
        my $rmsg; $rmsg = $self->{rmsg} = sub {
            # queue the next read request before dispatching this message
            $push_read->($_[0], $framing => $rmsg);

            $AnyEvent::MP::Kernel::SRCNODE = $node;
            AnyEvent::MP::Kernel::_inject (@{ $_[1] });
        };

        # Judge success by the eval's own result instead of by the truthiness
        # of $@ after further calls: an exception that evaluates as false (or
        # a $@ that got clobbered) must still count as a failure.
        my $queued = eval {
            $push_read->($hdl, $framing => $rmsg);
            1
        };

        # The closure refers to itself through $rmsg, so this must happen on
        # the failure path as well or the closure would keep itself alive.
        # $self->{rmsg} retains the strong reference.
        Scalar::Util::weaken ($rmsg);

        return $self->error ("$framing: unusable remote framing")
            if !$queued;
    }
}
# Fail this transport with the reason $msg and tear it down.
#
# The keepalive entry is dropped first.  Then the failure is reported:
#
#   - if $self->{protocol} is set, the handler stored under that name in
#     %HOOK_PROTOCOL is called with ($self, $msg);
#   - otherwise the disconnect is logged through AE::log at level 9 and, when
#     a node is attached whose current transport is this object, the node's
#     transport_error method is called.
#
# Finally, if a release entry exists it is removed and invoked, and the
# transport is destroyed.  The result of $self->destroy is the return value.
sub error {
    my ($self, $msg) = @_;

    delete $self->{keepalive};

    my $protocol = $self->{protocol};

    if ($protocol) {
        $HOOK_PROTOCOL{$protocol}->($self, $msg);
    } else {
        AE::log 9 => "$self->{peerhost}:$self->{peerport} disconnected - $msg.";

        # only notify the node if it still considers us its transport
        if ($self->{node} && $self->{node}{transport} == $self) {
            $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg);
        }
    }

    # take the callback out of the object before calling it
    if (exists $self->{release}) {
        my $release = delete $self->{release};
        $release->();
    }

    $self->destroy;
}
sub connected {
my ($self) = @_;
delete $self->{keepalive};
if ($self->{protocol}) {
$self->{hdl}->on_error (undef);
$HOOK_PROTOCOL{$self->{protocol}}->($self, undef);
} else {
AE::log 9 => "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}.";
my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
Scalar::Util::weaken ($self->{node} = $node);
$node->transport_connect ($self);
( run in 2.244 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )