App-HTTP_Proxy_IMP
view release on metacpan or search on metacpan
lib/App/HTTP_Proxy_IMP/Relay.pm view on Meta::CPAN
# - nowhere to read and no open requests
# - nowhere to write to
sub closeIfDone {
    # Close the relay once no further data can be transferred: either
    # there is nowhere left to write to, or not both sides can be read
    # anymore and the connection has no open requests.
    # Returns the result of $self->close if the relay got closed and
    # nothing if the relay is kept open.
    my $self = shift;
    my $conn = $self->{conn};   # might be unset, see the checks below

    # collect the directions which can still be read from ($drain) and
    # written to ($sink)
    my $sink = my $drain = '';
    for my $fo (@{$self->{fds}}) {
        $fo && $fo->{fd} or next;       # skip slots without object or fd
        return if $fo->{rbuf} ne '';    # has unprocessed data
        return if $fo->{wbuf} ne '';    # has unwritten data
        $drain .= $fo->{dir} if not $fo->{status} & 0b100; # not read-closed
        $sink  .= $fo->{dir} if not $fo->{status} & 0b010; # not write-closed
    }

    if ( $sink eq '' ) { # nowhere to write
        $DEBUG && $self->xdebug( "close relay because all fd done sink='$sink' ");
        # close relay
        return $self->close;
    }

    if ( $drain ne '01' ) { # no reading from both sides
        if ( ! $conn or ! $conn->open_requests ) {
            # close relay
            $DEBUG && $self->xdebug( "close relay because nothing to read and all done");
            return $self->close;
        }
    }

    # $conn is only checked above when $drain ne '01', so it might still
    # be unset here: do not call a method on it unconditionally
    $DEBUG && $self->xdebug("drain=$drain sink=$sink rq=".
        ( $conn ? $conn->open_requests : 0 )." - keeping open");
    return;
}
# dump state to debug
# The message consists of the state reported by the connection (if there
# is one) followed by the status bits of every fd slot.
# In non-void context the message is returned, in void context it is
# given to debug().
sub dump_state {
    my $self = shift;
    my $conn = $self->{conn};
    my $msg = '';
    if ( my $fds = $self->{fds} ) {
        my @st;
        for my $i (0..$#$fds) {
            # fetch the slot into a variable first: $fds->[$i]{status}
            # would autovivify an unset slot into an empty hash and thus
            # change the relay just by dumping it
            my $fo = $fds->[$i];
            push @st, sprintf("%d=%03b",$i, $fo && $fo->{status} || 0);
        }
        $msg .= " fd:".join(',',@st);
    }
    # the connection might be unset, in this case report the fds only
    $msg = $conn->dump_state().$msg if $conn;
    return $msg if defined wantarray;
    debug($msg);
    return;
}
# Idle timer: every 5 seconds close all relays where none of the file
# handles had activity within the timeout.
my $idlet = AnyEvent->timer(
    after => 5,
    interval => 5,
    cb => sub {
        # drop entries which are not set anymore, done if nothing is left
        @relays = grep { $_ } @relays or return;
        #debug("check timeouts for %d conn",+@relays);
        my $now = AnyEvent->now;
        RELAY: for my $r (@relays) {
            # timeout depends on the state of the relay and child
            # if there are active requests set it to 60, if not (e.g.
            # idle keep-alive connections) to 30. If this is a forked
            # child with no listener which should close after all
            # requests are done close idle keep-alive connections faster,
            # e.g. set timeout to 1
            # A relay without connection object is treated as idle.
            my $conn = $r->{conn};
            my $idle = ! ( $conn && $conn->open_requests );
            my $timeout =
                ! $idle ? 60 :
                $exit_if_no_relays ? 1 :
                30;
            for my $fo (@{$r->{fds}}) {
                $fo or next; # unset slot
                # keep the relay if this fd was active within the timeout.
                # This needs to look at the loop variable $fo, not at $_
                # which is not set by this loop.
                next RELAY if ( $fo->{didit} || 0 ) + $timeout > $now;
            }
            $r->xdebug("close because of timeout");
            $r->close;
        }
    }
);
############################################################################
# Filehandle
############################################################################
package App::HTTP_Proxy_IMP::Relay::FD;
use Carp 'croak';
use Scalar::Util 'weaken';
use App::HTTP_Proxy_IMP::Debug;
use AnyEvent::Socket qw(tcp_connect format_address);
use IO::Socket::SSL;
# fields of an FD object, the object gets created in new() with fields::new
use fields (
'dir', # direction 0,1
'fd', # file descriptor
'host', # destination hostname
'status', # bitmap: 0b100 read shutdown, 0b010 write shutdown, 0b001 connected
'relay', # link to relay (plain reference, the weaken() in new is commented out)
'didit', # time of last activity (read/write)
'rbuf', # read buffer (read but not processed)
'rsub', # read handler
'rwatch', # AnyEvent watcher - undef if read is disabled
'wbuf', # write buffer (not yet written to socket)
'wsub', # write handler
'wwatch', # AnyEvent watcher - undef if write is disabled
'wsrc', # source of writes for stalled handling
);
sub new {
    # Create the object for one direction of a relay.
    #  $dir       - direction (0 or 1)
    #  $fd        - file descriptor, might be not set yet
    #  $relay     - relay this object belongs to
    #  $connected - true if $fd is already connected
    my $class = shift;
    my ($dir,$fd,$relay,$connected) = @_;

    my $self = fields::new($class);

    # the relay is stored as a plain reference, the weaken() which was
    # once done here is disabled:
    #weaken( $self->{relay} = $relay );
    @{$self}{qw(dir fd relay)} = ($dir,$fd,$relay);

    # only the connected bit can be set initially
    $self->{status} = 0;
    $self->{status} = 0b001 if $connected;

    # both buffers start empty
    $self->{$_} = '' for qw(rbuf wbuf);

    return $self;
}
sub xdebug {
my $self = shift;
my $conn = $self->{relay}{conn};
if ( my $xdebug = UNIVERSAL::can($conn,'xdebug') ) {
my $msg = "[$self->{dir}] ".shift(@_);
unshift @_,$conn,$msg;
( run in 1.792 second using v1.01-cache-2.11-cpan-df04353d9ac )