App-HTTP_Proxy_IMP

 view release on metacpan or  search on metacpan

lib/App/HTTP_Proxy_IMP/Relay.pm  view on Meta::CPAN

use strict;
use warnings;

package App::HTTP_Proxy_IMP::Relay;
use fields (
    'fds',      # file descriptors
    'conn',     # App::HTTP_Proxy_IMP::HTTPConn object
    'acct',     # collect accounting
);

use App::HTTP_Proxy_IMP::Debug;
use Scalar::Util 'weaken';
use IO::Socket::SSL;
use AnyEvent;
use POSIX '_exit';

# Flag: if true the (child) process terminates itself as soon as its last
# relay got destroyed, see DESTROY.
my $exit_if_no_relays;

# Setter for this flag. Only the last argument is used, so it can be called
# as plain function or as class method. Returns the value just stored.
sub exit_if_no_relays {
    my $flag = pop @_;
    $exit_if_no_relays = $flag;
    return $exit_if_no_relays;
}

# Registry of the active relays. new() adds each relay as weak reference, so
# an entry turns undef once its relay is gone; the $idlet timer removes these
# stale entries.
my @relays;

# Returns the relays which still exist: the objects in list context, their
# number in scalar context.
sub relays {
    my @alive = grep { $_ } @relays;
    return @alive;
}

# Creates a new relay for a client socket and registers it in @relays as
# weak reference.
#
# Arguments:
#   $cfd      - client socket, must provide sockhost, sockport, peerhost
#               and peerport
#   $upstream - optional upstream: either a reference, which gets passed on
#               unchanged, or a string "host:port" resp. "[ipv6]:port"
#               which gets split into [ host, port ]
#   $conn     - object whose new_connection method creates the connection
#               object of this relay
# Returns the new relay.
# Dies if $upstream is a string which cannot be parsed.
sub new {
    my ($class,$cfd,$upstream,$conn) = @_;
    my $self = fields::new($class);
    debug("create relay $self");

    if ( $upstream && ! ref($upstream)) {
        # Host names and the hex digits of IPv6 addresses are not case
        # sensitive, so match with /i instead of rejecting upper case.
        $upstream =~m{\A(?:\[([a-f\d:.]+)\]|([\da-z_\-.]+)):(\d+)\Z}i or
            die "invalid upstream specification: $upstream";
        $upstream = [ $1||$2, $3 ];
    }

    # describe the connection by the local (d*) and remote (s*) end of the
    # client socket
    my $cobj = $conn->new_connection({
        daddr => $cfd->sockhost,
        dport => $cfd->sockport,
        saddr => $cfd->peerhost,
        sport => $cfd->peerport,
        upstream => $upstream,
    },$self);

    #debug("create connection $cobj");
    $self->{conn} = $cobj;

    # the client side is direction 0 and is connected from the start
    my $cfo = $self->{fds}[0] = App::HTTP_Proxy_IMP::Relay::FD->new(0,$cfd,$self,1);
    $cfo->mask( r => 1 ); # enable read

    # register the relay, but do not keep it alive through the registry
    push @relays, $self;
    weaken($relays[-1]);

    return $self;
}

# Destructor: accounts and logs the destruction of the relay. If requested
# through exit_if_no_relays the process gets terminated with _exit(0) once
# relays() reports no relay anymore.
sub DESTROY {
    my ($self) = @_;
    $self->account('destroy');
    $self->xdebug("destroy relay $self");

    return if ! $exit_if_no_relays;
    return if $self->relays;

    # the last one switches off the light
    debug("exit child $$ after last connection");
    _exit(0);
}

# Stores the accounting information $acct in the relay, replacing whatever
# was stored before. Returns the stored value.
sub acctinfo {
    my $self = shift;
    my $acct = shift;
    return $self->{acct} = $acct;
}
sub account {
    my ($self,$what,%args) = @_;
    my $acct = $self->{acct};

lib/App/HTTP_Proxy_IMP/Relay.pm  view on Meta::CPAN


    $DEBUG && $self->xdebug("drain=$drain sink=$sink rq=".$self->{conn}->open_requests." - keeping open");
    return;
}


# Builds a one-line description of the relay: the state dump of the
# connection object followed by the status bits of each file descriptor
# slot, e.g. " fd:0=001,1=101".
# Returns the message if the caller wants a value. In void context the
# message gets written with debug() instead.
sub dump_state {
    my $self = shift;
    my $conn = $self->{conn};
    my $msg = '';
    if ( my $fds = $self->{fds} ) {
        my @st;
        for my $i ( 0 .. $#$fds ) {
            # A slot is undef once that side got closed. Fetch the object
            # first: $fds->[$i]{status} would autovivify such a slot into
            # an empty unblessed hash, i.e. dumping the state would change
            # the relay.
            my $fo = $fds->[$i];
            push @st, sprintf("%d=%03b",$i, $fo && $fo->{status} || 0);
        }
        $msg .= " fd:".join(',',@st);
    }
    $msg = $conn->dump_state().$msg;
    return $msg if defined wantarray;
    debug($msg);
}


# Idle timer: every 5 seconds remove the entries of destroyed relays from
# @relays and close each relay where none of the file descriptors had
# activity within the timeout.
my $idlet = AnyEvent->timer(
    after => 5,
    interval => 5, cb => sub {
        # Drop stale entries. The list assignment stores copies and the copy
        # of a weak reference is a strong one, so the entries have to be
        # weakened again - otherwise @relays itself would keep all remaining
        # relays alive and they would never get destroyed.
        @relays = grep { $_ } @relays;
        weaken($_) for @relays;
        @relays or return;

        #debug("check timeouts for %d conn",+@relays);
        my $now = AnyEvent->now;
        RELAY: for my $r (@relays) {
            # the entry is weak, closing a previous relay might have
            # released this one
            $r or next RELAY;

            # timeout depends on the state of the relay and child
            # if there are active requests set it to 60, if not (e.g.
            # idle keep-alive connections) to 30. If this is a forked
            # child with no listener which should close after all
            # requests are done close idle keep-alive connections faster,
            # e.g. set timeout to 1
            my $idle = ! $r->{conn}->open_requests;
            my $timeout =
                ! $idle ? 60 :
                $exit_if_no_relays ? 1 :
                30;
            for my $fo (@{$r->{fds}}) {
                # the slot of a closed file descriptor is undef
                $fo or next;
                # recent activity on any side keeps the relay open; this has
                # to look at the fd object of the loop and not at $_
                next RELAY if ($fo->{didit}||0) + $timeout > $now;
            }
            $r->xdebug("close because of timeout");
            $r->close
        }
    }
);

############################################################################
# Filehandle
############################################################################

package App::HTTP_Proxy_IMP::Relay::FD;
use Carp 'croak';
use Scalar::Util 'weaken';
use App::HTTP_Proxy_IMP::Debug;
use AnyEvent::Socket qw(tcp_connect format_address);
use IO::Socket::SSL;

use fields (
    'dir',        # direction 0,1
    'fd',         # file descriptor
    'host',       # destination hostname
    'status',     # bitmap of read_shutdown|write_shutdown|connected
    'relay',      # weak link to relay
    'didit',      # time of last activity (read/write)
    'rbuf',       # read buffer (read but not processed)
    'rsub',       # read handler
    'rwatch',     # AnyEvent watcher - undef if read is disabled
    'wbuf',       # write buffer (not yet written to socket)
    'wsub',       # write handler
    'wwatch',     # AnyEvent watcher - undef if write is disabled
    'wsrc',       # source of writes for stalled handling
);

# Creates the file descriptor object for one side of a relay.
#   $dir       - direction (0 or 1), also the index of the object inside
#                the fds of the relay
#   $fd        - the file descriptor
#   $relay     - the relay this object belongs to, kept as normal (strong)
#                reference
#   $connected - if true the connected bit (0b001) of the status gets set
# Read and write buffer start empty. Returns the new object.
sub new {
    my $class = shift;
    my ($dir,$fd,$relay,$connected) = @_;

    my $self = fields::new($class);
    @{$self}{qw(dir fd relay)} = ($dir,$fd,$relay);
    $self->{status} = $connected ? 0b001 : 0;
    $self->{rbuf} = '';
    $self->{wbuf} = '';
    return $self;
}

# Debug output for this side of the relay.
# If the connection object of the relay provides xdebug, the message gets
# prefixed with the direction and this method is called on the connection.
# Otherwise the arguments are handed to debug() as they are. In both cases
# the call is replaced with goto, i.e. the target does not see xdebug as its
# caller.
sub xdebug {
    my $self = shift;
    my $conn = $self->{relay}{conn};
    my $handler = UNIVERSAL::can($conn,'xdebug');

    # no handler provided by the connection
    goto &debug if ! $handler;

    my $text = shift @_;
    unshift @_, $conn, "[$self->{dir}] ".$text;
    goto &$handler;
}

# Closes this side of the relay.
# If the file descriptor is still set it gets released, the object removes
# itself from the fds of its relay and closeIfDone of the relay is called.
# Finally all fields of the object are cleared, which also drops its
# reference to the relay.
sub close:method { 
    my $self = shift;
    $self->xdebug("close");
    if ( $self->{fd} ) {
	$self->{fd} = undef;
	# empty the slot of this direction inside the relay
	delete $self->{relay}{fds}[$self->{dir}];
	$self->{relay}->closeIfDone;
    }
    # asynchronous callbacks (see connect) check $self->{relay} to detect
    # that the object got closed in the meantime
    %$self = ();
}

sub reset {
    my $self = shift;
    $self->xdebug("reset");
    close($self->{fd}) if $self->{fd};
    $self->{fd} = 
	$self->{rwatch} = $self->{rsub} = 
	$self->{wwatch} = $self->{wsub} = 
	$self->{host} =

lib/App/HTTP_Proxy_IMP/Relay.pm  view on Meta::CPAN

    }

    return $self->{relay}->fatal(
	"connection should have taken all remaining bytes on eof")
	if !$n && $self->{rbuf} ne '';

    $self->shutdown('r') if ! $n;
}

# Makes sure that this side of the relay is connected to $host, $port.
#   $host, $port - target of the connection
#   $callback    - called without arguments once the connection can be used
#   $reconnect   - if true an existing connection is never reused
# Returns 1 if an existing connection got reused, in which case the callback
# was already called. Returns -1 if a connect was started, which will finish
# asynchronously. Failures of the DNS lookup or the connect are reported
# through the fatal method of the relay.
sub connect:method {
    my ($self,$host,$port,$callback,$reconnect) = @_;

    # Tear down the existing connection if a reconnect is enforced or if it
    # goes to another target. The parentheses are essential: 'ne' binds
    # tighter than '||', so without them every connection with a known host
    # got reset and was never reused.
    $self->reset if $self->{fd} and
        ( $reconnect or ($self->{host}||'') ne "$host.$port" );

    # if we have a connection already, keep it
    if ( $self->{status} & 0b001 ) { # already connected
        $callback->();
        return 1;
    }

    # (re)connect
    $self->xdebug("connecting to $host.$port");
    # async dns lookup + connect
    App::HTTP_Proxy_IMP::Relay::DNS::lookup($host, sub {
        $self->{relay} or return; # relay already closed
        if ( my $addr = shift ) {
            tcp_connect($addr,$port, sub {
                if ( my $fd = shift ) {
                    $self->{relay} or return; # relay already closed
                    $self->{fd} = $fd;
                    $self->{status} = 0b001;
                    # remember the target to detect if the connection can
                    # be reused by the next call
                    $self->{host} = "$host.$port";
                    $self->xdebug("connect done");
                    $self->mask( r => 1 );
                    $callback->();
                } else {
                    # save the reason before any other code can change $!
                    my $err = $!;
                    # do not try this address again
                    App::HTTP_Proxy_IMP::Relay::DNS::uncache($host,$addr);
                    $self->{relay} or return; # relay already closed
                    $self->{relay}->fatal("connect to $host.$port failed: $err");
                }
            });
        } else {
            $self->{relay}->fatal(
                "connect to $host.$port failed: no such host (DNS)");
        }
    });
    return -1;
}

# Upgrades the socket of this side to SSL and starts the handshake.
# The arguments are key/value pairs for IO::Socket::SSL->start_SSL,
# optionally followed by a callback, which gets called with this object once
# the handshake succeeded. With SSL_server in the arguments accept_SSL is
# used for the handshake, otherwise connect_SSL.
# Both the read and the write buffer must be empty, otherwise the result of
# the fatal method of the relay gets returned.
# Dies if start_SSL fails.
sub startssl {
    my ($self,@args) = @_;

    if ( $self->{rbuf} ne '' ) {
        return $self->{relay}->fatal("read buf $self->{dir} not empty before starting SSL: '$self->{rbuf}'");
    }
    if ( $self->{wbuf} ne '' ) {
        return $self->{relay}->fatal("write buf $self->{dir} not empty before starting SSL: '$self->{wbuf}'");
    }

    # odd number of arguments: the last one is the callback
    my $callback = @args % 2 ? pop(@args) : undef;
    my %sslargs = @args;

    # only upgrade the socket here, the handshake itself is done in _ssl
    IO::Socket::SSL->start_SSL( $self->{fd},
        %sslargs,
        SSL_startHandshake => 0,
    ) or die "failed to upgrade socket to SSL";

    my $handshake = $sslargs{SSL_server}
        ? \&IO::Socket::SSL::accept_SSL
        : \&IO::Socket::SSL::connect_SSL;
    return _ssl($self,$handshake,$callback,\%sslargs);
}

# Runs one step of the SSL handshake.
#   $handshake - function doing the handshake on the file descriptor
#                (accept_SSL or connect_SSL, see startssl)
#   $done      - optional callback, called with this object on success
#   $sslargs   - the SSL arguments, used to select the error message
# If the handshake cannot continue right now (EAGAIN) the function installs
# itself through mask as handler for the direction given by $SSL_ERROR and
# retries from there. Any other failure is reported through the fatal method
# of the relay.
sub _ssl {
    my ($self,$handshake,$done,$sslargs) = @_;
    if ( $handshake->($self->{fd}) ) {
        $self->xdebug("ssl handshake success");
        $done->($self) if $done;
    } elsif ( $!{EAGAIN} ) {
        # retry once the socket is ready for what SSL is waiting for
        my $dir;
        if ( $SSL_ERROR == SSL_WANT_READ ) {
            $dir = 'r';
        } elsif ( $SSL_ERROR == SSL_WANT_WRITE ) {
            $dir = 'w';
        } else {
            return $self->{relay}->fatal( "unhandled $SSL_ERROR on EAGAIN" );
        }
        $self->mask( $dir => sub { _ssl($self,$handshake,$done,$sslargs) });
    } elsif ( $sslargs->{SSL_server} ) {
        return $self->{relay}->fatal( "error on accept_SSL: $SSL_ERROR|$!" );
    } else {
        return $self->{relay}->fatal(
            "error on connect_SSL to $sslargs->{SSL_verifycn_name}: $SSL_ERROR|$!" );
    }
}


############################################################################
# DNS cache
############################################################################

package App::HTTP_Proxy_IMP::Relay::DNS;
use AnyEvent::DNS;
use Socket qw(AF_INET AF_INET6 inet_pton);

# Cache of the resolved addresses: lower case host name => [ addresses ]
my %cache;

# Removes the address $addr from the cache entry of $host. The whole entry
# gets removed once no address is left. Does nothing if the host is not
# cached.
sub uncache {
    my ($host,$addr) = @_;
    my $key = lc($host);
    my $addrs = $cache{$key};
    return if ! $addrs;
    @$addrs = grep { $_ ne $addr } @$addrs;
    delete $cache{$key} if ! @$addrs;
}

sub lookup {
    my ($host,$cb) = @_;
    $host = lc($host);

    if ( my $e = $cache{$host} ) {
	return $cb->(@$e);
    } elsif ( inet_pton(AF_INET,$host) || inet_pton(AF_INET6,$host) ) {
	return $cb->($host);
    }

    AnyEvent::DNS::a($host,sub {
	if ( @_ ) {
	    $cache{$host} = [ @_ ];
	    return $cb->(@_);
	}

	# try AAAA
	AnyEvent::DNS::aaaa($host,sub {
	    $cache{$host} = [ @_ ] if @_;
	    return $cb->(@_);



( run in 0.502 second using v1.01-cache-2.11-cpan-39bf76dae61 )