POE-Component-PreforkDispatch
view release on metacpan or search on metacpan
lib/POE/Component/PreforkDispatch.pm view on Meta::CPAN
}
sub handle_request {
my ($heap, $request) = (shift, shift);
my $method_name = $request->{method_name};
my @args = ($request->{from}, @{ $request->{params} });
call_method($heap, '_precall', \@args);
my $result = call_method($heap, $method_name, \@args, $request);
call_method($heap, '_postcall', \@args);
return $result;
}
sub call_method {
my ($heap, $method_name, $args, $request) = @_;
# Find a method to handle this
my $method = $heap->{methods} ? $heap->{methods}{$method_name} : undef;
$method ||= $request->{method};
if ($method) {
if (ref($method)) {
return $method->(@$args);
}
else {
return $poe_kernel->call( $request->{session}, $method, @$args );
}
}
elsif ($heap->{classes}) {
foreach my $class (@{ $heap->{classes} }) {
# TODO - see if class has function $method_name
}
return { error => "Class-based method calls not yet implemented" };
}
elsif ($heap->{xmlrpc_server_parent}) {
my $from = shift @$args;
my $transaction = POE::Component::PreforkDispatch::PseudoXMLRPCTransaction->new(@$args);
$poe_kernel->call( $heap->{xmlrpc_server_parent}, $method_name, $transaction );
return { error => "Couldn't call XMLRPC method $method_name on session ".$heap->{xmlrpc_server_parent} } if $!;
return $transaction->result();
}
else {
return { error => "Unknown XMLRPC method $method_name" };
}
}
## RPC methods
sub process_queue {
my ($kernel, $heap) = @_[KERNEL, HEAP];
# Do nothing if queue is empty
return if $#{ $heap->{request_queue} } < 0;
# Find a fork to use
# Check for available, not busy existing forks.
# Choose the fork that's been waiting the longest
my @avail_forks =
sort { $a->{finished_request} <=> $b->{finished_request} }
grep { $_->{status} eq 'idle' }
@{ $heap->{forks} };
my $use_fork = $avail_forks[0] ? $avail_forks[0] : undef;
# If no fork found, create a new one if possible. Otherwise, wait.
if (! $use_fork) {
if (int @{ $heap->{forks} } == $heap->{max_forks}) {
# Already forked the max number; have to wait for one to return
$heap->{talkback}("All forks are busy; will wait to handle request after a fork returns") if $heap->{verbose};
return;
}
# Don't forkbomb; delay before spawning another fork
if ($heap->{last_fork_created} && time - $heap->{last_fork_created} < 5) {
$heap->{talkback}("Delaying 2 sec on creating another fork") if $heap->{verbose};
$kernel->delay('process_queue', 2);
return;
}
$use_fork = fork_new($heap);
$heap->{talkback}("Creating new fork " . $use_fork->{id});
$heap->{last_fork_created} = time;
}
## With a fork found, hand off the first request in queue to this fork
my $request = shift @{ $heap->{request_queue} };
$heap->{talkback}("Handling request " . $request->{method_name} . " with fork " . $use_fork->{id}) if $heap->{verbose};
$use_fork->{active_request} = $request;
$use_fork->{status} = 'waiting_response';
$use_fork->{started_request} = time;
$use_fork->{wheel}->put( $request );
}
sub fork_new {
my ($heap) = @_;
call_method($heap, '_fork_preinit');
# Create a new fork
my $wheel = POE::Wheel::Run->new(
Program => sub { fork_main($heap) },
StdinFilter => POE::Filter::Reference->new(),
StdoutFilter => POE::Filter::Reference->new(),
StdoutEvent => 'fork_input',
StderrEvent => 'fork_debug',
CloseEvent => 'fork_closed',
ErrorEvent => 'fork_error',
);
my $fork = {
status => 'idle',
wheel => $wheel,
id => $wheel->ID,
active_request => undef,
started_request => 0,
finished_request => 0,
};
push @{ $heap->{forks} }, $fork;
call_method($heap, '_fork_postinit', $fork);
return $fork;
}
sub fork_main {
my ($heap) = @_;
my $raw;
my $size = 4096;
( run in 2.506 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )