App-Context

 view release on metacpan or  search on metacpan

lib/App/Context/POE/Server.pm  view on Meta::CPAN

#############################################################################
## $Id: Server.pm 6786 2006-08-11 23:22:48Z zroberts $
#############################################################################

package App::Context::POE::Server;
$VERSION = (q$Revision: 6786 $ =~ /(\d[\d\.]*)/)[0];  # VERSION numbers generated by svn

use strict;
use vars qw(@ISA);
use warnings;

use App::Context::POE;

@ISA = ( "App::Context::POE" );

use POSIX ":sys_wait_h";
use Sys::Hostname;
use Date::Format;
use Date::Parse;
use Time::HiRes qw(gettimeofday tv_interval);

#sub POE::Kernel::TRACE_STATISTICS  () { 1 }
#sub POE::Kernel::TRACE_PROFILE  () { 1 }
use POE;
use POE::Component::Server::SimpleHTTP;
use POE::Component::IKC::Server;
use HTTP::Status qw/RC_OK/;
use Socket qw(INADDR_ANY);
use Storable qw(lock_store lock_retrieve);

sub _init {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    $options = {} if (!defined $options);

    $self->SUPER::_init($options);

    App->mkdir($options->{prefix}, "data", "app", "Context");

    $| = 1;  # autoflush STDOUT (not sure this is required)

    ### Configuration stuff
    my $host = hostname;
    $self->{hostname} = $host;
    $host =~ s/\..*//;   # get rid of fully qualified domain name
    $self->{host} = $host;
    $options->{port}              ||= 8080;
    $self->{port}                   = $options->{port};
    $options->{http_port}         ||= $options->{port}+1;
    $self->{poe_kernel_name}        = "poe_$self->{host}_$self->{port}";
    $self->{poe_kernel_http_name}   = $self->{poe_kernel_name} . "_httpd";
    $self->{poe_session_name}       = "poe_session";
    $self->{poe_kernel}             = $poe_kernel;

    $self->{num_procs}              = 0;
    $self->{max_procs}              = $self->{options}{"app.context.max_procs"} || 10;
    $self->{max_async_events}       = $self->{options}{"app.context.max_async_events"}
        if (defined $self->{options}{"app.context.max_async_events"});
    $self->{max_async_events}     ||= 10;
    $self->{num_async_events}       = 0;
    $self->{async_event_count}      = 0;
    $self->{pending_async_events}   = [];
    $self->{running_async_event}    = {};
    $self->{poe_profile}            = $options->{poe_profile};
    $self->{poe_profile}            = 60 if ($self->{poe_profile} && $self->{poe_profile} == 1);
    $self->{poe_trace}              = $options->{poe_trace};

    $self->{verbose} = $options->{verbose};

    $self->{poe_states} = [qw(
        _start _stop _default poe_sigchld poe_sigterm poe_sigignore poe_shutdown poe_alarm poe_profile
        ikc_register ikc_unregister ikc_shutdown
        poe_run_event poe_event_loop_extension poe_dispatch_pending_async_events
        poe_server_state poe_http_server_state poe_debug poe_http_debug poe_http_test_run
        poe_enqueue_async_event poe_enqueue_async_event_finished poe_remote_async_event_finished
    )];
    $self->{poe_ikc_published_states} = [qw(
        poe_server_state
        poe_enqueue_async_event
        poe_remote_async_event_finished
    )];

    ### Does nothing by default, used by ClusterController, maybe other subclasses?
    $self->_init2a($options);

    ### Do log rotation
    ### TODO: this should be refactored out
    if ($self->{options}{log_rotate}) {
        my $rotate_sec = $self->{options}{log_rotate};
        $rotate_sec = $rotate_sec*(24*3600) if ($rotate_sec <= 31);      # interpret as days
        my $time = time();
        my $base_time = str2time(time2str("%Y-%m-%d 00:00:00", $time));  # I need a base which is midnight local time
        my $next_rotate_time = ((int(($time - $base_time)/$rotate_sec)+1)*$rotate_sec) + $base_time;
        $self->schedule_event(
            tag => "context-log-rotation",
            method => "log_file_open",

lib/App/Context/POE/Server.pm  view on Meta::CPAN

    ### Does nothing by default
    $self->_init2b($options);

    &App::sub_exit() if ($App::trace);
}

### Used by subclasses
sub _init2a {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    $self->_init_poe($options);
    &App::sub_exit() if ($App::trace);
}

sub _init_poe {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;

    ### Set up a server
    POE::Component::IKC::Server->spawn(
        port => $self->{port},
        name => $self->{poe_kernel_name},
    );
    $self->log({level=>3},"Listening for Inter-Kernel Communications on $self->{host}:$self->{port}\n") if $self->{options}{poe_ikc_debug};
    POE::Component::IKC::Responder->spawn();

    my $session_name = $self->{poe_session_name};
    POE::Component::Server::SimpleHTTP->new(
        'ALIAS'    => $self->{poe_kernel_http_name},
        'ADDRESS'  => INADDR_ANY,
        'PORT'     => $self->{options}{http_port},
        'HANDLERS' => [
            { 'DIR' => '/debug', 'SESSION' => $session_name, 'EVENT' => 'poe_http_debug', },
            { 'DIR' => '/testrun', 'SESSION' => $session_name, 'EVENT' => 'poe_http_test_run', },
            { 'DIR' => '.*', 'SESSION' => $session_name, 'EVENT' => 'poe_http_server_state', },
        ],
    );
    $self->log({level=>3},"Listening for HTTP Requests on $self->{host}:$self->{options}{http_port}\n") if $self->{poe_trace};

    &App::sub_exit() if ($App::trace);
}

### Used by subclasses
sub _init2b {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    &App::sub_exit() if ($App::trace);
}

sub shutdown {
    &App::sub_entry if ($App::trace);
    my $self = shift;

    ### Shut down servers
    ### TODO

    ### Shut down children
    $self->shutdown_child_processes();

    ### Call SUPER shutdown  
    $self->SUPER::shutdown();

    &App::sub_exit() if ($App::trace);
}

sub shutdown_child_processes {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    if ($self->{proc}) {
        foreach my $pid (keys %{$self->{proc}}) {
            kill(15, $pid);
        }
    }
    &App::sub_exit() if ($App::trace);
}

sub dispatch_events {
    &App::sub_entry if ($App::trace);
    my ($self, $max_events_occurred) = @_;

    my $verbose = $self->{verbose};
    my $options = $self->{options};

    my ($role, $port, $startup, $shutdown);
    $self->dispatch_events_begin();

    ### Set up init_objects, untouched and snagged from App::Context::POE::Server
    my $objects = $options->{init_objects};
    my ($service_type, $name, $service);
    foreach my $object (split(/ *[;,]+ */, $objects)) {
        if ($object) {
            if ($object =~ /^([A-Z][A-Za-z0-9]+)\.([A-Za-z0-9_-]+)$/) {
                $service_type = $1;
                $name = $2;
            }
            else {
                $service_type = "SessionObject";
                $name = $object;
            }
            $service = $self->service($service_type, $name);  # instantiate it. that's all.
            $self->log({level=>3},"dispatch_events: $service_type $name instantiated [$service]\n");
            $self->{main_service} = $service if (!$self->{main_service});
        }
    }

    eval {
        ### POE Server begins here
        POE::Session->create( object_states => [ $self => $self->{poe_states} ] );
        $poe_kernel->run();
    };
    if ($@) {
        $self->log({level=>2},$@);
    }

    $self->dispatch_events_end();
    $self->shutdown();
    &App::sub_exit() if ($App::trace);
}

sub dispatch_events_begin {
    &App::sub_entry if ($App::trace);

lib/App/Context/POE/Server.pm  view on Meta::CPAN

        $str = "";
        if ($main_service && $main_service->can("format_async_event")) {
            $str = $main_service->format_async_event($event, $callback_event, $runtime_event_token);
        }
        if ($str) {
            $state .= "   ";
            $state .= $main_service->format_async_event($event, $callback_event, $runtime_event_token);
            $state .= "\n";
        }
        else {
            @args = ();
            @args = @{$event->{args}} if ($event->{args});
            $args_str = join(",",@args);
            $state .= sprintf("   %-20s %-20s %-24s", $event->{event_token}, $runtime_event_token, "$event->{name}.$event->{method}($args_str)");
            if ($callback_event) {
                @args = ();
                @args = @{$callback_event->{args}} if ($callback_event->{args});
                $args_str = join(",",@args);
                $state .= "$callback_event->{name}.$callback_event->{method}($args_str)";
            }
            $state .= "\n";
        }
    }

    $state .= "\n";
    $state .= "Pending Async Events: count [$self->{async_event_count}]\n";
    foreach $async_event (@{$self->{pending_async_events}}) {
        ($event, $callback_event) = @$async_event;
        $str = "";
        if ($main_service && $main_service->can("format_async_event")) {
            $str = $main_service->format_async_event($event, $callback_event);
        }
        if ($str) {
            $state .= "   ";
            $state .= $main_service->format_async_event($event, $callback_event);
            $state .= "\n";
        }
        else {
            @args = ();
            @args = @{$event->{args}} if ($event->{args});
            $args_str = join(",",@args);
            $state .= sprintf("   %-20s %-40s", $event->{event_token}, "$event->{name}.$event->{method}($args_str)");
            if ($callback_event) {
                @args = ();
                @args = @{$callback_event->{args}} if ($callback_event->{args});
                $args_str = join(",",@args);
                $state .= " => $callback_event->{name}.$callback_event->{method}($args_str)";
            }
            $state .= "\n";
        }
    }

    $state .= "\n";

    ### Only enable this in development, requires a library uncomment as well
    $state .= $self->_state_poe();

    ### THIS DOESN'T WORK YET
    #$state .= $self->_state_q();

    $state .= $self->SUPER::_state();

    &App::sub_exit($state) if ($App::trace);
    return($state);
}

sub _state_poe {
    my ($self) = @_;
    my $state = "";

    ### POE state dumping - Currently commented out because it doesn't gain us much
    ### in the way of visibility, and POE::API::Peek is a CPAN pain
    ### UNCOMMENT THIS IF YOU NEED IT, DEPENDS ON A PAINFUL LIBRARY
    if ($self->{poe_peek}) {
        App->use("POE::API::Peek");
        my $api = POE::API::Peek->new();
        my @queue = $api->event_queue_dump();
        $state .= "POE event_queue_dump\n";
        my $first = 1;
        my $poe_stuff = [qw(ID index priority event type source destination)];
        for my $item (@queue) {
            if ($first) {
                $state .= sprintf("%7s %6s %20s %30s %15s %30s %30s\n", @$poe_stuff);
                $first = 0;
            }
            $state .= sprintf("%7d %6d %20f %30s %15s %30s %30s\n", @{$item}{@$poe_stuff});
        }
        $state .= "\n";
    }

    return $state;
}

### THIS DOESN'T WORK YET, THROWS AN EXCEPTION
sub _state_q {
    my $self = @_;
    my $HOTEL_SITE_QNAME    = "q-hotel_site";
    my $HOTEL_COMPUTE_QNAME = "q-hotel_compute";
    my $state = "";

    for my $qname ("q-hotel_site", "q-hotel_compute") {
        $state .= "$qname\n";
        ### EXCEPTION IS THROWN HERE
        my $q = $self->work_queue($qname);
        my $entries = $q->{data};
        foreach my $entry (@$entries) {
            $state .= sprintf("   {", join("|",%$entry), "}\n");
        }
        $state .= "\n";
    }
    $state .= "\n";

    return $state;
}

sub debug {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
    my $debug = "DEBUG --- Server: $self->{host}:$self->{port}  procs[$self->{num_procs}/$self->{max_procs}:max]  async_events[$self->{num_async_events}/$self->{max_async_events}:max]\n[$datetime]\n";

lib/App/Context/POE/Server.pm  view on Meta::CPAN

            };
            if ($@) {
                @results = ($@);
            }
            if ($#results > -1 && defined $results[0] && $results[0] ne "") {
                my $ipc_file = $self->{options}{prefix} . "/data/app/Context/$$";
                ### Use Storable as IPC
                if ($self->{options}{poe_storable_ipc}) {
                    my $results = (@results == 1) ? $results[0] : \@results;
                    my $success = lock_store($results, $ipc_file);
                }
                ### Use a string value as IPC
                else {
                    if (open(FILE, "> $ipc_file")) {
                            print App::Context::POE::Server::FILE @results;
                            close(App::Context::POE::Server::FILE);
                    }
                    else {
                        $exitval = 1;
                    }
                }
            }
            $self->shutdown();
            $self->exit($exitval);
        }
        my $destination = $event->{destination} || "local";
        $self->{num_async_events}++;
        $self->{node}{$destination}{num_async_events}++;
        my $runtime_event_token = $pid;
        $self->{running_async_event}{$runtime_event_token} = [ $event, $callback_event ];
    }
    $self->profile_stop("send_async_event_now") if $self->{poe_profile};
    &App::sub_exit() if ($App::trace);
}
=head2 wait_for_event()

    * Signature: $self->wait_for_event($event_token)
    * Param:     $event_token     string
    * Return:    void
    * Throws:    App::Exception
    * Since:     0.01

    Sample Usage: 

    $self->wait_for_event($event_token);

The wait_for_event() method is called when an asynchronous event has been
sent and no more processing can be completed before it is done.

=cut

sub wait_for_event {
    &App::sub_entry if ($App::trace);
    my ($self, $event_token) = @_;
    &App::sub_exit() if ($App::trace);
}

sub fork {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my $pid = $self->SUPER::fork();
    if ($pid) {  # the parent process has a new child process
        $self->{num_procs}++;
        $self->{proc}{$pid} = {};
    }
    else {  # the new child process has no sub-processes
        $self->{num_procs} = 0;
        $self->{proc} = {};
        $SIG{INT}  = sub { $self->log({level=>3},"fork: Caught Signal: @_ (quitting)\n"); $self->exit(102); };   # SIG  2
        $SIG{QUIT} = sub { $self->log({level=>3},"fork: Caught Signal: @_ (quitting)\n"); $self->exit(103); };   # SIG  3
        $SIG{TERM} = sub { $self->log({level=>3},"fork: Caught Signal: @_ (quitting)\n"); $self->shutdown(); $self->exit(115); };   # SIG 15
    }
    &App::sub_exit($pid) if ($App::trace);
    return($pid);
}

sub finish_pid {
    &App::sub_entry if ($App::trace);
    my ($self, $pid, $exitval, $sig) = @_;

    $self->{num_procs}--;
    delete $self->{proc}{$pid};

    my $runtime_event_token = $pid;
    my $async_event = $self->{running_async_event}{$runtime_event_token};
    if ($async_event) {
        my ($event, $callback_event) = @$async_event;
        my $returnval = "";
        my $ipc_file = $self->{options}{prefix} . "/data/app/Context/$pid";
        ### Use Storable as IPC
        if ($self->{options}{poe_storable_ipc}) {
            $returnval = lock_retrieve($ipc_file);
            unlink($ipc_file);
        }
        ### Use a string value as IPC
        else {
            if (open(FILE, $ipc_file)) {
                if ($callback_event) {
                    $returnval = join("",<App::Context::POE::Server::FILE>);
                }
                close(App::Context::POE::Server::FILE);
                unlink($ipc_file);
            }
        }

        my $destination = $event->{destination} || "local";
        $self->{num_async_events}--;
        $self->{node}{$destination}{num_async_events}--;
        delete $self->{running_async_event}{$runtime_event_token};

        if ($callback_event) {
            $callback_event->{args} = [] if (! $callback_event->{args});
            my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid [sig=$sig]" : "";
            push(@{$callback_event->{args}[2]{args}}, { event_token => $callback_event->{event_token},
                                                        returnval => $returnval,
                                                        errnum => $exitval,
                                                        errmsg => $errmsg });
            $self->send_event($callback_event);
        }
        elsif ($sig == 9) {  # killed without a chance to finish its work
            $self->finish_killed_async_event($event);



( run in 1.021 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )