IO-Lambda
view release on metacpan or search on metacpan
lib/IO/Lambda/HTTP/Server.pm view on Meta::CPAN
package IO::Lambda::HTTP::Server;
use vars qw(@ISA @EXPORT_OK $DEBUG);
@EXPORT = qw(http_server);
use base qw(Exporter IO::Lambda);
our $DEBUG = $IO::Lambda::DEBUG{httpd} || 0;
use strict;
use warnings;
use Socket;
use Exporter;
use IO::Socket::INET;
use HTTP::Request;
use HTTP::Response;
use IO::Lambda qw(:lambda :stream);
use IO::Lambda::Socket qw(accept);
use Time::HiRes qw(time);
my $CRLF = "\x0d\x0a";
sub _close($$)
{
warn "$_[1]\n" if $DEBUG;
close($_[0]);
}
sub _msg
{
my ( $status, $msg, $close) = @_;
my $resp = "HTTP/1.1 $status${CRLF}Content-Length: ".length($msg)."$CRLF";
$resp .= "Connection: ".($close ? 'close' : 'keep-alive')."$CRLF";
$resp .= "Date: ". scalar(localtime).$CRLF;
$resp .= "Content-Type: text/plain$CRLF" if length($msg);
$resp .= $CRLF . $msg;
return $resp;
}
sub _bye
{
my ( $self, $conn, $close, $msg) = @_;
tail {
my $resp = _msg( $msg, '', $close);
context writebuf, $conn, \$resp, length($resp), 0, $self->{timeout};
tail {
if ( $close ) {
warn "[$self->{sessions}->{$conn}->{remote}] disconnect\n" if $DEBUG;
if ( !close($conn)) {
warn "close error:$!\n" if $DEBUG;
}
}
}}
}
sub _bad_request
{
my ( $self, $conn, $close) = @_;
$self->_bye( $conn, $close, "400 Bad Request");
}
sub _timeout
{
my ( $self, $conn) = @_;
$self->_bye($conn, 1, "408 Timeout");
}
sub handle_connection
{
my ($self, $conn, $cb) = @_;
my %session;
my $session_data = $self->{sessions}->{"$conn"};
my $buf = '';
lambda {
$session_data->{active} = 0;
context readbuf, $conn, \$buf, 1, $self->{timeout};
tail {
$session_data->{active} = 1;
context readbuf, $conn, \$buf, qr/^.*?\x{0D}?\x{0A}\x{0D}?\x{0A}/s, $self->{timeout};
tail {
my ( $match, $error) = @_;
return $self->_timeout($conn) if defined($error) and $error eq 'timeout';
return _close $conn, $error unless defined $match;
warn length($buf), " bytes read\n" if $DEBUG > 1;
my $req = HTTP::Request-> parse( $match);
return $self->_bad_request($conn, 1) unless $req;
my $proto = (( $req->protocol // '') =~ /^HTTP\/([\d\.]+)$/i) ? $1 : 1.0;
my $keep_alive =
$proto >= 1.1 &&
(lc( $req->header('Connection') // 'keep-alive') eq 'keep-alive');
$keep_alive = 0 if $self->{shutdown};
my $cl = length($match) + ($req->header('Content-Length') // 0);
context readbuf, $conn, \$buf, $cl, $self->{timeout};
tail {
my ( undef, $error) = @_;
return $self->_timeout($conn) if defined($error) and $error eq 'timeout';
return _close $conn, $error if defined $error;
warn length($buf), " bytes read\n" if $DEBUG > 1;
unless ($req = HTTP::Request-> parse( $buf)) {
return lambda {
context $self->_bad_request($conn, !$keep_alive);
tail {
this->start if $keep_alive && !($self->{shutdown} && !length($buf));
}};
}
substr( $buf, 0, $cl, '');
my $resp;
eval { ($resp, $error) = $cb->($req, \%session); };
if ($@) {
$error = $@;
warn $@;
}
context UNIVERSAL::isa( $resp, 'IO::Lambda') ?
$resp : lambda { $resp, $error };
tail {
my $error;
($resp, $error) = @_;
$keep_alive = 0 if $self->{shutdown};
if ( $error ) {
$resp = _msg("500 Server Error", $error, !$keep_alive);
} elsif ( UNIVERSAL::isa( $resp, 'HTTP::Response')) {
$resp->header(Connection => ($keep_alive ? 'keep-alive' : 'close'));
$resp->protocol("HTTP/1.1");
$resp = $resp->as_string($CRLF);
} else {
$resp = _msg("200 OK", $resp // '', !$keep_alive);
}
context writebuf, $conn, \$resp, length($resp), 0, $self->{timeout};
tail {
my ( undef, $error) = @_;
return _close $conn, $error if defined $error;
warn length($resp), " bytes written\n" if $DEBUG > 1;
return this->start if $keep_alive && !($self->{shutdown} && !length($buf));
warn "[$session_data->{remote}] disconnect\n" if $DEBUG;
if ( !close($conn)) {
warn "error during response:$!\n" if $DEBUG;
}
}}}}}}
}
sub http_server(&$;@)
{
my ( $cb, $listen, %opt) = @_;
my $port = 80;
unless ( ref $listen ) {
($listen, $port) = ($1, $2) if $listen =~ /^(.*)\:(\d+)$/;
$listen = IO::Socket::INET->new(
Listen => 5,
LocalAddr => $listen,
LocalPort => $port,
Proto => 'tcp',
ReuseAddr => 1,
);
unless ( $listen ) {
warn "$!\n" if $DEBUG;
return (undef, $!);
}
} else {
$port = $listen->sockport;
}
return __PACKAGE__->new(
socket => $listen,
port => $port,
callback => $cb,
%opt
);
}
sub new
{
my ( $class, %opt ) = @_;
my $cb = delete $opt{callback};
my $self;
$self = lambda {
context $self->{socket};
$self->{accept_event} = accept {
return if $self->{shutdown};
my $conn = shift;
$self->{accept_event} = again;
unless ( ref($conn)) {
warn "accept() error:$conn\n" if $DEBUG;
return;
}
$conn-> blocking(0);
my $sess = $self->{sessions}->{"$conn"} = {
active => 0,
};
( run in 0.758 second using v1.01-cache-2.11-cpan-39bf76dae61 )