App-Context

 view release on metacpan or  search on metacpan

lib/App/Context/Server.pm  view on Meta::CPAN


#############################################################################
## $Id: Server.pm 9819 2007-08-03 19:34:40Z spadkins $
#############################################################################

package App::Context::Server;
$VERSION = (q$Revision: 9819 $ =~ /(\d[\d\.]*)/)[0];  # VERSION numbers generated by svn

use App;
use App::Context;

@ISA = ( "App::Context" );

use Sys::Hostname;
use Socket;
use IO::Socket;
use IO::Socket::INET;
use POSIX ":sys_wait_h";
use Date::Format;
use Date::Parse;

use strict;

=head1 NAME

App::Context::Server - a runtime environment with a single master server and its subprocesses

=head1 SYNOPSIS

   # ... official way to get a Context object ...
   use App;
   $context = App->context();
   $config = $context->config();   # get the configuration
   $context->dispatch_events();    # dispatch events

   # ... alternative way (used internally) ...
   use App::Context::Server;
   $context = App::Context::Server->new();

=cut

# _init($options)
#
# Server-specific initialization, layered on top of the parent class's
# _init().  Sets up process/async-event bookkeeping, opens the TCP listen
# socket and (optionally) schedules periodic log rotation.
#
#   $options   hashref of startup options; treated as {} when undefined.
#              Keys read directly here: prefix, port, verbose.
#
# Other settings ("app.context.max_procs", "app.context.max_async_events",
# log_rotate) are read from $self->{options} rather than from $options
# (presumably populated by the parent _init() -- confirm in App::Context).
#
# Dies if STDERR cannot be redirected to STDOUT or if the listen socket
# cannot be created.
sub _init {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    $options = {} if (!defined $options);

    $self->SUPER::_init($options);

    # <prefix>/data/app/Context is where finish_pid() looks for the result
    # files left behind by child processes (one file per pid).
    App->mkdir($options->{prefix}, "data", "app", "Context");

    $| = 1;  # autoflush STDOUT (not sure this is required)
    # From here on, everything written to STDERR goes to STDOUT.
    open(STDERR, ">&STDOUT") || die "Unable to redirect STDERR to STDOUT";

    # {hostname} keeps the name exactly as reported by hostname();
    # {host} is the same name cut off at the first dot.
    my $host = hostname;
    $self->{hostname} = $host;
    $host =~ s/\..*//;   # get rid of fully qualified domain name
    $self->{host} = $host;
    # A missing (or zero) port option falls back to 8080.
    $self->{port} = $options->{port} || 8080;

    # Child-process and async-event bookkeeping.
    $self->{num_procs} = 0;
    $self->{max_procs} = $self->{options}{"app.context.max_procs"} || 10;
    # max_async_events is only set when explicitly configured; otherwise the
    # key is left untouched.
    $self->{max_async_events} = $self->{options}{"app.context.max_async_events"}
        if (defined $self->{options}{"app.context.max_async_events"});
    $self->{async_event_count} = 0;       # counter used by new_event_token()
    $self->{pending_async_events} = [];   # queue of [ $event, $callback_event ] pairs
    $self->{running_async_event} = {};    # keyed by runtime event token (the child pid in finish_pid())

    $self->{verbose} = $options->{verbose};

    # Subclass hook: runs before the listen socket exists.
    $self->_init2a($options);

    my $listen_socket = IO::Socket::INET->new(
        Proto     => "tcp",
        # LocalAddr => $self->{host},  # allow both the "hostname" and "localhost" to be used
        LocalPort => $self->{port},
        Listen    => SOMAXCONN,
        ReuseAddr => 1,
    ) || die "Unable to listen on $self->{host}:$self->{port} - $!";

    $self->{listen_socket} = $listen_socket;
    # Bit vector with only the listen socket's descriptor bit set
    # (presumably consumed by a select() call elsewhere -- not visible here).
    my $listen_fd = fileno($listen_socket);
    my $listen_vec;
    vec($listen_vec, $listen_fd, 1) = 1;
    $self->{listen_vec} = $listen_vec;

    $self->{rpc_serializer} = $self->serializer("server_rpc", class => "App::Serializer::Perl", indent => 0);

    if ($self->{options}{log_rotate}) {
        # log_rotate values of 31 or less are taken as a number of days and
        # converted to seconds; larger values are used as seconds directly.
        my $rotate_sec = $self->{options}{log_rotate};
        $rotate_sec = $rotate_sec*(24*3600) if ($rotate_sec <= 31);
        my $time = time();
        my $base_time = str2time(time2str("%Y-%m-%d 00:00:00", $time));  # I need a base which is midnight local time
        # First rotation: the next whole multiple of $rotate_sec counted from
        # today's local midnight that lies after the current time.
        my $next_rotate_time = ((int(($time - $base_time)/$rotate_sec)+1)*$rotate_sec) + $base_time;
        $self->schedule_event(
            tag => "context-log-rotation",
            method => "log_file_open",
            args => [0], # don't overwrite
            time => $next_rotate_time,
            interval => $rotate_sec,  # and every X seconds hereafter
        );
    }

    # Subclass hook: runs once the socket and the scheduled events are set up.
    $self->_init2b($options);

    &App::sub_exit() if ($App::trace);
}

# _init2a($options)
#
# Extension hook called by _init() before the listen socket is created.
# This implementation does nothing apart from the optional call tracing.
sub _init2a {
    $App::trace and &App::sub_entry;
    my ($self, $options) = @_;
    $App::trace and &App::sub_exit();
}

# _init2b($options)
#
# Extension hook called at the very end of _init(), after the listen socket
# and the optional log-rotation event have been set up.  This implementation
# does nothing apart from the optional call tracing.
sub _init2b {
    $App::trace and &App::sub_entry;
    my ($self, $options) = @_;
    $App::trace and &App::sub_exit();
}

# close_listen_socket()
#
# Closes the server's listen socket, if one is currently held, and removes
# both the socket and its descriptor bit vector from the object.  Calling
# it when no socket is held is a no-op, so it is safe to call repeatedly.
sub close_listen_socket {
    $App::trace and &App::sub_entry;
    my ($self) = @_;
    my $sock = $self->{listen_socket};
    if ($sock) {
        my $listen_fd = fileno($sock);
        # The message is logged just before the close actually happens.
        $self->log({level=>4},"Closed listen socket($listen_fd)\n");
        $sock->close();
        undef $sock;
        delete @{$self}{qw(listen_socket listen_vec)};
    }
    $App::trace and &App::sub_exit();
}

# shutdown_unshareable_resources()
#
# Releases the listen socket held by this object and then lets the parent
# class release whatever it considers unshareable.
sub shutdown_unshareable_resources {
    $App::trace and &App::sub_entry;
    my ($self) = @_;
    $self->close_listen_socket();
    $self->SUPER::shutdown_unshareable_resources();
    $App::trace and &App::sub_exit();
}

# shutdown()
#
# Full server shutdown, in this order: close the listen socket, send the
# termination signal to every tracked child process, then run the parent
# class's shutdown().
sub shutdown {
    $App::trace and &App::sub_entry;
    my ($self) = @_;
    $self->close_listen_socket();
    $self->shutdown_child_processes();
    $self->SUPER::shutdown();
    $App::trace and &App::sub_exit();
}

# shutdown_child_processes()
#
# Sends signal 15 (TERM) to every pid recorded in $self->{proc}.  Nothing
# is sent when no process table exists.  The table itself is left
# untouched; entries are removed elsewhere (see finish_pid()).
sub shutdown_child_processes {
    $App::trace and &App::sub_entry;
    my ($self) = @_;
    my $children = $self->{proc};
    if ($children) {
        kill(15, $_) for (keys %$children);
    }
    $App::trace and &App::sub_exit();
}

# DESTROY()
#
# Destructor: makes sure the listen socket is closed when the object is
# garbage-collected.
sub DESTROY {
    $App::trace and &App::sub_entry;
    my ($self) = @_;
    $self->close_listen_socket();
    $App::trace and &App::sub_exit();
}

sub dispatch_events {
    &App::sub_entry if ($App::trace);
    my ($self, $max_events_occurred) = @_;

    my ($role, $port, $startup, $shutdown);
    $self->dispatch_events_begin();

    my $verbose = $self->{verbose};

    my $options = $self->{options};
    my $objects = $options->{init_objects};
    my ($service_type, $name, $service);
    foreach my $object (split(/ *[;,]+ */, $objects)) {
        if ($object) {
            if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) {
                $service_type = $1;
                $name = $2;
            }
            else {
                $service_type = "SessionObject";
                $name = $object;
            }
            $service = $self->service($service_type, $name);  # instantiate it. that's all.
            $self->log({level=>3},"$service_type $name instantiated [$service]\n");
            $self->{main_service} = $service if (!$self->{main_service});
        }
    }

    my $quit = 0;

    $SIG{HUP}  = sub { $self->log({level=>2},"Caught Signal: @_\n"); };                         # SIG  1
    $SIG{INT}  = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $quit = 1; };   # SIG  2
    $SIG{QUIT} = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $quit = 1; };   # SIG  3
    $SIG{USR1} = sub { $self->log({level=>2},"Caught Signal: @_\n"); };                         # SIG 10
    $SIG{USR2} = sub { $self->log({level=>2},"Caught Signal: @_\n"); };                         # SIG 12
    $SIG{TERM} = sub { $self->log({level=>2},"Caught Signal: @_ (quitting)\n"); $quit = 1; };   # SIG 15
    $SIG{CHLD} = "DEFAULT";                                                                     # SIG 17

    my $default_sleep_interval = 15*60;

lib/App/Context/Server.pm  view on Meta::CPAN

    $state .= "\n";
    $state .= "Running Async Events:\n";
    my ($async_event, $event, $callback_event, @args, $args_str, $event_token, $runtime_event_token, $str);
    foreach $runtime_event_token (sort keys %{$self->{running_async_event}}) {
        $async_event = $self->{running_async_event}{$runtime_event_token};
        ($event, $callback_event) = @$async_event;
        $str = "";
        if ($main_service && $main_service->can("format_async_event")) {
            $str = $main_service->format_async_event($event, $callback_event, $runtime_event_token);
        }
        if ($str) {
            $state .= "   ";
            $state .= $main_service->format_async_event($event, $callback_event, $runtime_event_token);
            $state .= "\n";
        }
        else {
            @args = ();
            @args = @{$event->{args}} if ($event->{args});
            $args_str = join(",",@args);
            $state .= sprintf("   %-20s %-20s %-24s", $event->{event_token}, $runtime_event_token, "$event->{name}.$event->{method}($args_str)");
            if ($callback_event) {
                @args = ();
                @args = @{$callback_event->{args}} if ($callback_event->{args});
                $args_str = join(",",@args);
                $state .= "$callback_event->{name}.$callback_event->{method}($args_str)";
            }
            $state .= "\n";
        }
    }

    $state .= "\n";
    $state .= "Pending Async Events: count [$self->{async_event_count}]\n";
    foreach $async_event (@{$self->{pending_async_events}}) {
        ($event, $callback_event) = @$async_event;
        $str = "";
        if ($main_service && $main_service->can("format_async_event")) {
            $str = $main_service->format_async_event($event, $callback_event);
        }
        if ($str) {
            $state .= "   ";
            $state .= $main_service->format_async_event($event, $callback_event);
            $state .= "\n";
        }
        else {
            @args = ();
            @args = @{$event->{args}} if ($event->{args});
            $args_str = join(",",@args);
            $state .= sprintf("   %-20s %-40s", $event->{event_token}, "$event->{name}.$event->{method}($args_str)");
            if ($callback_event) {
                @args = ();
                @args = @{$callback_event->{args}} if ($callback_event->{args});
                $args_str = join(",",@args);
                $state .= " => $callback_event->{name}.$callback_event->{method}($args_str)";
            }
            $state .= "\n";
        }
    }

    $state .= "\n";

    $state .= $self->SUPER::_state();

    &App::sub_exit($state) if ($App::trace);
    return($state);
}

# TODO: Implement this as a fork() or a context-level message to a node to fork().
#       i.e. messages such as "EVENT:" and "EVENT-OK:"
#       Save the callback_event according to an event_token.
#       Then implement cleanup_pid to send the callback_event.

# send_async_event($event, $callback_event)
#
# Queues an event for asynchronous execution.
#
#   $event            hashref describing the event; receives an
#                     {event_token} entry
#   $callback_event   optional hashref; when given, it is stamped with the
#                     same {event_token}
#
# The [ $event, $callback_event ] pair is appended to the pending queue;
# nothing is executed here.  Returns the newly allocated event token.
sub send_async_event {
    $App::trace and &App::sub_entry;
    my ($self, $event, $callback_event) = @_;
    my $token = $self->new_event_token();
    $event->{event_token} = $token;
    if ($callback_event) {
        $callback_event->{event_token} = $token;
    }
    push @{ $self->{pending_async_events} }, [ $event, $callback_event ];
    $App::trace and &App::sub_exit($token);
    return $token;
}

# new_event_token()
#
# Allocates the next event token for this server.  The per-object counter
# {async_event_count} is incremented first, and the token is built as
# "<host>-<port>-<count>".  Returns the token string.
sub new_event_token {
    $App::trace and &App::sub_entry;
    my ($self) = @_;
    my $serial = ++$self->{async_event_count};
    my $event_token = join("-", $self->{host}, $self->{port}, $serial);
    $App::trace and &App::sub_exit($event_token);
    return $event_token;
}

sub dispatch_pending_async_events {
    &App::sub_entry if ($App::trace);
    my ($self, $max_events) = @_;
    $max_events ||= 9999;
    my $pending_async_events = $self->{pending_async_events};
    my ($async_event, $assigned, $event, $in_process);
    my $events_occurred = 0;
    my $i = 0;
    my $event_capacity_exists = 1;
    my $max_i = $#$pending_async_events;
    while ($i <= $max_i && $events_occurred < $max_events) {
        $async_event = $pending_async_events->[$i];
        $event = $async_event->[0];
        if ($event->{destination}) {
            $self->send_async_event_now(@$async_event);
            $events_occurred ++;
            splice(@$pending_async_events, $i, 1);  # remove $pending_async_events->[$i]
            $max_i--;
        }
        elsif ($event_capacity_exists) {
            $assigned = $self->assign_event_destination($event);
            if ($assigned) {
                $self->send_async_event_now(@$async_event);
                $events_occurred ++;
                # keep $i the same
                splice(@$pending_async_events, $i, 1);  # remove $pending_async_events->[$i]
                $max_i--;
            }
            else {   # [undef] no servers are eligible for assignment
                $event_capacity_exists = 0;   # there's no sense looking at the other pending async events

lib/App/Context/Server.pm  view on Meta::CPAN

    }
    else {
        my $pid = $self->fork();
        if (!$pid) {   # running in child
            my $exitval = 0;
            my (@results);
            eval {
                @results = $self->send_event($event);
            };
            if ($@) {
                @results = ($@);
            }
            if ($#results > -1 && defined $results[0] && $results[0] ne "") {
                my $textfile = $self->{options}{prefix} . "/data/app/Context/$$";
                if (open(FILE, "> $textfile")) {
                    print App::Context::Server::FILE @results;
                    close(App::Context::Server::FILE);
                }
                else {
                    $exitval = 1;
                }
            }
            $self->shutdown();
            $self->exit($exitval);
        }
        my $destination = $event->{destination} || "local";
        $self->{num_async_events}++;
        $self->{node}{$destination}{num_async_events}++;
        my $runtime_event_token = $pid;
        $self->{running_async_event}{$runtime_event_token} = [ $event, $callback_event ];
    }
    &App::sub_exit() if ($App::trace);
}

=head2 wait_for_event()

    * Signature: $self->wait_for_event($event_token)
    * Param:     $event_token     string
    * Return:    void
    * Throws:    App::Exception
    * Since:     0.01

    Sample Usage: 

    $self->wait_for_event($event_token);

The wait_for_event() method is called when an asynchronous event has been
sent and no more processing can be completed before it is done.

=cut

# The implementation in this class is an empty placeholder: apart from the
# optional call tracing it performs no waiting and returns immediately.
sub wait_for_event {
    $App::trace and &App::sub_entry;
    my ($self, $event_token) = @_;
    $App::trace and &App::sub_exit();
}

# fork()
#
# Forks through the parent class and keeps the process table in step.
#
#   In the parent (true pid returned): the child is counted in {num_procs}
#   and registered in {proc}.
#   In the child (false value returned): the process table is reset, since
#   a fresh child has no sub-processes, and INT/QUIT/TERM handlers are
#   installed that log the signal and exit with 102/103/115 respectively.
#
# Returns whatever SUPER::fork() returned.
# NOTE(review): any false return is handled as "running in the child";
# confirm that SUPER::fork() never returns undef on a failed fork.
sub fork {
    $App::trace and &App::sub_entry;
    my ($self) = @_;
    my $pid = $self->SUPER::fork();
    if (!$pid) {
        # child side
        $self->{num_procs} = 0;
        $self->{proc} = {};
        my %exit_code_for = (INT => 102, QUIT => 103, TERM => 115);   # SIG 2, 3, 15
        foreach my $signame (qw(INT QUIT TERM)) {
            my $code = $exit_code_for{$signame};
            $SIG{$signame} = sub {
                $self->log({level=>2},"Caught Signal: @_ (quitting)\n");
                $self->exit($code);
            };
        }
    }
    else {
        # parent side
        $self->{num_procs}++;
        $self->{proc}{$pid} = {};
    }
    $App::trace and &App::sub_exit($pid);
    return $pid;
}

# finish_pid($pid, $exitval, $sig)
#
# Bookkeeping for a child process that has terminated.
#
#   $pid       process id of the finished child
#   $exitval   exit value of the child
#   $sig       number of the signal that ended the child (false if none)
#
# The pid is always removed from the process table.  If the pid was running
# an async event (pids double as runtime event tokens), then:
#   * the result file <prefix>/data/app/Context/<pid>, if present, is
#     consumed and removed; its content is only read when there is a
#     callback event to hand it to;
#   * the global and per-destination async-event counters are decremented;
#   * the callback event, if any, gets one extra argument - a hashref with
#     event_token, returnval, errnum and errmsg - and is sent;
#   * without a callback, a child ended by signal 9 is reported through
#     finish_killed_async_event().
sub finish_pid {
    &App::sub_entry if ($App::trace);
    my ($self, $pid, $exitval, $sig) = @_;

    $self->{num_procs}--;
    delete $self->{proc}{$pid};

    my $runtime_event_token = $pid;
    my $async_event = $self->{running_async_event}{$runtime_event_token};
    if ($async_event) {
        my ($event, $callback_event) = @$async_event;
        my $returnval = "";
        my $returnvalfile = $self->{options}{prefix} . "/data/app/Context/$pid";
        # Three-argument open with a lexical handle: the path is always taken
        # literally (no mode characters or whitespace in the prefix can be
        # misread) and no package-global filehandle is shared with other subs.
        if (open(my $fh, "<", $returnvalfile)) {
            if ($callback_event) {
                $returnval = join("", <$fh>);
            }
            close($fh);
            unlink($returnvalfile);
        }

        my $destination = $event->{destination} || "local";
        $self->{num_async_events}--;
        $self->{node}{$destination}{num_async_events}--;
        delete $self->{running_async_event}{$runtime_event_token};

        if ($callback_event) {
            $callback_event->{args} = [] if (! $callback_event->{args});
            my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid [sig=$sig]" : "";
            push(@{$callback_event->{args}},
                {event_token => $callback_event->{event_token}, returnval => $returnval, errnum => $exitval, errmsg => $errmsg});
            $self->send_event($callback_event);
        }
        elsif (defined $sig && $sig == 9) {  # killed without a chance to finish its work
            $self->finish_killed_async_event($event);
        }
    }

    &App::sub_exit() if ($App::trace);
}

sub finish_killed_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $event) = @_;
    &App::sub_exit() if ($App::trace);



( run in 0.944 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )