App-EvalServerAdvanced

 view release on metacpan or  search on metacpan

lib/App/EvalServerAdvanced.pm  view on Meta::CPAN

package App::EvalServerAdvanced;

use strict;
our $VERSION = '0.024';

use IO::Async::Loop;
use IO::Async::Function;
use App::EvalServerAdvanced::Config;
use App::EvalServerAdvanced::Sandbox;
use App::EvalServerAdvanced::JobManager;
use Function::Parameters;
use App::EvalServerAdvanced::Protocol;
use App::EvalServerAdvanced::Log;
use Syntax::Keyword::Try;

use Data::Dumper;
use POSIX qw/_exit/;

use Moo;
use IPC::Run qw/harness/;

# Event loop used by the server; built on first access with IO::Async::Loop->new().
has loop => (is => 'ro', lazy => 1, default => sub {IO::Async::Loop->new()});
# Flag consulted at the top of init(), which returns immediately when it is true.
has _inited => (is => 'rw', default => 0);
# Job manager, constructed with this object's loop.
# NOTE(review): this default is not lazy and reads the lazy `loop` attribute
# during construction -- confirm a loop passed to new() is already visible here.
has jobman => (is => 'ro', default => sub {App::EvalServerAdvanced::JobManager->new(loop => $_[0]->loop)});
# Read/write slot with no default; presumably holds the listener created in init() -- TODO confirm.
has listener => (is => 'rw');

# Running counter that new_session_id() increments to produce session ids.
has session_counter => (is => 'rw', default => 0);
# Hashref of per-session state, keyed by session id (see the on_stream handler in init()).
has sessions => (is => 'ro', default => sub {+{}});

# Hand out the next session id: read session_counter, add one, store the
# result back on the object and return it. With the counter at its default
# of 0, the first id issued is 1.
method new_session_id() {
  my $next_id = 1 + $self->session_counter;
  $self->session_counter($next_id);

  return $next_id;
}

method init() {
  return if $self->_inited();
  my $es_self = $self;

  my $listener = $self->loop->listen(
    service => config->evalserver->port,
    host => config->evalserver->host,
    socktype => 'stream',
    on_stream => fun ($stream) {
      my $session_id = $self->new_session_id;
      $self->sessions->{$session_id} = {}; # init the session

      my $close_session = sub {
        debug "Closing session $session_id! ";
        for my $sequence (keys $self->sessions->{$session_id}{jobs}->%*) {
          my $job = $self->sessions->{$session_id}{jobs}{$sequence};

          $job->{future}->fail("Session ended") unless $job->{future}->is_ready;
          $job->{canceled} = 1; # Mark them as canceled
        }

        delete $self->sessions->{$session_id}; # delete the session references
      };

      $stream->configure(
        on_read_eof => sub {debug "read_eof"; $close_session->()},
        on_write_eof => sub {debug "write_eof"; $close_session->()},

        on_read => method ($buffref, $eof) {
          my ($res, $message, $newbuf);
          do { # decode as many packets as we can
            ($res, $message, $newbuf) = eval{decode_message($$buffref)};
            debug sprintf("packet decode %d %d %d: %d", $res, length($message//''), length($newbuf//''), $eof);

            # We had an error when decoding the incoming packets, tell them and close the connection.
            if ($@) {
              debug "Session error, decoding packet. $@";
              my $message = encode_message(warning => {message => $@});
              $stream->write($message);
              $close_session->();



( run in 0.767 second using v1.01-cache-2.11-cpan-39bf76dae61 )