IOMux-HTTP
view release on metacpan or search on metacpan
lib/IOMux/HTTP.pm view on Meta::CPAN
, HTTP_1_0 => 'HTTP/1.0'
, HTTP_1_1 => 'HTTP/1.1'
};
# oops, dirty hack
sub HTTP::Message::id() { shift->{IMH_id} }
my $conn_id = 'C0000000';
sub init($)
{ my ($self, $args) = @_;
$args->{name} ||= ++$conn_id;
$self->SUPER::init($args);
$self->{IMH_headers} = $args->{add_headers} || [];
$self->{IMH_requests} = [];
$self->{IMH_starttime} = time;
$self->{IMH_msgcount} = 0; # something unique for logging
$self;
}
sub startTime() {shift->{IMH_starttime}}
sub mux_input($)
{ my ($self, $refdata) = @_;
while($$refdata) # possibly more than one message in one TCP package
{
# Read header
my $msg = $self->{IMH_incoming};
unless($msg)
{ if($self->{IMH_no_more})
{ # ignore input for closing, connection can still be writing
$$refdata = '';
return;
}
$$refdata =~ s/^\s+//s; # strip leading blanks, sloppy remote
$$refdata =~ s/(.*?)\r?\n\r?\n//s
or return; # not whole header yet, wait for more
$msg = $self->{IMH_incoming} = $self->headerArrived($1);
#trace "new header ".$msg->uri if $msg->isa('HTTP::Request');
my $msgid = sprintf 'in-%s-%02d'
, $self->name, $self->{IMH_msgcount}++;
$msg->id($msgid);
}
my $headers = $msg->headers;
my $proto = $msg->protocol;
$msg->protocol($proto = HTTP_0_9)
unless $proto;
$self->{IMH_no_more}++
if $msg->protocol lt HTTP_1_1
|| lc($headers->header('Connection') || '') ne 'keep-alive';
$self->{IMH_take_all}++
if $proto lt HTTP_1_0;
return # simply wait until EOF
if $self->{IMH_take_all};
# Read body
my $result = $self->bodyComponentArrived($msg, $refdata)
or return; # message not ready yet
my $resp = $result->isa('HTTP::Response') ? $result : undef;
$self->shutdown(0)
if $self->{IMH_no_more};
delete $self->{IMH_incoming};
$self->messageArrived($msg, $resp);
}
}
sub mux_outputbuffer_empty()
{ my $more = shift->{IMH_more_output} or return;
$more->();
}
sub mux_eof($)
{ my ($self, $refdata) = @_;
my $msg = delete $self->{IMH_incoming}; # headers only
if($msg && length($$refdata) && $self->{IMH_take_all})
{ $msg->content_ref($refdata);
}
else
{ trace "trailing ".length($$refdata)." bytes ignored"
if $$refdata =~ m/\S/;
}
$self->messageArrived($msg)
if $msg;
$self->SUPER::mux_eof($refdata);
}
sub bodyComponentArrived($$)
{ my ($self, $msg, $refdata) = @_;
my $headers = $msg->headers;
if(my $cl = $headers->header('Content-Length'))
{ return if length($$refdata) < $cl; # wait for more
$msg->content(substr $$refdata, 0, $cl, '');
return $msg;
}
# No Content-Length for multiparts?
my $ct = $headers->header('Content-Type') || '';
if($ct =~ m/^multipart\/\w+\s*;.*boundary\s*=(["']?)\s*(\w+)\1/i)
{ $$refdata =~ s/(.*?\r?\n--\Q$2\E--\r?\n)//
or return; # multipart terminator not received yet
$msg->content($1);
return $msg;
}
# No Content-Length and not multipart, then no body.
$msg;
}
sub headerArrived($) {panic}
sub messageArrived($) {panic}
#--------------
sub sendMessage($$)
{ my ($self, $msg, $callback) = @_;
if($self->mux_output_waiting || $self->{IMH_more_output})
{ # Arggg. Well, some message content still being written.
# Do not flood the outbufs with stringified requests.
# For instance, a number of large files to be sent back
# or uploaded as chunked.
push @{$self->{IMH_queued}}, [$msg, $callback];
return;
}
# Write the message now, and after that, but do not forget to
# handle messages which arrived during this sending after it.
my $queue_cb;
$queue_cb = sub
{ my $queued = shift @{$self->{IMH_queued}} or return;
my ($next_msg, $user_cb) = @$queued; # the next msg
$self->writeMessage($next_msg, sub {$queue_cb->(); $user_cb->()});
};
$self->writeMessage($msg, $queue_cb);
}
sub writeMessage($$)
{ my ($self, $msg, $callback) = @_;
my $header = $msg->headers;
$header->push_header
( Date => time2str(time)
, Connection => ($self->{IMH_no_more} ? 'close' : 'keep-alive')
, @{$self->{IMH_headers}}
);
my $content = $msg->content;
if(ref $content eq 'CODE')
{ # create chunked
$header->push_header(Transfer_Encoding => 'chunked');
my $size = 0;
$self->{IMH_more_output} = sub
{ my $chunk = $content->();
unless(defined $chunk)
{ delete $self->{IMH_more_output};
$self->write("0\r\n\r\n"); # end chunks and no footer
$size += 5;
info "sent CHUNKED msg ".$msg->id.' '.$msg->status." ${size}b";
return $callback->();
}
length $chunk or return;
my $hexlen = sprintf "%x", length $chunk;
$size += length($hexlen) + length($chunk) + 4;
$self->write("$hexlen\r\n$chunk\r\n");
};
$self->write(\$header->as_string);
}
else
{ # write message in one go.
$header->push_header(Content_Length => length $content);
$msg->content_ref(\$content);
my $text = $msg->as_string;
$self->write(\$text);
info "sent msg ".length($text)."b "
.(ref $msg).' '.($msg->isa('HTTP::Request') ? $msg->uri : $msg->content);
$callback->();
}
}
sub closeConnection() { shift->{IMH_no_more} = 1 }
1;
( run in 1.790 second using v1.01-cache-2.11-cpan-39bf76dae61 )