AnyEvent-PacketForwarder
view release on metacpan or search on metacpan
lib/AnyEvent/PacketForwarder.pm view on Meta::CPAN
package AnyEvent::PacketForwarder;
use strict;
use warnings;
our $VERSION = '0.01';
require Exporter;
our @ISA = qw(Exporter);
our @EXPORT = qw(packet_forwarder);
use AnyEvent;
use AnyEvent::PacketReader;
use Errno qw(EPIPE EMSGSIZE EINTR EAGAIN EWOULDBLOCK ENODATA);
use Carp;
our @CARP_NOT = qw(AnyEvent::PacketReader);
# Default queue size, used by packet_forwarder() when its $queue_size
# argument is missing or false.  The reader is paused when the number of
# queued outgoing packets reaches the queue size in effect.
our $QUEUE_SIZE = 10;
# packet_forwarder($in, $out, [$templ, [$max_load_length, [$queue_size]]], $cb)
#
# Creates a forwarder that reads packets from $in through packet_reader()
# and queues them to be written to $out.
#
# Arguments:
#   $in, $out         - input and output handles.
#   $templ            - passed through unchanged to packet_reader().
#   $max_load_length  - passed through unchanged to packet_reader().
#   $queue_size       - number of queued packets at which the reader is
#                       paused; falls back to $QUEUE_SIZE when false.
#   $cb               - callback, always taken from the LAST argument.
#
# Callback protocol, as implemented by the helpers in this file:
#   $cb->($packet)   for every packet read; the packet is passed by alias,
#                    so the callback may edit it in place (an emptied
#                    packet is not forwarded).
#   $cb->()          when the reader reports an undefined packet.
#   $cb->(undef, 1)  when the output side fails or runs out of data; $!
#                    holds the reason while the callback runs.
#
# Returns an object (a blessed reference to the internal state) whose
# push() method injects extra packets into the output queue.
sub packet_forwarder {
    my $cb = pop;
    my ($in, $out, $templ, $max_load_length, $queue_size) = @_;
    $queue_size ||= $QUEUE_SIZE;

    # data is: 0:reader, 1:out, 2:queue_size, 3:queue, 4:cb, 5:out_watcher
    my $data = [ undef, $out, $queue_size, [], $cb, undef ];

    # The reader callback closes over $data so that each incoming packet
    # is routed to _packet() together with this forwarder's state.
    $data->[0] = packet_reader($in, $templ, $max_load_length,
                               sub { _packet($_[0], $data) });

    # The public object is a reference to the state reference.  Blessing
    # with an explicit class name instead of relying on one-argument bless.
    my $obj = \$data;
    return bless $obj, __PACKAGE__;
}
# _push($packet, $data)
#
# Appends $packet to the outgoing queue of the forwarder state $data and
# makes sure a write watcher exists for the output handle.
#
# - Empty (zero-length) or undefined packets are ignored.
# - When the queue reaches the configured queue size (slot 2), the reader
#   is paused.  The reader slot is undefined once the input side has
#   finished, and packets can still arrive through the public push()
#   method afterwards, so the reader is only paused while it exists.
# - The write watcher (slot 5) is created lazily and reused while set.
#
# The return value is not part of the contract.
sub _push {
    my $data = $_[1];
    if (length $_[0]) {
        my $queue = $data->[3];
        push @$queue, $_[0];

        # Guard against a reader that has already been released.
        my $reader = $data->[0];
        $reader->pause if defined $reader && @$queue == $data->[2];

        $data->[5] ||= AE::io($data->[1], 1, sub { _write($data) });
    }
}
# _packet($packet, $data)
#
# Reader callback.  $packet is accessed through $_[0] so that the user
# callback receives an alias and may modify the packet before it is
# queued for output.
#
# Defined packet:   notify the user callback, then queue the (possibly
#                   modified) packet via _push().
# Undefined packet: notify the user callback with no arguments; if no
#                   write watcher is pending, report ENODATA through
#                   _fatal_write() right away; finally release the reader.
sub _packet {
    my $state  = $_[1];
    my $notify = $state->[4];

    unless (defined $_[0]) {
        $notify->();
        defined $state->[5] or _fatal_write($state, ENODATA);
        return undef $state->[0];
    }

    $notify->($_[0]);
    _push(@_);
    return;
}
# _write($data)
#
# Write-watcher callback: pushes queued data to the output handle.
#
# Each invocation performs at most one syswrite().  Zero-length entries at
# the head of the queue (packets that have been written completely) are
# dropped first; when dropping one brings the queue back below the
# configured size, the reader is resumed.
#
# Outcomes of the write attempt:
#   - some bytes written:   they are removed from the head packet.
#   - zero bytes written:   reported as EPIPE through _fatal_write().
#   - EINTR/EAGAIN/EWOULDBLOCK: silently retried on the next event.
#   - any other error:      reported through _fatal_write() with $! intact.
#
# When the queue is empty the watcher is released.  If the reader is also
# gone there is nothing left to forward, and ENODATA is reported.
sub _write {
    my $data  = shift;
    my $queue = $data->[3];

    while (@$queue) {
        unless (length $queue->[0]) {
            # The reader slot is undefined after the input side finished,
            # while the queue may still be full (e.g. packets added through
            # push()), so only resume a reader that still exists.
            my $reader = $data->[0];
            $reader->resume if defined $reader && @$queue == $data->[2];
            shift @$queue;
            next;
        }

        my $bytes = syswrite($data->[1], $queue->[0]);
        if ($bytes) {
            substr($queue->[0], 0, $bytes, '');
        }
        elsif (defined $bytes) {
            _fatal_write($data, EPIPE);
        }
        elsif ($! == EINTR || $! == EAGAIN || $! == EWOULDBLOCK) {
            # Transient condition; wait for the next writable event.
            return;
        }
        else {
            _fatal_write($data);
        }
        return;
    }

    unless (defined $data->[0]) {
        # Release the watcher before reporting: otherwise it keeps firing
        # on a writable handle and re-reports ENODATA in a busy loop, and
        # its closure keeps the forwarder state alive.
        undef $data->[5];
        return _fatal_write($data, ENODATA);
    }
    undef $data->[5];
}
# _fatal_write($data [, $errno])
#
# Reports an output-side failure to the user callback (slot 4 of $data) by
# invoking it as $cb->(undef, 1).  When $errno is supplied, $! is set to it
# for the duration of the call only; otherwise the current $! is left as
# it is for the callback to inspect.  Returns whatever the callback returns.
sub _fatal_write {
    my ($state, @errno) = @_;
    local $! = $errno[0] if @errno;
    return $state->[4]->(undef, 1);
}
# $forwarder->push($packet)
#
# Public method: queues $packet for output as if it had been read from the
# input handle.  The object is a reference to the internal state, so it is
# dereferenced once before delegating to _push().  The packet is handed
# over through $_[0] rather than copied.
sub push {
    my $self = shift;
    return _push($_[0], $$self);
}
1;
__END__
=head1 NAME
AnyEvent::PacketForwarder - Forward packets between two sockets
=head1 SYNOPSIS
use AnyEvent::PacketForwarder;
( run in 0.691 second using v1.01-cache-2.11-cpan-39bf76dae61 )