App-Context
view release on metacpan or search on metacpan
lib/App/Context/POE/Server.pm view on Meta::CPAN
#############################################################################
## $Id: Server.pm 6786 2006-08-11 23:22:48Z zroberts $
#############################################################################
package App::Context::POE::Server;
$VERSION = (q$Revision: 6786 $ =~ /(\d[\d\.]*)/)[0]; # VERSION numbers generated by svn
use strict;
use vars qw(@ISA);
use warnings;
use App::Context::POE;
@ISA = ( "App::Context::POE" );
use POSIX ":sys_wait_h";
use Sys::Hostname;
use Date::Format;
use Date::Parse;
use Time::HiRes qw(gettimeofday tv_interval);
#sub POE::Kernel::TRACE_STATISTICS () { 1 }
#sub POE::Kernel::TRACE_PROFILE () { 1 }
use POE;
use POE::Component::Server::SimpleHTTP;
use POE::Component::IKC::Server;
use HTTP::Status qw/RC_OK/;
use Socket qw(INADDR_ANY);
use Storable qw(lock_store lock_retrieve);
sub _init {
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
$options = {} if (!defined $options);
$self->SUPER::_init($options);
App->mkdir($options->{prefix}, "data", "app", "Context");
$| = 1; # autoflush STDOUT (not sure this is required)
### Configuration stuff
my $host = hostname;
$self->{hostname} = $host;
$host =~ s/\..*//; # get rid of fully qualified domain name
$self->{host} = $host;
$options->{port} ||= 8080;
$self->{port} = $options->{port};
$options->{http_port} ||= $options->{port}+1;
$self->{poe_kernel_name} = "poe_$self->{host}_$self->{port}";
$self->{poe_kernel_http_name} = $self->{poe_kernel_name} . "_httpd";
$self->{poe_session_name} = "poe_session";
$self->{poe_kernel} = $poe_kernel;
$self->{num_procs} = 0;
$self->{max_procs} = $self->{options}{"app.context.max_procs"} || 10;
$self->{max_async_events} = $self->{options}{"app.context.max_async_events"}
if (defined $self->{options}{"app.context.max_async_events"});
$self->{max_async_events} ||= 10;
$self->{num_async_events} = 0;
$self->{async_event_count} = 0;
$self->{pending_async_events} = [];
$self->{running_async_event} = {};
$self->{poe_profile} = $options->{poe_profile};
$self->{poe_profile} = 60 if ($self->{poe_profile} && $self->{poe_profile} == 1);
$self->{poe_trace} = $options->{poe_trace};
$self->{verbose} = $options->{verbose};
$self->{poe_states} = [qw(
_start _stop _default poe_sigchld poe_sigterm poe_sigignore poe_shutdown poe_alarm poe_profile
ikc_register ikc_unregister ikc_shutdown
poe_run_event poe_event_loop_extension poe_dispatch_pending_async_events
poe_server_state poe_http_server_state poe_debug poe_http_debug poe_http_test_run
poe_enqueue_async_event poe_enqueue_async_event_finished poe_remote_async_event_finished
)];
$self->{poe_ikc_published_states} = [qw(
poe_server_state
poe_enqueue_async_event
poe_remote_async_event_finished
)];
### Does nothing by default, used by ClusterController, maybe other subclasses?
$self->_init2a($options);
### Do log rotation
### TODO: this should be refactored out
if ($self->{options}{log_rotate}) {
my $rotate_sec = $self->{options}{log_rotate};
$rotate_sec = $rotate_sec*(24*3600) if ($rotate_sec <= 31); # interpret as days
my $time = time();
my $base_time = str2time(time2str("%Y-%m-%d 00:00:00", $time)); # I need a base which is midnight local time
my $next_rotate_time = ((int(($time - $base_time)/$rotate_sec)+1)*$rotate_sec) + $base_time;
$self->schedule_event(
tag => "context-log-rotation",
method => "log_file_open",
lib/App/Context/POE/Server.pm view on Meta::CPAN
### Does nothing by default
$self->_init2b($options);
&App::sub_exit() if ($App::trace);
}
### Initialization hook intended for subclasses to override.
### The base implementation only starts the POE front ends by delegating
### to _init_poe() with the same options hash it was given.
###   $options - hash reference of startup options, passed through unchanged
sub _init2a {
    if ($App::trace) { &App::sub_entry; }   # bare &call forwards our @_ to the tracer
    my $self    = $_[0];
    my $options = $_[1];
    $self->_init_poe($options);
    if ($App::trace) { &App::sub_exit(); }
}
### Start the network-facing POE components for this server:
###   1. an IKC (inter-kernel communication) server on $self->{port},
###      registered under $self->{poe_kernel_name}, plus the IKC responder;
###   2. a SimpleHTTP server on $self->{options}{http_port} whose requests
###      are routed to events of the session named $self->{poe_session_name}.
### Each "Listening ..." log line is only written when its own debug flag
### (options poe_ikc_debug / attribute poe_trace) is set.
###   $options - accepted for symmetry with the other init hooks; not read here
sub _init_poe {
    if ($App::trace) { &App::sub_entry; }
    my ($self, $options) = @_;

    # Inter-kernel communication listener
    POE::Component::IKC::Server->spawn(
        port => $self->{port},
        name => $self->{poe_kernel_name},
    );
    if ($self->{options}{poe_ikc_debug}) {
        $self->log({level=>3},"Listening for Inter-Kernel Communications on $self->{host}:$self->{port}\n");
    }
    POE::Component::IKC::Responder->spawn();

    # HTTP listener: (URL pattern, event) pairs, checked in this order,
    # with the catch-all pattern last.
    my $session_name = $self->{poe_session_name};
    my @routes = (
        [ '/debug',   'poe_http_debug' ],
        [ '/testrun', 'poe_http_test_run' ],
        [ '.*',       'poe_http_server_state' ],
    );
    my @handlers = map {
        +{ 'DIR' => $_->[0], 'SESSION' => $session_name, 'EVENT' => $_->[1], }
    } @routes;

    POE::Component::Server::SimpleHTTP->new(
        'ALIAS'    => $self->{poe_kernel_http_name},
        'ADDRESS'  => INADDR_ANY,
        'PORT'     => $self->{options}{http_port},
        'HANDLERS' => \@handlers,
    );
    if ($self->{poe_trace}) {
        $self->log({level=>3},"Listening for HTTP Requests on $self->{host}:$self->{options}{http_port}\n");
    }

    if ($App::trace) { &App::sub_exit(); }
}
### Initialization hook intended for subclasses to override.
### The base implementation does nothing apart from trace entry/exit.
###   $options - hash reference of startup options (unused here)
sub _init2b {
    if ($App::trace) { &App::sub_entry; }   # bare &call forwards our @_ to the tracer
    my $self    = $_[0];
    my $options = $_[1];
    if ($App::trace) { &App::sub_exit(); }
}
### Shut this server context down.
### Child processes are signalled first (shutdown_child_processes), then
### the parent class's shutdown() runs. Shutting down the network servers
### themselves is still a TODO.
sub shutdown {
    if ($App::trace) { &App::sub_entry; }
    my ($self) = @_;

    ### TODO: shut down servers

    # children first, then whatever the superclass needs to tear down
    $self->shutdown_child_processes();
    $self->SUPER::shutdown();

    if ($App::trace) { &App::sub_exit(); }
}
### Send signal 15 (SIGTERM) to every process id recorded as a key of
### $self->{proc}. Does nothing when no process table exists.
sub shutdown_child_processes {
    if ($App::trace) { &App::sub_entry; }
    my ($self) = @_;

    my $proc_table = $self->{proc};
    my @child_pids = $proc_table ? keys(%$proc_table) : ();
    kill(15, $_) for @child_pids;

    if ($App::trace) { &App::sub_exit(); }
}
### Main entry point of the server: instantiate the configured start-up
### services, then hand control to the POE kernel until it stops.
###
###   $max_events_occurred - accepted for interface compatibility; not used
###
### Steps:
###   1. dispatch_events_begin()
###   2. every entry of the "init_objects" option (separated by ',' or ';')
###      is instantiated through $self->service(). An entry of the form
###      "Type.name" selects the service type explicitly; anything else is
###      treated as a SessionObject name. The first service created becomes
###      $self->{main_service} unless one is already set.
###   3. a POE session is created with $self->{poe_states} as object states
###      and the kernel is run; an exception escaping the kernel is logged
###      at level 2 rather than propagated.
###   4. dispatch_events_end(), then shutdown().
sub dispatch_events {
    &App::sub_entry if ($App::trace);
    my ($self, $max_events_occurred) = @_;
    my $options = $self->{options};

    $self->dispatch_events_begin();

    ### Set up init_objects
    # init_objects is optional; without this default, split() warns about an
    # uninitialized value whenever the option is not configured.
    my $objects = $options->{init_objects};
    $objects = "" if (!defined $objects);

    foreach my $object (split(/ *[;,]+ */, $objects)) {
        next if (!$object);     # skip empty fields, e.g. from a leading separator

        my ($service_type, $name);
        if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) {
            ($service_type, $name) = ($1, $2);
        }
        else {
            $service_type = "SessionObject";
            $name = $object;
        }

        my $service = $self->service($service_type, $name);  # instantiate it. that's all.
        $self->log({level=>3},"dispatch_events: $service_type $name instantiated [$service]\n");
        $self->{main_service} = $service if (!$self->{main_service});
    }

    eval {
        ### POE Server begins here
        POE::Session->create( object_states => [ $self => $self->{poe_states} ] );
        $poe_kernel->run();
    };
    if ($@) {
        $self->log({level=>2},$@);
    }

    $self->dispatch_events_end();
    $self->shutdown();
    &App::sub_exit() if ($App::trace);
}
sub dispatch_events_begin {
&App::sub_entry if ($App::trace);
lib/App/Context/POE/Server.pm view on Meta::CPAN
$str = "";
if ($main_service && $main_service->can("format_async_event")) {
$str = $main_service->format_async_event($event, $callback_event, $runtime_event_token);
}
if ($str) {
$state .= " ";
$state .= $main_service->format_async_event($event, $callback_event, $runtime_event_token);
$state .= "\n";
}
else {
@args = ();
@args = @{$event->{args}} if ($event->{args});
$args_str = join(",",@args);
$state .= sprintf(" %-20s %-20s %-24s", $event->{event_token}, $runtime_event_token, "$event->{name}.$event->{method}($args_str)");
if ($callback_event) {
@args = ();
@args = @{$callback_event->{args}} if ($callback_event->{args});
$args_str = join(",",@args);
$state .= "$callback_event->{name}.$callback_event->{method}($args_str)";
}
$state .= "\n";
}
}
$state .= "\n";
$state .= "Pending Async Events: count [$self->{async_event_count}]\n";
foreach $async_event (@{$self->{pending_async_events}}) {
($event, $callback_event) = @$async_event;
$str = "";
if ($main_service && $main_service->can("format_async_event")) {
$str = $main_service->format_async_event($event, $callback_event);
}
if ($str) {
$state .= " ";
$state .= $main_service->format_async_event($event, $callback_event);
$state .= "\n";
}
else {
@args = ();
@args = @{$event->{args}} if ($event->{args});
$args_str = join(",",@args);
$state .= sprintf(" %-20s %-40s", $event->{event_token}, "$event->{name}.$event->{method}($args_str)");
if ($callback_event) {
@args = ();
@args = @{$callback_event->{args}} if ($callback_event->{args});
$args_str = join(",",@args);
$state .= " => $callback_event->{name}.$callback_event->{method}($args_str)";
}
$state .= "\n";
}
}
$state .= "\n";
### Only enable this in development, requires a library uncomment as well
$state .= $self->_state_poe();
### THIS DOESN'T WORK YET
#$state .= $self->_state_q();
$state .= $self->SUPER::_state();
&App::sub_exit($state) if ($App::trace);
return($state);
}
### Return a text dump of the POE kernel's event queue, or "" when the
### $self->{poe_peek} flag is not set.
### The dump depends on POE::API::Peek, which is loaded on demand through
### App->use() so that the module is only needed when the flag is enabled.
### Output: a title line, a column-header line (only if the queue is not
### empty), one formatted line per queued event, and a trailing blank line.
sub _state_poe {
    my $self = $_[0];

    return "" unless $self->{poe_peek};

    App->use("POE::API::Peek");
    my $peek   = POE::API::Peek->new();
    my @events = $peek->event_queue_dump();

    # column names double as the keys of each queue-item hash
    my @columns = qw(ID index priority event type source destination);

    my $dump = "POE event_queue_dump\n";
    if (@events) {
        $dump .= sprintf("%7s %6s %20s %30s %15s %30s %30s\n", @columns);
    }
    foreach my $queued (@events) {
        $dump .= sprintf("%7d %6d %20f %30s %15s %30s %30s\n", @{$queued}{@columns});
    }
    $dump .= "\n";

    return $dump;
}
### Return a text dump of the entries held in the hotel work queues.
### For each queue the output is the queue name on its own line, then one
### line per entry of the form " {key|value|...}", then a blank line; one
### more blank line ends the dump.
###
### Two defects used to make this unusable:
###   * "my $self = @_;" stored the argument COUNT in $self, so the
###     work_queue() method call died;
###   * sprintf() was given a format without conversions, so the entry text
###     and the closing brace were dropped.
###
### NOTE(review): assumes work_queue() returns a hash-like object whose
### {data} slot is an array reference of hash references - confirm against
### the work queue implementation before wiring this into _state().
sub _state_q {
    my ($self) = @_;
    my $HOTEL_SITE_QNAME    = "q-hotel_site";
    my $HOTEL_COMPUTE_QNAME = "q-hotel_compute";
    my $state = "";
    for my $qname ($HOTEL_SITE_QNAME, $HOTEL_COMPUTE_QNAME) {
        $state .= "$qname\n";
        my $q = $self->work_queue($qname);
        my $entries = $q->{data} || [];     # tolerate a queue with no data yet
        foreach my $entry (@$entries) {
            # key/value order follows Perl's hash ordering
            $state .= " {" . join("|", %$entry) . "}\n";
        }
        $state .= "\n";
    }
    $state .= "\n";
    return $state;
}
sub debug {
&App::sub_entry if ($App::trace);
my ($self) = @_;
my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
my $debug = "DEBUG --- Server: $self->{host}:$self->{port} procs[$self->{num_procs}/$self->{max_procs}:max] async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";
lib/App/Context/POE/Server.pm view on Meta::CPAN
};
if ($@) {
@results = ($@);
}
if ($#results > -1 && defined $results[0] && $results[0] ne "") {
my $ipc_file = $self->{options}{prefix} . "/data/app/Context/$$";
### Use Storable as IPC
if ($self->{options}{poe_storable_ipc}) {
my $results = (@results == 1) ? $results[0] : \@results;
my $success = lock_store($results, $ipc_file);
}
### Use a string value as IPC
else {
if (open(FILE, "> $ipc_file")) {
print App::Context::POE::Server::FILE @results;
close(App::Context::POE::Server::FILE);
}
else {
$exitval = 1;
}
}
}
$self->shutdown();
$self->exit($exitval);
}
my $destination = $event->{destination} || "local";
$self->{num_async_events}++;
$self->{node}{$destination}{num_async_events}++;
my $runtime_event_token = $pid;
$self->{running_async_event}{$runtime_event_token} = [ $event, $callback_event ];
}
$self->profile_stop("send_async_event_now") if $self->{poe_profile};
&App::sub_exit() if ($App::trace);
}
=head2 wait_for_event()
* Signature: $self->wait_for_event($event_token)
* Param: $event_token string
* Return: void
* Throws: App::Exception
* Since: 0.01
Sample Usage:
$self->wait_for_event($event_token);
The wait_for_event() method is called when an asynchronous event has been
sent and no more processing can be completed before it is done.
=cut
# Placeholder: accepts an event token and returns without waiting or
# doing any work (only trace entry/exit is performed).
sub wait_for_event {
    if ($App::trace) { &App::sub_entry; }   # bare &call forwards our @_ to the tracer
    my $self        = $_[0];
    my $event_token = $_[1];
    if ($App::trace) { &App::sub_exit(); }
}
# Fork through the parent class and keep the process bookkeeping current.
#
# Returns whatever SUPER::fork() returned:
#   * a true pid   - we are the parent: num_procs is incremented and an
#                    empty record is created in $self->{proc}{$pid};
#   * 0            - we are the new child: the inherited bookkeeping is
#                    reset (a fresh child has no sub-processes) and INT,
#                    QUIT and TERM handlers are installed that log and exit
#                    with 102, 103 and 115 (TERM also calls shutdown());
#   * undef        - the fork failed (Perl's fork returns undef on failure).
#                    We are still the parent, so nothing is changed.
#                    Previously this case fell into the child branch, which
#                    wiped the parent's process table and replaced its
#                    signal handlers.
sub fork {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $pid = $self->SUPER::fork();
    if (!defined $pid) {
        # fork failed: leave the parent's state alone; the caller sees undef
    }
    elsif ($pid) {  # the parent process has a new child process
        $self->{num_procs}++;
        $self->{proc}{$pid} = {};
    }
    else {  # the new child process has no sub-processes
        $self->{num_procs} = 0;
        $self->{proc} = {};
        $SIG{INT}  = sub { $self->log({level=>3},"fork: Caught Signal: @_ (quitting)\n"); $self->exit(102); };   # SIG 2
        $SIG{QUIT} = sub { $self->log({level=>3},"fork: Caught Signal: @_ (quitting)\n"); $self->exit(103); };   # SIG 3
        $SIG{TERM} = sub { $self->log({level=>3},"fork: Caught Signal: @_ (quitting)\n"); $self->shutdown(); $self->exit(115); };   # SIG 15
    }
    &App::sub_exit($pid) if ($App::trace);
    return($pid);
}
sub finish_pid {
&App::sub_entry if ($App::trace);
my ($self, $pid, $exitval, $sig) = @_;
$self->{num_procs}--;
delete $self->{proc}{$pid};
my $runtime_event_token = $pid;
my $async_event = $self->{running_async_event}{$runtime_event_token};
if ($async_event) {
my ($event, $callback_event) = @$async_event;
my $returnval = "";
my $ipc_file = $self->{options}{prefix} . "/data/app/Context/$pid";
### Use Storable as IPC
if ($self->{options}{poe_storable_ipc}) {
$returnval = lock_retrieve($ipc_file);
unlink($ipc_file);
}
### Use a string value as IPC
else {
if (open(FILE, $ipc_file)) {
if ($callback_event) {
$returnval = join("",<App::Context::POE::Server::FILE>);
}
close(App::Context::POE::Server::FILE);
unlink($ipc_file);
}
}
my $destination = $event->{destination} || "local";
$self->{num_async_events}--;
$self->{node}{$destination}{num_async_events}--;
delete $self->{running_async_event}{$runtime_event_token};
if ($callback_event) {
$callback_event->{args} = [] if (! $callback_event->{args});
my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid [sig=$sig]" : "";
push(@{$callback_event->{args}[2]{args}}, { event_token => $callback_event->{event_token},
returnval => $returnval,
errnum => $exitval,
errmsg => $errmsg });
$self->send_event($callback_event);
}
elsif ($sig == 9) { # killed without a chance to finish its work
$self->finish_killed_async_event($event);
( run in 1.021 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )