POE-Component-PreforkDispatch

 view release on metacpan or  search on metacpan

lib/POE/Component/PreforkDispatch.pm  view on Meta::CPAN

}

# Run one dispatched request: the '_precall' hook, the requested method, then
# the '_postcall' hook, all through call_method().
#
# Parameters:
#   $heap    - component heap, handed through to call_method()
#   $request - hash ref; reads {method_name}, {from} and {params}
#
# Returns whatever call_method() returns for the requested method.  The
# return values of the two hooks are discarded.
sub handle_request {
    my ($heap, $request) = @_;

    my $method_name = $request->{method_name};

    # A request that carries no params is treated as having an empty
    # parameter list instead of dying on the dereference of undef.
    my @args = ($request->{from}, @{ $request->{params} || [] });

    # The same array is handed to the hooks and to the method itself.
    call_method($heap, '_precall', \@args);
    my $result = call_method($heap, $method_name, \@args, $request);
    call_method($heap, '_postcall', \@args);

    return $result;
}

# Dispatch $method_name to whichever handler is configured for it.
#
# Parameters:
#   $heap        - component heap; reads {methods}, {classes} and
#                  {xmlrpc_server_parent}
#   $method_name - name of the method or hook to invoke
#   $args        - array ref of arguments (handle_request() passes the
#                  request's 'from' followed by its params).  undef means no
#                  arguments; any other non-array value is passed as the sole
#                  argument.
#   $request     - optional request hash; supplies a fallback {method} and the
#                  {session} that receives string-named methods
#
# Returns the handler's return value, the transaction result for XMLRPC
# dispatch, or a hash ref with an 'error' key when the call cannot be handled.
# The caller's argument array is never modified by this sub itself.
sub call_method {
    my ($heap, $method_name, $args, $request) = @_;

    # Normalise $args so every branch below can safely treat it as an array
    # ref.  Callers such as fork_new() pass nothing at all or a single hash
    # ref; only the local copy of $args is replaced here.
    require Scalar::Util;
    if ((Scalar::Util::reftype($args) || '') ne 'ARRAY') {
        $args = defined $args ? [$args] : [];
    }

    # Find a method to handle this: the heap's method table wins, then the
    # method carried by the request (if any).
    my $method = $heap->{methods} ? $heap->{methods}{$method_name} : undef;
    $method ||= $request->{method};

    if ($method) {
        if (ref($method)) {
            # A reference is invoked directly as code.
            return $method->(@$args);
        }
        else {
            # Anything else is treated as an event name posted synchronously
            # to the request's session.
            return $poe_kernel->call( $request->{session}, $method, @$args );
        }
    }
    elsif ($heap->{classes}) {
        foreach my $class (@{ $heap->{classes} }) {
            # TODO - see if class has function $method_name
        }
        return { error => "Class-based method calls not yet implemented" };
    }
    elsif ($heap->{xmlrpc_server_parent}) {
        # The first argument is the caller's 'from', which the transaction
        # does not take.  Read it off a copy: the same array is reused by
        # handle_request() for the hooks and the real method call, so it must
        # not be shifted in place.
        my (undef, @params) = @$args;
        my $transaction = POE::Component::PreforkDispatch::PseudoXMLRPCTransaction->new(@params);
        $poe_kernel->call( $heap->{xmlrpc_server_parent}, $method_name, $transaction );
        # NOTE(review): relies on $poe_kernel->call() leaving $! false on
        # success - confirm against the POE::Kernel documentation.
        return { error => "Couldn't call XMLRPC method $method_name on session ".$heap->{xmlrpc_server_parent} } if $!;
        return $transaction->result();
    }
    else {
        return { error => "Unknown XMLRPC method $method_name" };
    }
}


## RPC methods

# POE event handler: hand the oldest queued request to a fork.
#
# Reads KERNEL and HEAP from the POE argument list.  From the heap it uses
# {request_queue}, {forks}, {max_forks}, {last_fork_created}, {talkback} and
# {verbose}.
#
# An idle fork is reused when one exists (the one whose finished_request is
# smallest); otherwise a new fork is created unless the fork limit has been
# reached or another fork was created less than 5 seconds ago.  Returns
# without dequeuing anything when no fork can be used.
sub process_queue {
    my ($kernel, $heap) = @_[KERNEL, HEAP];

    # Do nothing if queue is empty
    return if $#{ $heap->{request_queue} } < 0;

    # Find a fork to use

    # Check for available, not busy existing forks.
    # Choose the fork that's been waiting the longest
    my ($use_fork) =
        sort { $a->{finished_request} <=> $b->{finished_request} }
        grep { $_->{status} eq 'idle' }
        @{ $heap->{forks} };

    # If no fork found, create a new one if possible.  Otherwise, wait.
    if (! $use_fork) {
        # Use >= rather than ==: should the number of forks ever exceed
        # max_forks, an equality test would never match again and new forks
        # would keep being created.
        if (scalar @{ $heap->{forks} } >= $heap->{max_forks}) {
            # Already forked the max number; have to wait for one to return
            $heap->{talkback}("All forks are busy; will wait to handle request after a fork returns") if $heap->{verbose};
            return;
        }

        # Don't forkbomb; delay before spawning another fork
        if ($heap->{last_fork_created} && time - $heap->{last_fork_created} < 5) {
            $heap->{talkback}("Delaying 2 sec on creating another fork") if $heap->{verbose};
            $kernel->delay('process_queue', 2);
            return;
        }

        $use_fork = fork_new($heap);
        # NOTE(review): unlike the other talkback calls, this one is not
        # gated on {verbose} - confirm that is intentional.
        $heap->{talkback}("Creating new fork " . $use_fork->{id});
        $heap->{last_fork_created} = time;
    }

    ## With a fork found, hand off the first request in queue to this fork

    my $request = shift @{ $heap->{request_queue} };

    $heap->{talkback}("Handling request " . $request->{method_name} . " with fork " . $use_fork->{id}) if $heap->{verbose};

    # Mark the fork busy before the request is written to its wheel.
    $use_fork->{active_request} = $request;
    $use_fork->{status} = 'waiting_response';
    $use_fork->{started_request} = time;

    $use_fork->{wheel}->put( $request );
}

# Create a new fork record backed by a POE::Wheel::Run and register it.
#
# Parameters:
#   $heap - component heap; the new record is pushed onto {forks}
#
# Calls the '_fork_preinit' hook before the wheel is created and the
# '_fork_postinit' hook (with the new fork record as its argument) afterwards,
# both through call_method().
#
# Returns the fork record: a hash ref with status, wheel, id, active_request,
# started_request and finished_request.
sub fork_new {
    my ($heap) = @_;

    call_method($heap, '_fork_preinit');

    # Create a new fork.  The wheel runs fork_main($heap) and exchanges data
    # through POE::Filter::Reference on both stdin and stdout.
    my $wheel = POE::Wheel::Run->new(
        Program => sub { fork_main($heap) },
        StdinFilter => POE::Filter::Reference->new(),
        StdoutFilter => POE::Filter::Reference->new(),
        StdoutEvent => 'fork_input',
        StderrEvent => 'fork_debug',
        CloseEvent => 'fork_closed',
        ErrorEvent => 'fork_error',
    );

    # A new fork starts idle with no request history.
    my $fork = {
        status => 'idle',
        wheel => $wheel,
        id => $wheel->ID,

        active_request => undef,
        started_request => 0,
        finished_request => 0,
    };
    push @{ $heap->{forks} }, $fork;

    # call_method() dereferences its third argument as an array, so the fork
    # record has to be wrapped in an array ref; passing the bare hash ref
    # dies with "Not an ARRAY reference" as soon as the hook is defined.
    call_method($heap, '_fork_postinit', [$fork]);

    return $fork;
}

sub fork_main {
    my ($heap) = @_;
    my $raw;
    my $size   = 4096;



( run in 2.506 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )