SPVM-Go

 view release on metacpan or  search on metacpan

lib/SPVM/Go/Schedule.spvm  view on Meta::CPAN

# Copyright (c) 2023 Yuki Kimoto
# MIT License

class Go::Schedule {
  allow Go;
  allow Go::Schedule::Task;
  allow Go::Channel;
  allow Go::Select;
  allow Go::Poll;
  
  use Go::Coroutine;
  use List;
  use Go::Schedule::Task;
  use Sys::Time;
  use Sys::Time::Timespec;
  use Go::Poll;
  use Thread::ID;
  use Thread::ThisThread;
  use Go::Error::IOTimeout;
  use Error::System;
  use Time::HiRes;
  use Sys::Time::Util;
  use Sys::Time::Constant as TIME;
  
  has coroutines : List of Go::Coroutine;
  
  has current_coroutine : Go::Coroutine;
  
  has schedule_task_coroutine : Go::Coroutine;
  
  has poll : Go::Poll;
  
  has thread_id : Thread::ID;
  
  # Class Methods
  private static method new : Go::Schedule () {
    
    my $self = new Go::Schedule;
    
    my $coroutines = List->new(new Go::Coroutine[0]);;
    
    $self->{coroutines} = $coroutines;
    
    $self->{poll} = Go::Poll->new;
    
    $self->{thread_id} = Thread::ThisThread->get_id;
    
    return $self;
  }
  
  private static method clock_gettime : Sys::Time::Timespec () {
    my $now_ts = (Sys::Time::Timespec)undef;
    
    eval { $now_ts = Time::HiRes->clock_gettime_timespec(TIME->CLOCK_MONOTONIC); }
    
    if ($@) {
      if (eval_error_id is_error Error::System) {
        $now_ts = Time::HiRes->clock_gettime_timespec(TIME->CLOCK_REALTIME);
      }
      else {
        die eval_error_id, $@;
      }
    }
    
    return $now_ts;
  }
  
  private static method is_over_deadline : int ($deadline_base : Sys::Time::Timespec, $after : double) {
    
    unless ($deadline_base) {
      die "\$deadline_base must be defined.";
    }
    
    my $after_ts = Sys::Time::Util->float_seconds_to_timespec($after);
    
    my $deadline = Sys::Time::Util->add_timespec($deadline_base, $after_ts);
    
    my $now_ts = &clock_gettime;
    
    my $is_over_deadline = 0;
    
    my $now_sec = $now_ts->tv_sec;
    my $now_nsec = $now_ts->tv_nsec;
    
    my $deadline_sec = $deadline->tv_sec;
    my $deadline_nsec = $deadline->tv_nsec;
    
    if ($now_sec > $deadline_sec || ($now_sec == $deadline_sec && $now_nsec > $deadline_nsec)) {
      $is_over_deadline = 1;
    }
    
    return $is_over_deadline;
  }
  
  # Instance Methods
  private method schedule_io_read : void ($fd : int, , $timeout : double = 0) {
    
    my $after = 0;
    
    my $is_write = 0;
    
    $self->schedule($after, $fd, $is_write, $timeout);
  }
  
  private method schedule_io_write : void ($fd : int, , $timeout : double = 0) {
    
    my $after = 0;
    
    my $is_write = 1;
    
    $self->schedule($after, $fd, $is_write, $timeout);
  }
  
  private method schedule : int ($after : double = 0, $fd : int = -1, $is_write : int = 0, $timeout : double = 0) {
    
    my $called_from_main_thread = Thread::ID->eq(Thread::ThisThread->get_id, $self->{thread_id});
    
    unless ($called_from_main_thread) {
      die "The schedule method in the Go::Schedule class must be called from the main thread.";
    }
    
    my $current_coroutine = $self->{current_coroutine};
    
    if ($current_coroutine) {
      
      my $schedule_task_coroutine = $self->{schedule_task_coroutine};
      
      unless ($after <= Fn->INT_MAX) {
        die "\$after must be less than or equal to Fn->INT_MAX.";
      }
      
      if ($fd >= 0 && $after > 0) {
        die "The arguments must not both \$fd is greater than or equal to 0 and \$after is greater than 0.";
      }
      
      # IO
      if ($fd >= 0) {
        $current_coroutine->{fd} = $fd;
        
        $current_coroutine->{is_write} = (byte)$is_write;
        
        unless ($timeout >= 0) {
          die "\$timeout must be greater than or equal to 0.";
        }
          
        unless ($timeout <= Fn->INT_MAX) {
          die "\$timeout must be less than or equal to Fn->INT_MAX.";
        }
        
        # IO timeout
        if ($timeout > 0) {
          
          $current_coroutine->{deadline_base_io_timeout} = &clock_gettime;
          $current_coroutine->{io_timeout} = $timeout;
        }
      }
      else {
        $current_coroutine->{fd} = -1;
        
        # Timer
        if ($after > 0) {
          $current_coroutine->{deadline_base_timer} = &clock_gettime;
          $current_coroutine->{after} = $after;
        }
      }
      
      Go::Coroutine->transfer($current_coroutine, $schedule_task_coroutine);
      
      if ($current_coroutine->{io_timeout_occur}) {
        $current_coroutine->{io_timeout_occur} = 0;
        die Go::Error::IOTimeout "IO timeout occur";
      }
    }
    else {
      if ($after > 0) {
        die "\$after(or \$timeout) must be given in a goroutine.";
      }
      
      my $return_back = Go::Coroutine->new;
      
      my $schedule_task = Go::Schedule::Task->new;
      
      $schedule_task->{schedule} = $self;
      
      my $schedule_task_coroutine = Go::Coroutine->new($schedule_task);
      
      $schedule_task_coroutine->{return_back} = $return_back;
      
      $self->{schedule_task_coroutine} = $schedule_task_coroutine;
      
      Go::Coroutine->transfer($return_back, $schedule_task_coroutine);
      
      $schedule_task->{schedule} = undef;
      
      $self->{schedule_task_coroutine} = undef;
      
      $self->{current_coroutine} = undef;
      
      $schedule_task = undef;
      
      $schedule_task_coroutine->{task} = undef;
    }
  }
  
  private method add_task : void ($task : Callback) {
    
    my $coroutine = Go::Coroutine->new($task);
    
    $self->{coroutines}->push($coroutine);
    
    if (Go->ENV_DEBUG) {
      Fn->say_stderr(Fn->sprintf("[Go Debug]Push new goroutine with the callback %p.", [(object)$task]));
    }
    
  }
  
}



( run in 0.785 second using v1.01-cache-2.11-cpan-39bf76dae61 )