Catalyst-Engine-HTTP-POE

 view release on metacpan or  search on metacpan

lib/Catalyst/Engine/HTTP/POE.pm  view on Meta::CPAN

package Catalyst::Engine::HTTP::POE;

use strict;
use warnings;
use base 'Catalyst::Engine::HTTP';
use Data::Dump qw(dump);
use HTTP::Body;
use HTTP::Date ();
use HTTP::Headers;
use HTTP::Response;
use HTTP::Status ();
use POE;
use POE::Filter::Line;
use POE::Filter::Stream;
use POE::Wheel::ReadWrite;
use POE::Wheel::SocketFactory;
use Socket;
use Time::HiRes;

use Catalyst::Engine::HTTP::Restarter::Watcher;

our $VERSION = '0.08';

# Enable for helpful debugging information
sub DEBUG () { $ENV{CATALYST_POE_DEBUG} || 0 }
sub BENCH () { $ENV{CATALYST_POE_BENCH} || 0 }

# Max processes (including parent)
sub MAX_PROC () { $ENV{CATALYST_POE_MAX_PROC} || 1 }

# Keep-alive connection timeout in seconds
sub KEEPALIVE_TIMEOUT () { 300 }

# Benchmark::Stopwatch for profiling
if ( BENCH ) {
    require Benchmark::Stopwatch;
}

sub run { 
    my ( $self, $class, @args ) = @_;
    
    $self->spawn( $class, @args );
    
    POE::Kernel->run;
}

sub spawn {
    my ( $self, $class, $port, $host, $options ) = @_;
    
    my $addr = $host ? inet_aton($host) : INADDR_ANY;
    if ( $addr eq INADDR_ANY ) {
        require Sys::Hostname;
        $host = lc Sys::Hostname::hostname();
    }
    else {
        $host = gethostbyaddr( $addr, AF_INET ) || inet_ntoa($addr);
    }
    
    $self->{alias}  = delete $options->{alias} || 'catalyst-poe';
    
    $self->{config} = {
        appclass   => $class,
        addr       => $addr,
        port       => $port,
        host       => $host,
        options    => $options,
        children   => {},
        is_a_child => 0,
    };
    
    POE::Session->create(
        object_states => [
            $self => [
                qw/_start
                   _stop
                   shutdown
                   child_shutdown
                   dump_state
                   status
                   
                   prefork
                   sig_chld
                   
                   check_restart
                   restart
                   
                   accept_new_client
                   accept_failed

                   client_flushed
                   client_error
                   
                   read_input
                   process_input
                   process

                   handle_prepare
                   prepare_done

                   handle_finalize
                   finalize_done
                   client_done
                   
                   keepalive_timeout
               /
           ],
       ],
   );
   
   return $self;
}

# start the server
sub _start {
    my ( $kernel, $self ) = @_[ KERNEL, OBJECT ];
    
    $kernel->alias_set( $self->{alias} );

    # take a copy of %ENV
    $self->{global_env} = \%ENV;
    
    $self->{listener} = POE::Wheel::SocketFactory->new(
         ( defined ( $self->{config}->{addr} ) 
            ? ( BindAddress => $self->{config}->{addr} ) 
            : () 
         ),
         ( defined ( $self->{config}->{port} ) 
            ? ( BindPort => $self->{config}->{port} ) 
            : ( BindPort => 3000 ) 
         ),
         SuccessEvent   => 'accept_new_client',
         FailureEvent   => 'accept_failed',
         SocketDomain   => AF_INET,
         SocketType     => SOCK_STREAM,
         SocketProtocol => 'tcp',
         Reuse          => 'on',
    );

    # dump our state if we get a SIGUSR1
    $kernel->sig( USR1 => 'dump_state' );

    # shutdown on INT and TERM
    $kernel->sig( INT  => 'shutdown' );
    $kernel->sig( TERM => 'shutdown' );
    
    # restart on HUP
    $kernel->sig( HUP => 'restart' );
    
    # Pre-fork if requested
    $self->{config}->{options}->{max_proc} ||= MAX_PROC;
    if ( $self->{config}->{options}->{max_proc} > 1 ) {
        $kernel->sig( CHLD => 'sig_chld' );
        $kernel->yield( 'prefork' );
    }
    
    # Init restarter
    if ( $self->{config}->{options}->{restart} ) {
        my $delay = $self->{config}->{options}->{restart_delay} || 1;
        $kernel->delay_set( 'check_restart', $delay );
    }
    
    my $url = 'http://' . $self->{config}->{host};
    $url .= ':' . $self->{config}->{port}
        unless $self->{config}->{port} == 80;

lib/Catalyst/Engine/HTTP/POE.pm  view on Meta::CPAN

        FlushedEvent => 'client_flushed',
        ErrorEvent   => 'client_error',
        HighMark     => 128 * 1024,
        HighEvent    => sub {}, # useless, never gets called
        LowMark      => 8 * 1024,
        LowEvent     => sub {}, # also useless, we can use FlushedEvent
    );

    # get the local connection information
    my $local_sockaddr = getsockname($socket);
    my ( undef, $localiaddr ) = sockaddr_in($local_sockaddr);
    my $localaddr = inet_ntoa($localiaddr) || '127.0.0.1';
    my $localname = gethostbyaddr( $localiaddr, AF_INET ) || 'localhost';
    
    my $ID = $wheel->ID;
    
    $self->{clients}->{$ID} = {
        wheel     => $wheel,
        socket    => $socket,
        peeraddr  => $peeraddr,
        peerport  => $peerport,
        localaddr => $localaddr,
        localname => $localname,
        
        requests  => 0,
        inputbuf  => '',
        written   => 0,
        
        stopwatch => $stopwatch,
    };
    
    DEBUG && warn "[$ID] [$$] New connection (wheel $ID from $peeraddr:$peerport)\n";

    # Wait for some data to read
    $poe_kernel->select_read( $socket, 'read_input', $ID );
}

sub accept_failed {
    my ( $kernel, $self, $op, $errnum, $errstr ) = @_[ KERNEL, OBJECT, ARG0 .. ARG2 ];
    
    warn "Unable to start server: $op error $errnum: $errstr\n";
    
    $kernel->yield('shutdown');
}

sub client_error {
    my ( $kernel, $self, $op, $errnum, $errstr, $ID ) = @_[ KERNEL, OBJECT, ARG0 .. ARG3 ];
    
    DEBUG && warn "[$ID] [$$] Wheel generated $op error $errnum: $errstr\n";
    
    delete $self->{clients}->{$ID};
}

sub read_input {
    my ( $kernel, $self, $handle, $ID ) = @_[ KERNEL, OBJECT, ARG0, ARG2 ];
    
    my $client = $self->{clients}->{$ID} || return;
    
    BENCH && $client->{stopwatch}->lap('read_input');
    
    # Clear the keepalive timeout timer if set
    if ( my $timer = delete $client->{_timeout_timer} ) {
        $kernel->alarm_remove( $timer );
    }
    
    # Read some data from the driver
    my $driver = $client->{wheel}->[ $client->{wheel}->DRIVER_BOTH ];
    my $buffer_ref = $driver->get( $handle );
        
    if ( !$buffer_ref ) {
        # Error, stop reading and shut down this client
        DEBUG && warn "[$ID] [$$] Error reading, disconnecting\n";
        $kernel->select_read( $handle );
        delete $self->{clients}->{$ID};
        return;
    }
        
    $client->{inputbuf} .= join '', @{$buffer_ref};
        
    DEBUG && warn "[$ID] [$$] read_input (" . length( $client->{inputbuf} ) . " bytes in buffer)\n";
    
    $kernel->yield( 'process_input', $ID );
}

sub process_input {
    my ( $kernel, $self, $ID ) = @_[ KERNEL, OBJECT, ARG0 ];
    
    my $client = $self->{clients}->{$ID} || return;
    
    # Have we started processing the body?
    if ( exists $client->{_read} ) {
        
        _process_chunk( $client );
                
        return;
    }
    
    # Have we already parsed headers
    return if $client->{_headers};
    
    # Have we read enough to include all headers?
    if ( $client->{inputbuf} =~ /(\x0D\x0A?\x0D\x0A?|\x0A\x0D?\x0A\x0D?)/s ) {
        $client->{_headers} = 1;

        # Copy the buffer for header parsing, and remove the header block
        # from the content buffer.
        my $buf = $client->{inputbuf};
        $client->{inputbuf} =~ s/.*?(\x0D\x0A?\x0D\x0A?|\x0A\x0D?\x0A\x0D?)//s;
        
        # Parse the request line.
        if ( $buf !~ s/^(\w+)[ \t]+(\S+)(?:[ \t]+(HTTP\/\d+\.\d+))?[^\012]*\012// ) {
            # Invalid request
            DEBUG && warn "[$ID] [$$] Bad request: $buf\n";

            my $status   = 400;
            my $message  = HTTP::Status::status_message($status);
            my $response = HTTP::Response->new( $status => $message );
            $response->content_type( 'text/plain' );
            $response->content( "$status $message" );
            # XXX: fix to use CRLF
            $client->{wheel}->put( $response->as_string );

lib/Catalyst/Engine/HTTP/POE.pm  view on Meta::CPAN

    BENCH && $client->{stopwatch}->lap('finalize');

    $client->{_finalize_done} = 0;

    $poe_kernel->yield( 'handle_finalize', 'finalize_uploads', $ID );

    if ( $#{ $c->error } >= 0 ) {
        $poe_kernel->yield( 'handle_finalize', 'finalize_error', $ID );
    }

    $poe_kernel->yield( 'handle_finalize', 'finalize_headers', $ID );

    $poe_kernel->yield( 'handle_finalize', 'finalize_body', $ID );

    $poe_kernel->yield( 'finalize_done', $ID );

    while ( !$client->{_finalize_done} ) {
        $poe_kernel->run_one_timeslice();
    }
    
    return $c->response->status;
}

sub handle_finalize {
    my ( $kernel, $self, $method, $ID ) = @_[ KERNEL, OBJECT, ARG0, ARG1 ];
    
    DEBUG && warn "[$ID] [$$] - $method\n";

    my $client = $self->{clients}->{$ID} || return;
    
    BENCH && $client->{stopwatch}->lap(" - $method");

    # Set the response body to null when we're doing a HEAD request.
    # Must be done here so finalize_headers can still set the proper
    # Content-Length value
    if ( $method eq 'finalize_body' ) {
        if ( $client->{context}->request->method eq 'HEAD' ) {
            $client->{context}->response->body('');
        }
    }

    $client->{context}->$method();
}

sub finalize_headers {
    my ( $self, $c ) = @_;

    my $client = $self->{clients}->{ $c->{_POE_ID} } || return;
    
    BENCH && $client->{stopwatch}->lap('finalize_headers');
    
    my $protocol = 'HTTP/1.0'; # We're not HTTP/1.1 (yet)
    my $status   = $c->response->status;
    my $message  = HTTP::Status::status_message($status);

    my @headers;
    push @headers, "$protocol $status $message";
    
    $c->response->headers->header( Date => HTTP::Date::time2str(time) );

    # Some notes: I found that to get keepalive mode to perform well under ab,
    # I had to send all data in a single put() call, so the second put in write() below is
    # what caused keepalive to be so slow.  Not sure if this is just a quirk with ab
    # or really a performance problem. :(
    
    # Should we keep the connection open?
    my $connection = $c->request->header('Connection');
    if ( $connection && $connection =~ /^keep-alive$/i ) {
        $c->response->headers->header( Connection => 'keep-alive' );
        $client->{_keepalive} = 1;
    }
    else {
        $c->response->headers->header( Connection => 'close' );
    }
    
    push @headers, $c->response->headers->as_string("\x0D\x0A");
    
    # Buffer the headers so they are sent with the first write() call
    # This reduces the number of TCP packets we are sending
    $client->{_header_buf} = join("\x0D\x0A", @headers, '');
}

sub finalize_done {
    my ( $kernel, $self, $ID ) = @_[ KERNEL, OBJECT, ARG0 ];
    
    DEBUG && warn "[$ID] [$$] finalize_done\n";

    my $client = $self->{clients}->{$ID} || return;
    
    # If we did not send our headers yet (we had no body), send them now
    if ( my $headers = delete $client->{_header_buf} ) {
        $client->{wheel}->put( $headers );
    }

    $client->{_finalize_done} = 1;
    
    BENCH && $client->{stopwatch}->lap('finalize_done');
}

sub write {
    my ( $self, $c, $buffer ) = @_;

    my $ID = $c->{_POE_ID};
    my $client = $self->{clients}->{$ID} || return;
    
    BENCH && $client->{stopwatch}->lap('write');
    
    # keep track of the amount of data we've sent
    $client->{_written} += length $buffer;
    DEBUG && warn "[$ID] [$$] written: " . $client->{_written} . "\n";
    
    # Add headers to the first write() call
    if ( my $headers = delete $client->{_header_buf} ) {
        $client->{_highmark_reached} = $client->{wheel}->put( $headers . $buffer );
    }
    else {
        $client->{_highmark_reached} = $client->{wheel}->put( $buffer );
    }    

    # if the output buffer has reached the highmark, we have a
    # lot of outgoing data.  Don't return until it's been sent
    while ( $client && $client->{_highmark_reached} ) {
        $poe_kernel->run_one_timeslice();
    }

    # always return 1, we can't detect failures here
    return 1;
}

# client_flushed is called when all data is done being written to the browser
sub client_flushed {
    my ( $kernel, $self, $ID ) = @_[ KERNEL, OBJECT, ARG0 ];

    my $client = $self->{clients}->{$ID} || return;
    
    BENCH && $client->{stopwatch}->lap('client_flushed');

    # Are we done writing?
    if ( $client->{context} ) {
        my $cl = $client->{context}->response->content_length;
        if ( $cl && $client->{_written} >= $cl ) {
            DEBUG && warn "[$ID] [$$] client_flushed, written full content-length\n";
            $kernel->yield( 'client_done', $ID );
            return;
        }
    }

    # if we get this event because of the highmark being reached
    # don't clean up but reset the highmark value to 0
    if ( $client->{_highmark_reached} ) {
        $client->{_highmark_reached} = 0;
        return;
    }

    # we may have not had a content-length...
    $kernel->yield( 'client_done', $ID );
}

sub client_done {
    my ( $kernel, $self, $ID ) = @_[ KERNEL, OBJECT, ARG0 ];
    
    my $client = $self->{clients}->{$ID} || return;
    
    BENCH && warn "[$ID] [$$] Stopwatch:\n" . $client->{stopwatch}->stop->summary;

    # clean up everything about this client unless we are using keepalive
    if ( $client->{_keepalive} ) {
        DEBUG && warn "[$ID] [$$] client_done, keepalive enabled, waiting for more requests\n";
        $client->{requests}++;
        
        # Clear important variables from the previous state
        delete $client->{_headers};
        delete $client->{_written};
        delete $client->{_read};
        
        if ( BENCH ) {
            $client->{stopwatch} = Benchmark::Stopwatch->new->start;
        }

        # timeout idle connection after some seconds
        $client->{_timeout_timer} = $kernel->delay_set( 'keepalive_timeout', KEEPALIVE_TIMEOUT, $ID );
    }
    else {
        DEBUG && warn "[$ID] [$$] client_done, closing connection\n";
        delete $self->{clients}->{$ID};
    }
}

sub keepalive_timeout {
    my ( $kernel, $self, $ID ) = @_[ KERNEL, OBJECT, ARG0 ];
    
    DEBUG && warn "[$ID] [$$] Timing out idle keepalive connection\n";
    
    delete $self->{clients}->{$ID};
}

# Process a chunk of body data
sub _process_chunk {
    my $client = shift;
    
    # Read no more than content-length
    my $cl = $client->{env}->{CONTENT_LENGTH} || length( $client->{inputbuf} ) || 0;

    my $buf  = substr $client->{inputbuf}, 0, $cl, '';
    my $read = length($buf);
    
    return unless $read;
        
    $client->{context}->prepare_body_chunk( $buf );

    $client->{_read} += $read;
    
    if ( DEBUG ) {
        my $ID   = $client->{wheel}->ID;
        my $togo = $client->{_read_length} - $client->{_read};
        warn "[$ID] [$$] prepare_body: Read $read bytes ($togo to go)\n";
    }
    
    # Is that all the body data?
    if ( $client->{_read} >= $client->{_read_length} ) {
        # Some browsers (like MSIE 5.01) send extra CRLFs after the content
        # so we need to strip it away
        $client->{inputbuf} =~ s/^\s+//;
        
        $client->{_prepare_body_done} = 1;
    }
}

1;
__END__

=head1 NAME

Catalyst::Engine::HTTP::POE - Single-threaded multi-tasking Catalyst engine (deprecated in favor of HTTP::Prefork)

=head1 SYNOPIS

    CATALYST_ENGINE='HTTP::POE' script/yourapp_server.pl
    
    # Prefork 5 children
    CATALYST_POE_MAX_PROC=6 CATALYST_ENGINE='HTTP::POE' script/yourapp_server.pl

=head1 DEPRECATED

This engine has been deprecated.  Please consider using L<Catalyst::Engine::HTTP::Prefork> instead.

=head1 DESCRIPTION

This engine allows Catalyst to process multiple requests in parallel within a
single process.  Much of the internal Catalyst flow now uses POE yield calls.
Application code will still block of course, but all I/O, header processing, and
POST body processing is handled asynchronously.



( run in 1.354 second using v1.01-cache-2.11-cpan-39bf76dae61 )