App-MHFS
view release on metacpan or search on metacpan
lib/MHFS/HTTP/Server/Client.pm view on Meta::CPAN
package MHFS::HTTP::Server::Client v0.7.0;
use 5.014;
use strict; use warnings;
use feature 'say';
use Time::HiRes qw( usleep clock_gettime CLOCK_REALTIME CLOCK_MONOTONIC);
use IO::Socket::INET;
use Errno qw(EINTR EIO :POSIX);
use Fcntl qw(:seek :mode);
use File::stat;
use IO::Poll qw(POLLIN POLLOUT POLLHUP);
use Scalar::Util qw(looks_like_number weaken);
use Data::Dumper;
use Carp;
use MHFS::HTTP::Server::Client::Request;
sub new {
my ($class, $sock, $server, $serverhostinfo, $ip) = @_;
$sock->blocking(0);
my %self = ('sock' => $sock, 'server' => $server, 'time' => clock_gettime(CLOCK_MONOTONIC), 'inbuf' => '', 'serverhostname' => $serverhostinfo->{'hostname'}, 'absurl' => $serverhostinfo->{'absurl'}, 'ip' => $ip, 'X-MHFS-PROXY-KEY' => $serverhosti...
$self{'CONN-ID'} = int($self{'time'} * rand()); # insecure uid
$self{'outheaders'}{'X-MHFS-CONN-ID'} = sprintf("%X", $self{'CONN-ID'});
bless \%self, $class;
$self{'request'} = MHFS::HTTP::Server::Client::Request->new(\%self);
return \%self;
}
# add a connection timeout timer
sub AddClientCloseTimer {
my ($self, $timelength, $id, $is_requesttimeout) = @_;
weaken($self); #don't allow this timer to keep the client object alive
my $server = $self->{'server'};
say "CCT | add timer: $id";
$server->{'evp'}->add_timer($timelength, 0, sub {
if(! defined $self) {
say "CCT | $id self undef";
return undef;
}
# Commented out as with connection reuse on, Apache 2.4.10 seems sometimes
# pass 408 on to the next client.
#if($is_requesttimeout) {
# say "CCT | \$timelength ($timelength) exceeded, sending 408";
# $self->{request}->Send408;
# CT_WRITE($self);
#}
say "CCT | \$timelength ($timelength) exceeded, closing CONN $id";
say "-------------------------------------------------";
$server->{'evp'}->remove($self->{'sock'});
say "poll has " . scalar ( $server->{'evp'}{'poll'}->handles) . " handles";
return undef;
}, $id);
return $id;
}
sub KillClientCloseTimer {
my ($self, $id) = @_;
my $server = $self->{'server'};
say "CCT | removing timer: $id";
$server->{'evp'}->remove_timer_by_id($id);
}
sub SetEvents {
my ($self, $events) = @_;
$self->{'server'}{'evp'}->set($self->{'sock'}, $self, $events);
}
use constant {
RECV_SIZE => 65536,
CT_YIELD => 1,
CT_DONE => undef,
#CT_READ => 1,
#CT_PROCESS = 2,
#CT_WRITE => 3
};
# The "client_thread" consists of 5 states, CT_READ, CT_PROCESS, CT_WRITE, CT_YIELD, and CT_DONE
# CT_READ reads input data from the socket
## on data read transitions to CT_PROCESS
## on error transitions to CT_DONE
## otherwise CT_YIELD
# CT_PROCESS processes the input data
## on processing done, switches to CT_WRITE or CT_READ to read more data to process
## on error transitions to CT_DONE
## otherwise CT_YIELD
# CT_WRITE outputs data to the socket
## on all data written transitions to CT_PROCESS unless Connection: close is set.
## on error transitions to CT_DONE
## otherwise CT_YIELD
# CT_YIELD just returns control to the poll loop to wait for IO or allow another client thread to run
# CT_DONE also returns control to the poll loop, it is called on error or when the client connection should be closed or is closed
sub CT_READ {
my ($self) = @_;
my $tempdata;
if(!defined($self->{'sock'}->recv($tempdata, RECV_SIZE))) {
if(! ($!{EAGAIN} || $!{EWOULDBLOCK})) {
print ("CT_READ RECV errno: $!\n");
return CT_DONE;
}
say "CT_YIELD: $!";
return CT_YIELD;
}
if(length($tempdata) == 0) {
say 'Server::Client read 0 bytes, client read closed';
return CT_DONE;
}
$self->{'inbuf'} .= $tempdata;
goto &CT_PROCESS;
}
sub CT_PROCESS {
my ($self) = @_;
$self->{'request'} //= MHFS::HTTP::Server::Client::Request->new($self);
if(!defined($self->{'request'}{'on_read_ready'})) {
die("went into CT_PROCESS in bad state");
return CT_YIELD;
}
my $res = $self->{'request'}{'on_read_ready'}->($self->{'request'});
if(!$res) {
return $res;
}
if(defined $self->{'request'}{'response'}) {
goto &CT_WRITE;
}
elsif(defined $self->{'request'}{'on_read_ready'}) {
goto &CT_READ;
}
return $res;
}
sub CT_WRITE {
my ($self) = @_;
if(!defined $self->{'request'}{'response'}) {
die("went into CT_WRITE in bad state");
return CT_YIELD;
}
# TODO only TrySendResponse if there is data in buf or to be read
my $tsrRet = $self->TrySendResponse;
if(!defined($tsrRet)) {
say "-------------------------------------------------";
return CT_DONE;
}
elsif($tsrRet ne '') {
if($self->{'request'}{'outheaders'}{'Connection'} && ($self->{'request'}{'outheaders'}{'Connection'} eq 'close')) {
say "Connection close header set closing conn";
say "-------------------------------------------------";
return CT_DONE;
}
$self->{'request'} = undef;
goto &CT_PROCESS;
}
return CT_YIELD;
}
sub do_on_data {
my ($self) = @_;
my $res = $self->{'request'}{'on_read_ready'}->($self->{'request'});
if($res) {
if(defined $self->{'request'}{'response'}) {
#say "do_on_data: goto onWriteReady";
goto &onWriteReady;
#return onWriteReady($self);
}
#else {
elsif(defined $self->{'request'}{'on_read_ready'}) {
#say "do_on_data: goto onReadReady inbuf " . length($self->{'inbuf'});
goto &onReadReady;
#return onReadReady($self);
}
else {
say "do_on_data: response and on_read_ready not defined, response by timer or poll?";
}
}
return $res;
}
sub onReadReady {
goto &CT_READ;
my ($self) = @_;
my $tempdata;
if(defined($self->{'sock'}->recv($tempdata, RECV_SIZE))) {
if(length($tempdata) == 0) {
say 'Server::Client read 0 bytes, client read closed';
return undef;
}
$self->{'inbuf'} .= $tempdata;
goto &do_on_data;
}
if(! $!{EAGAIN}) {
print ("MHFS::HTTP::Server::Client onReadReady RECV errno: $!\n");
return undef;
}
return '';
}
sub onWriteReady {
goto &CT_WRITE;
my ($client) = @_;
# send the response
if(defined $client->{'request'}{'response'}) {
# TODO only TrySendResponse if there is data in buf or to be read
my $tsrRet = $client->TrySendResponse;
if(!defined($tsrRet)) {
say "-------------------------------------------------";
return undef;
}
elsif($tsrRet ne '') {
if($client->{'request'}{'outheaders'}{'Connection'} && ($client->{'request'}{'outheaders'}{'Connection'} eq 'close')) {
say "Connection close header set closing conn";
say "-------------------------------------------------";
return undef;
}
$client->{'request'} = MHFS::HTTP::Server::Client::Request->new($client);
# handle possible existing read data
goto &do_on_data;
}
}
else {
say "response not defined, probably set later by a timer or poll";
}
return 1;
}
sub _TSRReturnPrint {
my ($sentthiscall) = @_;
if($sentthiscall > 0) {
say "wrote $sentthiscall bytes";
}
}
sub TrySendResponse {
my ($client) = @_;
my $csock = $client->{'sock'};
my $dataitem = $client->{'request'}{'response'};
defined($dataitem->{'buf'}) or die("dataitem must always have a buf");
my $sentthiscall = 0;
do {
# Try to send the buf if set
if(length($dataitem->{'buf'})) {
my $sret = TrySendItem($csock, \$dataitem->{'buf'});
# critical conn error
if(! defined($sret)) {
_TSRReturnPrint($sentthiscall);
return undef;
}
if($sret) {
$sentthiscall += $sret;
# if we sent data, kill the send timer
if(defined $client->{'sendresponsetimerid'}) {
$client->KillClientCloseTimer($client->{'sendresponsetimerid'});
$client->{'sendresponsetimerid'} = undef;
}
}
# not all data sent, add timer
if(length($dataitem->{'buf'}) > 0) {
$client->{'sendresponsetimerid'} //= $client->AddClientCloseTimer($client->{'server'}{'settings'}{'sendresponsetimeout'}, $client->{'CONN-ID'});
_TSRReturnPrint($sentthiscall);
return '';
}
#we sent the full buf
}
# read more data
my $newdata;
if(defined $dataitem->{'fh'}) {
my $FH = $dataitem->{'fh'};
my $req_length = $dataitem->{'get_current_length'}->();
my $filepos = $dataitem->{'fh_pos'};
# TODO, remove this assert
if($filepos != tell($FH)) {
die('tell mismatch');
}
if($req_length && ($filepos >= $req_length)) {
if($filepos > $req_length) {
say "Reading too much tell: $filepos req_length: $req_length";
}
say "file read done";
close($FH);
}
else {
my $readamt = 24000;
if($req_length) {
my $tmpsend = $req_length - $filepos;
$readamt = $tmpsend if($tmpsend < $readamt);
}
# this is blocking, it shouldn't block for long but it could if it's a pipe especially
my $bytesRead = read($FH, $newdata, $readamt);
if(! defined($bytesRead)) {
$newdata = undef;
say "READ ERROR: $!";
}
elsif($bytesRead == 0) {
# read EOF, better remove the error
if(! $req_length) {
say '$req_length not set and read 0 bytes, treating as EOF';
$newdata = undef;
}
else {
say 'FH EOF ' .$filepos;
seek($FH, 0, 1);
_TSRReturnPrint($sentthiscall);
return '';
}
}
else {
$dataitem->{'fh_pos'} += $bytesRead;
}
}
}
elsif(defined $dataitem->{'cb'}) {
$newdata = $dataitem->{'cb'}->($dataitem);
}
my $encode_chunked = $dataitem->{'is_chunked'};
# if we got to here and there's no data, fetching newdata is done
( run in 1.006 second using v1.01-cache-2.11-cpan-02777c243ea )