App-Context
view release on metacpan or search on metacpan
lib/App/Context/Server.pm view on Meta::CPAN
#############################################################################
## $Id: Server.pm 9819 2007-08-03 19:34:40Z spadkins $
#############################################################################
package App::Context::Server;
$VERSION = (q$Revision: 9819 $ =~ /(\d[\d\.]*)/)[0]; # VERSION numbers generated by svn
use App;
use App::Context;
@ISA = ( "App::Context" );
use Sys::Hostname;
use Socket;
use IO::Socket;
use IO::Socket::INET;
use POSIX ":sys_wait_h";
use Date::Format;
use Date::Parse;
use strict;
=head1 NAME
App::Context::Server - a runtime environment with a single master server and its subprocesses
=head1 SYNOPSIS
# ... official way to get a Context object ...
use App;
$context = App->context();
$config = $context->config(); # get the configuration
$context->dispatch_events(); # dispatch events (a method of the context, not of the config)
# ... alternative way (used internally) ...
use App::Context::Server;
$context = App::Context::Server->new();
=cut
# _init($options)
#
# Instance initializer for the server context.  $options is an optional
# hash reference (an empty hash is substituted when it is undefined).
# In order, this method:
#   * runs the parent class _init(),
#   * calls App->mkdir(<prefix>, "data", "app", "Context")
#     (presumably creating that directory tree - confirm against App.pm),
#   * unbuffers STDOUT and redirects STDERR onto STDOUT,
#   * records {hostname} (as reported), {host} (first dot-separated label)
#     and {port} (default 8080), and resets the process and async-event
#     bookkeeping,
#   * calls the _init2a() hook,
#   * opens the TCP listen socket (dies on failure) and stores both the
#     socket and a bit vector with the socket's descriptor bit set,
#   * creates the "server_rpc" serializer,
#   * schedules recurring log rotation when the "log_rotate" option is set,
#   * calls the _init2b() hook.
sub _init {
    &App::sub_entry if ($App::trace);
    my $self    = shift;
    my $options = shift;
    $options = {} unless (defined $options);

    $self->SUPER::_init($options);
    App->mkdir($options->{prefix}, "data", "app", "Context");

    $| = 1;  # autoflush STDOUT (not sure this is required)
    open(STDERR, ">&STDOUT") || die "Unable to redirect STDERR to STDOUT";

    # Keep the name exactly as reported, plus a short form without the domain.
    my $full_hostname = hostname();
    (my $short_hostname = $full_hostname) =~ s/\..*//;
    $self->{hostname} = $full_hostname;
    $self->{host}     = $short_hostname;
    $self->{port}     = $options->{port} || 8080;

    # Child-process and asynchronous-event bookkeeping.
    $self->{num_procs} = 0;
    $self->{max_procs} = $self->{options}{"app.context.max_procs"} || 10;
    if (defined $self->{options}{"app.context.max_async_events"}) {
        $self->{max_async_events} = $self->{options}{"app.context.max_async_events"};
    }
    $self->{async_event_count}    = 0;
    $self->{pending_async_events} = [];
    $self->{running_async_event}  = {};
    $self->{verbose}              = $options->{verbose};

    $self->_init2a($options);

    # No LocalAddr is given, so both the "hostname" and "localhost" can be
    # used to reach the server.
    my $server_socket = IO::Socket::INET->new(
        Proto     => "tcp",
        LocalPort => $self->{port},
        Listen    => SOMAXCONN,
        ReuseAddr => 1,
    ) || die "Unable to listen on $self->{host}:$self->{port} - $!";
    $self->{listen_socket} = $server_socket;

    # Bit vector with only the listen socket's descriptor bit turned on.
    my $fd_bits = "";
    vec($fd_bits, fileno($server_socket), 1) = 1;
    $self->{listen_vec} = $fd_bits;

    $self->{rpc_serializer} = $self->serializer("server_rpc", class => "App::Serializer::Perl", indent => 0);

    my $rotate_option = $self->{options}{log_rotate};
    if ($rotate_option) {
        # Values of 31 or less are taken as days, anything larger as seconds.
        my $interval_sec = ($rotate_option <= 31) ? $rotate_option*(24*3600) : $rotate_option;
        my $now = time();
        # The rotation grid is anchored at midnight local time of today.
        my $midnight = str2time(time2str("%Y-%m-%d 00:00:00", $now));
        my $periods_elapsed = int(($now - $midnight)/$interval_sec);
        my $first_rotation  = (($periods_elapsed + 1)*$interval_sec) + $midnight;
        $self->schedule_event(
            tag      => "context-log-rotation",
            method   => "log_file_open",
            args     => [0],             # don't overwrite
            time     => $first_rotation,
            interval => $interval_sec,   # and every X seconds hereafter
        );
    }

    $self->_init2b($options);
    &App::sub_exit() if ($App::trace);
}
# _init2a($options)
#
# Initialization hook that _init() invokes before it opens the listen
# socket.  This implementation does nothing beyond the optional tracing
# calls; it exists so the call in _init() has something to dispatch to.
sub _init2a {
    &App::sub_entry if ($App::trace);
    my $self    = shift;
    my $options = shift;
    &App::sub_exit() if ($App::trace);
}
# _init2b($options)
#
# Initialization hook that _init() invokes as its final step, after the
# listen socket, serializer and log rotation have been set up.  This
# implementation does nothing beyond the optional tracing calls.
sub _init2b {
    &App::sub_entry if ($App::trace);
    my $self    = shift;
    my $options = shift;
    &App::sub_exit() if ($App::trace);
}
# close_listen_socket()
#
# Closes the listen socket, if this object still holds one, and removes
# both {listen_socket} and {listen_vec} from the object.  A level-4 log
# message naming the file descriptor is written just before the close.
# Calling it when no socket is held is a no-op, so it may be called
# repeatedly.
sub close_listen_socket {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $sock = $self->{listen_socket};
    if ($sock) {
        my $fd = fileno($sock);
        $self->log({level=>4},"Closed listen socket($fd)\n");
        $sock->close();
        undef $sock;    # drop the local reference before the stored ones
        delete @{$self}{qw(listen_socket listen_vec)};
    }
    &App::sub_exit() if ($App::trace);
}
# shutdown_unshareable_resources()
#
# Releases the listen socket held by this object and then delegates to
# the parent class implementation of the same method.
sub shutdown_unshareable_resources {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    $self->close_listen_socket();
    $self->SUPER::shutdown_unshareable_resources();
    &App::sub_exit() if ($App::trace);
}
# shutdown()
#
# Shuts the server context down: closes the listen socket, signals every
# known child process to terminate, then runs the parent class shutdown().
sub shutdown {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    $self->close_listen_socket();
    $self->shutdown_child_processes();
    $self->SUPER::shutdown();
    &App::sub_exit() if ($App::trace);
}
# shutdown_child_processes()
#
# Sends signal 15 to every process id that is a key of the {proc} table.
# Nothing is sent when the table is missing or empty.  The table itself
# is not modified here.
sub shutdown_child_processes {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $children = $self->{proc};
    if ($children) {
        kill(15, $_) for (keys %$children);
    }
    &App::sub_exit() if ($App::trace);
}
# DESTROY()
#
# Destructor: makes sure the listen socket is closed when the context
# object goes away.
sub DESTROY {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    $self->close_listen_socket();
    &App::sub_exit() if ($App::trace);
}
sub dispatch_events {
&App::sub_entry if ($App::trace);
my ($self, $max_events_occurred) = @_;
my ($role, $port, $startup, $shutdown);
$self->dispatch_events_begin();
my $verbose = $self->{verbose};
my $options = $self->{options};
my $objects = $options->{init_objects};
my ($service_type, $name, $service);
foreach my $object (split(/ *[;,]+ */, $objects)) {
if ($object) {
if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) {
$service_type = $1;
$name = $2;
}
else {
$service_type = "SessionObject";
$name = $object;
}
$service = $self->service($service_type, $name); # instantiate it. that's all.
$self->log({level=>3},"$service_type $name instantiated [$service]\n");
$self->{main_service} = $service if (!$self->{main_service});
}
}
my $quit = 0;
$SIG{HUP} = sub { $self->log({level=>2},"Caught Signal: @_\n"); }; # SIG 1
$SIG{INT} = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $quit = 1; }; # SIG 2
$SIG{QUIT} = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $quit = 1; }; # SIG 3
$SIG{USR1} = sub { $self->log({level=>2},"Caught Signal: @_\n"); }; # SIG 10
$SIG{USR2} = sub { $self->log({level=>2},"Caught Signal: @_\n"); }; # SIG 12
$SIG{TERM} = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $quit = 1; }; # SIG 15
$SIG{CHLD} = "DEFAULT"; # SIG 17
my $default_sleep_interval = 15*60;
lib/App/Context/Server.pm view on Meta::CPAN
$state .= "\n";
$state .= "Running Async Events:\n";
my ($async_event, $event, $callback_event, @args, $args_str, $event_token, $runtime_event_token, $str);
foreach $runtime_event_token (sort keys %{$self->{running_async_event}}) {
$async_event = $self->{running_async_event}{$runtime_event_token};
($event, $callback_event) = @$async_event;
$str = "";
if ($main_service && $main_service->can("format_async_event")) {
$str = $main_service->format_async_event($event, $callback_event, $runtime_event_token);
}
if ($str) {
$state .= " ";
$state .= $main_service->format_async_event($event, $callback_event, $runtime_event_token);
$state .= "\n";
}
else {
@args = ();
@args = @{$event->{args}} if ($event->{args});
$args_str = join(",",@args);
$state .= sprintf(" %-20s %-20s %-24s", $event->{event_token}, $runtime_event_token, "$event->{name}.$event->{method}($args_str)");
if ($callback_event) {
@args = ();
@args = @{$callback_event->{args}} if ($callback_event->{args});
$args_str = join(",",@args);
$state .= "$callback_event->{name}.$callback_event->{method}($args_str)";
}
$state .= "\n";
}
}
$state .= "\n";
$state .= "Pending Async Events: count [$self->{async_event_count}]\n";
foreach $async_event (@{$self->{pending_async_events}}) {
($event, $callback_event) = @$async_event;
$str = "";
if ($main_service && $main_service->can("format_async_event")) {
$str = $main_service->format_async_event($event, $callback_event);
}
if ($str) {
$state .= " ";
$state .= $main_service->format_async_event($event, $callback_event);
$state .= "\n";
}
else {
@args = ();
@args = @{$event->{args}} if ($event->{args});
$args_str = join(",",@args);
$state .= sprintf(" %-20s %-40s", $event->{event_token}, "$event->{name}.$event->{method}($args_str)");
if ($callback_event) {
@args = ();
@args = @{$callback_event->{args}} if ($callback_event->{args});
$args_str = join(",",@args);
$state .= " => $callback_event->{name}.$callback_event->{method}($args_str)";
}
$state .= "\n";
}
}
$state .= "\n";
$state .= $self->SUPER::_state();
&App::sub_exit($state) if ($App::trace);
return($state);
}
# TODO: Implement this as a fork() or a context-level message to a node to fork().
# i.e. messages such as "EVENT:" and "EVENT-OK:"
# Save the callback_event according to an event_token.
# Then implement cleanup_pid to send the callback_event.
# send_async_event($event, $callback_event)
#
# Queues an asynchronous event.  A fresh event token is obtained from
# new_event_token(), stamped on $event (and on $callback_event when one
# is supplied), and the pair is appended to {pending_async_events}.
# Nothing is dispatched here; the event only joins the pending queue.
#
# Returns the event token.
sub send_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $event, $callback_event) = @_;
    my $token = $self->new_event_token();
    $event->{event_token} = $token;
    if ($callback_event) {
        $callback_event->{event_token} = $token;
    }
    my $queued_pair = [ $event, $callback_event ];
    push(@{$self->{pending_async_events}}, $queued_pair);
    &App::sub_exit($token) if ($App::trace);
    return($token);
}
# new_event_token()
#
# Increments {async_event_count} and returns a token of the form
# "<host>-<port>-<count>" built from the object's {host}, {port} and the
# new counter value.
sub new_event_token {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $sequence = ++$self->{async_event_count};
    my $token = join("-", $self->{host}, $self->{port}, $sequence);
    &App::sub_exit($token) if ($App::trace);
    return($token);
}
sub dispatch_pending_async_events {
&App::sub_entry if ($App::trace);
my ($self, $max_events) = @_;
$max_events ||= 9999;
my $pending_async_events = $self->{pending_async_events};
my ($async_event, $assigned, $event, $in_process);
my $events_occurred = 0;
my $i = 0;
my $event_capacity_exists = 1;
my $max_i = $#$pending_async_events;
while ($i <= $max_i && $events_occurred < $max_events) {
$async_event = $pending_async_events->[$i];
$event = $async_event->[0];
if ($event->{destination}) {
$self->send_async_event_now(@$async_event);
$events_occurred ++;
splice(@$pending_async_events, $i, 1); # remove $pending_async_events->[$i]
$max_i--;
}
elsif ($event_capacity_exists) {
$assigned = $self->assign_event_destination($event);
if ($assigned) {
$self->send_async_event_now(@$async_event);
$events_occurred ++;
# keep $i the same
splice(@$pending_async_events, $i, 1); # remove $pending_async_events->[$i]
$max_i--;
}
else { # [undef] no servers are eligible for assignment
$event_capacity_exists = 0; # there's no sense looking at the other pending async events
lib/App/Context/Server.pm view on Meta::CPAN
}
else {
my $pid = $self->fork();
if (!$pid) { # running in child
my $exitval = 0;
my (@results);
eval {
@results = $self->send_event($event);
};
if ($@) {
@results = ($@);
}
if ($#results > -1 && defined $results[0] && $results[0] ne "") {
my $textfile = $self->{options}{prefix} . "/data/app/Context/$$";
if (open(FILE, "> $textfile")) {
print App::Context::Server::FILE @results;
close(App::Context::Server::FILE);
}
else {
$exitval = 1;
}
}
$self->shutdown();
$self->exit($exitval);
}
my $destination = $event->{destination} || "local";
$self->{num_async_events}++;
$self->{node}{$destination}{num_async_events}++;
my $runtime_event_token = $pid;
$self->{running_async_event}{$runtime_event_token} = [ $event, $callback_event ];
}
&App::sub_exit() if ($App::trace);
}
=head2 wait_for_event()
* Signature: $self->wait_for_event($event_token)
* Param: $event_token string
* Return: void
* Throws: App::Exception
* Since: 0.01
Sample Usage:
$self->wait_for_event($event_token);
The wait_for_event() method is called when an asynchronous event has been
sent and no more processing can be completed before it is done.
=cut
# wait_for_event($event_token)
#
# Placeholder: apart from the optional tracing calls, this implementation
# performs no work and does not inspect $event_token.
sub wait_for_event {
    &App::sub_entry if ($App::trace);
    my $self        = shift;
    my $event_token = shift;
    &App::sub_exit() if ($App::trace);
}
# fork()
#
# Forks through the parent class fork() and updates the process
# bookkeeping on whichever side of the fork is running:
#   * parent (true pid): counts the new child and registers it in {proc};
#   * child  (false pid): starts with an empty process table and installs
#     INT/QUIT/TERM handlers that log the signal and exit with status
#     102/103/115 respectively.
#
# Returns the value returned by the parent class fork().
sub fork {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $pid = $self->SUPER::fork();
    if (!$pid) {  # the new child process has no sub-processes
        $self->{num_procs} = 0;
        $self->{proc} = {};
        # signal name => exit status (100 + signal number)
        foreach my $handler_spec ([ INT => 102 ], [ QUIT => 103 ], [ TERM => 115 ]) {
            my ($signal_name, $exit_status) = @$handler_spec;
            $SIG{$signal_name} = sub {
                $self->log({level=>2},"Caught Signal: @_ (quitting)\n");
                $self->exit($exit_status);
            };
        }
    }
    else {  # the parent process has a new child process
        $self->{num_procs}++;
        $self->{proc}{$pid} = {};
    }
    &App::sub_exit($pid) if ($App::trace);
    return($pid);
}
# finish_pid($pid, $exitval, $sig)
#
# Bookkeeping for a child process that has ended.
#   $pid     - process id of the finished child
#   $exitval - its exit value
#   $sig     - the signal that ended it (false if none)
#
# The child is removed from the process table.  If it was running an
# asynchronous event (looked up in {running_async_event} by pid):
#   * the result file <prefix>/data/app/Context/<pid> is removed if it
#     can be opened; its content is read first, but only when a callback
#     event exists to receive it,
#   * the async-event counters (global and per destination) are decremented,
#   * when a callback event exists, a hash with event_token, returnval,
#     errnum and errmsg is appended to its args and the callback is sent,
#   * otherwise, if the child was ended by signal 9,
#     finish_killed_async_event() is called with the original event.
sub finish_pid {
    &App::sub_entry if ($App::trace);
    my ($self, $pid, $exitval, $sig) = @_;

    $self->{num_procs}--;
    delete $self->{proc}{$pid};

    my $runtime_event_token = $pid;
    my $async_event = $self->{running_async_event}{$runtime_event_token};
    if ($async_event) {
        my ($event, $callback_event) = @$async_event;

        my $returnval = "";
        my $returnvalfile = $self->{options}{prefix} . "/data/app/Context/$pid";
        # Three-argument open with a lexical handle: the path is taken
        # literally (no mode characters or surrounding whitespace are
        # interpreted) and no package-global filehandle is clobbered.
        if (open(my $returnval_fh, "<", $returnvalfile)) {
            if ($callback_event) {
                $returnval = join("", <$returnval_fh>);
            }
            close($returnval_fh);
            unlink($returnvalfile);
        }

        my $destination = $event->{destination} || "local";
        $self->{num_async_events}--;
        $self->{node}{$destination}{num_async_events}--;
        delete $self->{running_async_event}{$runtime_event_token};

        if ($callback_event) {
            $callback_event->{args} = [] if (! $callback_event->{args});
            my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid [sig=$sig]" : "";
            push(@{$callback_event->{args}},
                {event_token => $callback_event->{event_token}, returnval => $returnval, errnum => $exitval, errmsg => $errmsg});
            $self->send_event($callback_event);
        }
        elsif (defined $sig && $sig == 9) { # killed without a chance to finish its work
            $self->finish_killed_async_event($event);
        }
    }
    &App::sub_exit() if ($App::trace);
}
sub finish_killed_async_event {
&App::sub_entry if ($App::trace);
my ($self, $event) = @_;
&App::sub_exit() if ($App::trace);
( run in 0.944 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )