App-HTTP_Proxy_IMP
view release on metacpan or search on metacpan
lib/App/HTTP_Proxy_IMP/Relay.pm view on Meta::CPAN
use strict;
use warnings;
package App::HTTP_Proxy_IMP::Relay;
use fields (
'fds', # file descriptors
'conn', # App::HTTP_Proxy_IMP::HTTPConn object
'acct', # collect accounting
);
use App::HTTP_Proxy_IMP::Debug;
use Scalar::Util 'weaken';
use IO::Socket::SSL;
use AnyEvent;
use POSIX '_exit';
# set if the child should destroy itself after last connection closed
my $exit_if_no_relays;
sub exit_if_no_relays { $exit_if_no_relays = pop; }
# active relay, inserted in new, removed in $idlet timer
my @relays;
sub relays { return grep { $_ } @relays }
# creates new relay and puts it into @relays as weak reference
sub new {
my ($class,$cfd,$upstream,$conn) = @_;
my $self = fields::new($class);
debug("create relay $self");
if ( $upstream && ! ref($upstream)) {
$upstream =~m{\A(?:\[([a-f\d:.]+)\]|([\da-z_\-.]+)):(\d+)\Z} or
die "invalid upstream specification: $upstream";
$upstream = [ $1||$2, $3 ];
}
my $cobj = $conn->new_connection({
daddr => $cfd->sockhost,
dport => $cfd->sockport,
saddr => $cfd->peerhost,
sport => $cfd->peerport,
upstream => $upstream,
},$self);
#debug("create connection $cobj");
$self->{conn} = $cobj;
my $cfo = $self->{fds}[0] = App::HTTP_Proxy_IMP::Relay::FD->new(0,$cfd,$self,1);
$cfo->mask( r => 1 ); # enable read
push @relays, $self;
weaken($relays[-1]);
return $self;
}
sub DESTROY {
my $self = shift;
$self->account('destroy');
$self->xdebug("destroy relay $self");
if ( $exit_if_no_relays && ! $self->relays ) {
# der letzte macht das Licht aus
debug("exit child $$ after last connection");
_exit(0)
}
}
sub acctinfo {
my ($self,$acct) = @_;
$self->{acct} = $acct;
}
sub account {
my ($self,$what,%args) = @_;
my $acct = $self->{acct};
lib/App/HTTP_Proxy_IMP/Relay.pm view on Meta::CPAN
$DEBUG && $self->xdebug("drain=$drain sink=$sink rq=".$self->{conn}->open_requests." - keeping open");
return;
}
# dump state to debug
sub dump_state {
my $self = shift;
my $conn = $self->{conn};
my $msg = '';
if ( my $fds = $self->{fds} ) {
my @st;
for( my $i=0;$i<@$fds;$i++) {
push @st, sprintf("%d=%03b",$i,$fds->[$i]{status} || 0);
}
$msg .= " fd:".join(',',@st);
}
$msg = $conn->dump_state().$msg;
return $msg if defined wantarray;
debug($msg);
}
my $idlet = AnyEvent->timer(
after => 5,
interval => 5, cb => sub {
@relays = grep { $_ } @relays or return;
#debug("check timeouts for %d conn",+@relays);
my $now = AnyEvent->now;
RELAY: for my $r (@relays) {
# timeout depends on the state of the relay and child
# if there are active requests set it to 60, if not (e.g.
# idle keep-alive connections) to 30. If this is a forked
# child with no listener which should close after all
# requests are done close idle keep-alive connections faster,
# e.g. set timeout to 1
my $idle = ! $r->{conn}->open_requests;
my $timeout =
! $idle ? 60 :
$exit_if_no_relays ? 1 :
30;
for my $fo (@{$r->{fds}}) {
next RELAY if $_->{didit} + $timeout > $now;
}
$r->xdebug("close because of timeout");
$r->close
}
}
);
############################################################################
# Filehandle
############################################################################
package App::HTTP_Proxy_IMP::Relay::FD;
use Carp 'croak';
use Scalar::Util 'weaken';
use App::HTTP_Proxy_IMP::Debug;
use AnyEvent::Socket qw(tcp_connect format_address);
use IO::Socket::SSL;
use fields (
'dir', # direction 0,1
'fd', # file descriptor
'host', # destination hostname
'status', # bitmap of read_shutdown|write_shutdown|connected
'relay', # weak link to relay
'didit', # time of last activity (read/write)
'rbuf', # read buffer (read but not processed)
'rsub', # read handler
'rwatch', # AnyEvent watcher - undef if read is disabled
'wbuf', # write buffer (not yet written to socket)
'wsub', # write handler
'wwatch', # AnyEvent watcher - undef if write is disabled
'wsrc', # source of writes for stalled handling
);
sub new {
my ($class,$dir,$fd,$relay,$connected) = @_;
my $self = fields::new($class);
$self->{dir} = $dir;
$self->{fd} = $fd;
$self->{status} = $connected ? 0b001 : 0;
#weaken( $self->{relay} = $relay );
$self->{relay} = $relay;
$self->{rbuf} = $self->{wbuf} = '';
return $self;
}
sub xdebug {
my $self = shift;
my $conn = $self->{relay}{conn};
if ( my $xdebug = UNIVERSAL::can($conn,'xdebug') ) {
my $msg = "[$self->{dir}] ".shift(@_);
unshift @_,$conn,$msg;
goto &$xdebug;
} else {
goto &debug;
}
}
sub close:method {
my $self = shift;
$self->xdebug("close");
if ( $self->{fd} ) {
$self->{fd} = undef;
delete $self->{relay}{fds}[$self->{dir}];
$self->{relay}->closeIfDone;
}
%$self = ();
}
sub reset {
my $self = shift;
$self->xdebug("reset");
close($self->{fd}) if $self->{fd};
$self->{fd} =
$self->{rwatch} = $self->{rsub} =
$self->{wwatch} = $self->{wsub} =
$self->{host} =
lib/App/HTTP_Proxy_IMP/Relay.pm view on Meta::CPAN
}
return $self->{relay}->fatal(
"connection should have taken all remaining bytes on eof")
if !$n && $self->{rbuf} ne '';
$self->shutdown('r') if ! $n;
}
sub connect:method {
my ($self,$host,$port,$callback,$reconnect) = @_;
# down existing connection if we should connect to another host
$self->reset if $self->{fd} and
( $reconnect or $self->{host}||'' ne "$host.$port" );
# if we have a connection already, keep it
if ( $self->{status} & 0b001 ) { # already connected
$callback->();
return 1;
}
# (re)connect
$self->xdebug("connecting to $host.$port");
# async dns lookup + connect
App::HTTP_Proxy_IMP::Relay::DNS::lookup($host, sub {
$self->{relay} or return; # relay already closed
if ( my $addr = shift ) {
tcp_connect($addr,$port, sub {
if ( my $fd = shift ) {
$self->{relay} or return; # relay already closed
$self->{fd} = $fd;
$self->{status} = 0b001;
$self->{host} = "$host.$port";
$self->xdebug("connect done");
$self->mask( r => 1 );
$callback->();
} else {
App::HTTP_Proxy_IMP::Relay::DNS::uncache($host,$addr);
$self->{relay} or return; # relay already closed
$self->{relay}->fatal("connect to $host.$port failed: $!");
}
});
} else {
$self->{relay}->fatal(
"connect to $host.$port failed: no such host (DNS)");
}
});
return -1;
}
sub startssl {
my $self = shift;
$self->{rbuf} eq '' or return
$self->{relay}->fatal("read buf $self->{dir} not empty before starting SSL: '$self->{rbuf}'");
$self->{wbuf} eq '' or return
$self->{relay}->fatal("write buf $self->{dir} not empty before starting SSL: '$self->{wbuf}'");
my $callback = @_%2 ? pop(@_):undef;
my %sslargs = @_;
IO::Socket::SSL->start_SSL( $self->{fd},
%sslargs,
SSL_startHandshake => 0,
) or die "failed to upgrade socket to SSL";
my $sub = $sslargs{SSL_server}
? \&IO::Socket::SSL::accept_SSL
: \&IO::Socket::SSL::connect_SSL;
_ssl($self,$sub,$callback,\%sslargs);
}
sub _ssl {
my ($self,$sub,$cb,$sslargs) = @_;
if ( $sub->($self->{fd}) ) {
$self->xdebug("ssl handshake success");
$cb->($self) if $cb;
} elsif ( $!{EAGAIN} ) {
# retry
my $dir =
$SSL_ERROR == SSL_WANT_READ ? 'r' :
$SSL_ERROR == SSL_WANT_WRITE ? 'w' :
return $self->{relay}->fatal( "unhandled $SSL_ERROR on EAGAIN" );
$self->mask( $dir => sub { _ssl($self,$sub,$cb,$sslargs) });
} elsif ( $sslargs->{SSL_server} ) {
return $self->{relay}->fatal( "error on accept_SSL: $SSL_ERROR|$!" );
} else {
return $self->{relay}->fatal(
"error on connect_SSL to $sslargs->{SSL_verifycn_name}: $SSL_ERROR|$!" );
}
}
############################################################################
# DNS cache
############################################################################
package App::HTTP_Proxy_IMP::Relay::DNS;
use AnyEvent::DNS;
use Socket qw(AF_INET AF_INET6 inet_pton);
my %cache;
sub uncache {
my ($host,$addr) = @_;
my $e = $cache{lc($host)} or return;
@$e = grep { $_ ne $addr } @$e;
delete $cache{lc($host)} if !@$e;
}
sub lookup {
my ($host,$cb) = @_;
$host = lc($host);
if ( my $e = $cache{$host} ) {
return $cb->(@$e);
} elsif ( inet_pton(AF_INET,$host) || inet_pton(AF_INET6,$host) ) {
return $cb->($host);
}
AnyEvent::DNS::a($host,sub {
if ( @_ ) {
$cache{$host} = [ @_ ];
return $cb->(@_);
}
# try AAAA
AnyEvent::DNS::aaaa($host,sub {
$cache{$host} = [ @_ ] if @_;
return $cb->(@_);
( run in 0.502 second using v1.01-cache-2.11-cpan-39bf76dae61 )