Plack-App-MCCS

 view release on metacpan or  search on metacpan

local/lib/perl5/Twiggy/Server.pm  view on Meta::CPAN

        ($host, $port) = split /:/, $listen;
        $host = undef if $host eq '';
        $port = undef if $port == 0;
        $is_tcp = 1;
    } else {
        $host = "unix/";
        $port = $listen;
    }

    my($listen_host, $listen_port);

    return tcp_server $host, $port, $self->_accept_handler($app, $is_tcp, \$listen_host, \$listen_port),
        $self->_accept_prepare_handler(\$listen_host, \$listen_port);
}

# Build the "prepare" callback that is passed as the last argument to
# tcp_server.  When the callback is invoked with a handle and the bound
# host/port it:
#   * stores host and port through the two scalar references it was given,
#   * invokes the optional $self->{server_ready} hook with a description of
#     the listener, and
#   * returns the configured backlog ($self->{backlog}), or 0 if none is set.
sub _accept_prepare_handler {
    my ( $self, $listen_host_r, $listen_port_r ) = @_;

    my $on_prepare = sub {
        # The first argument (the handle) is not needed here.
        my ( undef, $bound_host, $bound_port ) = @_;

        warn "Listening on $bound_host:$bound_port\n" if DEBUG;

        # Publish the bound address to whoever holds the references.
        ${$listen_host_r} = $bound_host;
        ${$listen_port_r} = $bound_port;

        if ( $self->{server_ready} ) {
            $self->{server_ready}->( {
                host            => $bound_host,
                port            => $bound_port,
                server_software => 'Twiggy',
            } );
        }

        return $self->{backlog} || 0;
    };

    return $on_prepare;
}

# Build the per-connection callback handed to tcp_server.
#
# Arguments:
#   $app            - PSGI application, passed on to _run_app or to
#                     _create_req_parsing_watcher
#   $is_tcp         - true for TCP listeners; together with
#                     $self->{no_delay} it enables TCP_NODELAY on the socket
#   $listen_host_r,
#   $listen_port_r  - scalar references that are dereferenced when the PSGI
#                     environment is built (SERVER_NAME / SERVER_PORT)
#
# The returned callback registers the connection with the exit guard, tries
# to parse the request headers right away, and either runs the application
# or installs a watcher that retries parsing later.  Any exception raised
# while doing so results in a call to _bad_request.
sub _accept_handler {
    my ( $self, $app, $is_tcp, $listen_host_r, $listen_port_r ) = @_;

    return sub {
        my ( $sock, $peer_host, $peer_port ) = @_;

        warn "$sock Accepted connection from $peer_host:$peer_port\n" if DEBUG;
        $sock or return;
        $self->{exit_guard}->begin;

        if ( $is_tcp and $self->{no_delay} ) {
            setsockopt( $sock, IPPROTO_TCP, TCP_NODELAY, 1 )
                or die "setsockopt(TCP_NODELAY) failed:$!";
        }

        # One header buffer per connection.  It is handed to
        # _try_read_headers as the very same variable on every attempt so
        # that data can accumulate in it across calls.
        my $header_buf = "";

        # Returns a PSGI env hash ref once the headers are complete, nothing
        # while more data is needed, and dies on a malformed request.
        my $try_parse = sub {
            return unless $self->_try_read_headers( $sock, $header_buf );

            my $env = {
                SERVER_NAME            => $$listen_host_r,
                SERVER_PORT            => $$listen_port_r,
                SCRIPT_NAME            => '',
                REMOTE_ADDR            => $peer_host,
                'psgi.version'         => [ 1, 0 ],
                'psgi.errors'          => *STDERR,
                'psgi.url_scheme'      => 'http',
                'psgi.nonblocking'     => Plack::Util::TRUE,
                'psgi.streaming'       => Plack::Util::TRUE,
                'psgi.run_once'        => Plack::Util::FALSE,
                'psgi.multithread'     => Plack::Util::FALSE,
                'psgi.multiprocess'    => Plack::Util::FALSE,
                'psgi.input'           => undef, # will be set by _run_app()
                'psgix.io'             => $sock,
                'psgix.input.buffered' => Plack::Util::TRUE,
            };

            my $reqlen = parse_http_request( $header_buf, $env );
            warn "$sock Parsed HTTP headers: request length=$reqlen\n" if DEBUG;

            die "bad request" if $reqlen < 0;

            return $env;
        };

        local $@;
        my $ok = eval {
            my $env = $try_parse->();

            if ($env) {
                # Headers were already fully available: run the app now.
                $self->_run_app( $app, $env, $sock );
            }
            else {
                # Not enough data yet: retry when the socket becomes readable.
                $self->_create_req_parsing_watcher( $sock, $try_parse, $app );
            }

            1;
        };

        unless ($ok) {
            my $disconnected = ( $@ =~ /^client disconnected/ );
            $self->_bad_request( $sock, $disconnected );
        }
    };
}

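# Reads header lines from $sock and appends them to the caller's buffer,
# which is passed as the third argument and modified in place through $_[2]
# (so every socket can keep its own buffer).  Returns true once the blank
# line that terminates the headers has been read.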
sub _try_read_headers {
    my ( $self, $sock, undef ) = @_;

    # FIXME add a timer to manage read timeouts
    local $/ = "\012";

    read_more: for my $headers ( $_[2] ) {
        if ( defined(my $line = <$sock>) ) {
            $headers .= $line;

            if ( $line eq "\015\012" or $line eq "\012" ) {
                # got an empty line, we're done reading the headers
                return 1;
            } else {
                # try to read more lines using buffered IO
                redo read_more;
            }
        } elsif ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK ) {

local/lib/perl5/Twiggy/Server.pm  view on Meta::CPAN

            my $rlen = read($sock, $data, $read_size, length($data));

            if (defined $rlen and $rlen > 0) {
                $remaining -= $rlen;

                if ($remaining <= 0) {
                    $cb->($data);
                    return 1;
                } else {
                    redo READ_MORE;
                }
            } elsif (defined $rlen) {
                $cb->($data);
                return 1;
            } elsif ($! and $! != EAGAIN && $! != EINTR && $! != WSAEWOULDBLOCK) {
                die $!;
            } elsif (!$!) {
                die "client disconnected";
            }
        }

        return;
    };

    unless ($try_read->()) {
        my $rw; $rw = AE::io($sock, 0, sub {
            try {
                if ($try_read->()) {
                    undef $rw;
                }
            } catch {
                undef $rw;
                $self->_bad_request($sock);
            };
        });
    }
}

# Run the PSGI application for one request and dispatch on the type of
# response it returns.
#
# Arguments:
#   $app  - PSGI application
#   $env  - PSGI environment hash ref for this request
#   $sock - client socket the response is written to
#
# Request body: if $env has no 'psgi.input' yet and CONTENT_LENGTH is true,
# the body is read with _read_chunk and this method is re-entered from the
# read callback once 'psgi.input' is set; without a CONTENT_LENGTH the
# shared $null_io handle is used.
#
# Response types handled:
#   ARRAY ref          - written with _write_psgi_response
#   AnyEvent::CondVar  - deprecated; written when the condvar is ready
#   CODE ref           - called with a responder callback and the socket;
#                        the responder accepts [status, headers] (returns a
#                        Twiggy::Writer) or [status, headers, body, post]
#
# Dies if the in-memory handle for the request body cannot be opened, and
# croaks on an unknown response type or on a responder call with fewer than
# two elements.
sub _run_app {
    my($self, $app, $env, $sock) = @_;

    unless ($env->{'psgi.input'}) {
        if ($env->{CONTENT_LENGTH}) {
            $self->_read_chunk($sock, $env->{CONTENT_LENGTH}, sub {
                my ($data) = @_;

                # Fail loudly rather than handing the application an
                # unusable input handle.
                open my $input, '<', \$data
                    or die "failed to open in-memory request body: $!";

                $env->{'psgi.input'} = $input;
                $self->_run_app($app, $env, $sock);
            });
            return;
        } else {
            $env->{'psgi.input'} = $null_io;
        }
    }

    my $res = Plack::Util::run_app($app, $env);

    if ( ref $res eq 'ARRAY' ) {
        $self->_write_psgi_response($sock, $res);
    } elsif ( blessed($res) and $res->isa("AnyEvent::CondVar") ) {
        Carp::carp("Returning AnyEvent condvar is deprecated and will be removed in the next release of Twiggy. Use the streaming callback interface instead.");
        $res->cb(sub { $self->_write_psgi_response($sock, shift->recv) });
    } elsif ( ref $res eq 'CODE' ) {
        # Set by the responder below if it hands out a writer object.
        my $created_writer;

        $res->(
            sub {
                my $stream_res = shift;

                if ( @$stream_res < 2 ) {
                    croak "Insufficient arguments";
                } elsif ( @$stream_res == 2 ) {
                    # Streaming body: send the headers now and return a
                    # writer for the body.
                    my ( $status, $headers ) = @$stream_res;

                    $self->_flush($sock);

                    my $writer = Twiggy::Writer->new($sock, $self->{exit_guard});
                    $created_writer = 1;

                    my $buf = $self->_format_headers($status, $headers);
                    $writer->write($$buf);

                    return $writer;
                } else {
                    # Complete response; the optional fourth element is a
                    # callback run once the response has been written.
                    my ( $status, $headers, $body, $post ) = @$stream_res;
                    my $cv = $self->_write_psgi_response($sock, [ $status, $headers, $body ]);
                    $cv->cb(sub { $post->() }) if $post;
                }
            },
            $sock,
        );

        if($created_writer) {
            $self->{exit_guard}->end; # normally _write_psgi_response calls this, but it doesn't get called when we use a writer!
        }
    } else {
        croak("Unknown response type: $res");
    }
}

sub _write_psgi_response {
    my ( $self, $sock, $res ) = @_;

    if ( ref $res eq 'ARRAY' ) {
        if ( scalar @$res == 0 ) {
            # no response
            $self->{exit_guard}->end;
            return;
        }

        my ( $status, $headers, $body ) = @$res;

        my $cv = AE::cv;

        $self->_write_headers( $sock, $status, $headers )->cb(sub {
            local $@;
            if ( eval { $_[0]->recv; 1 } ) {
                $self->_write_body($sock, $body)->cb(sub {
                    shutdown $sock, 1;
                    close $sock;
                    $self->{exit_guard}->end;



( run in 0.794 second using v1.01-cache-2.11-cpan-97f6503c9c8 )