SPVM-Go
view release on metacpan or search on metacpan
lib/SPVM/Go/Schedule.spvm view on Meta::CPAN
# Copyright (c) 2023 Yuki Kimoto
# MIT License
class Go::Schedule {
allow Go;
allow Go::Schedule::Task;
allow Go::Channel;
allow Go::Select;
allow Go::Poll;
use Go::Coroutine;
use List;
use Go::Schedule::Task;
use Sys::Time;
use Sys::Time::Timespec;
use Go::Poll;
use Thread::ID;
use Thread::ThisThread;
use Go::Error::IOTimeout;
use Error::System;
use Time::HiRes;
use Sys::Time::Util;
use Sys::Time::Constant as TIME;
# Coroutines created by add_task, in the order they were added.
has coroutines : List of Go::Coroutine;
# The coroutine on whose behalf schedule is currently being called.
# When this is undef, schedule treats the call as coming from outside any goroutine.
# It is never assigned a coroutine inside this class, only cleared.
# NOTE(review): presumably Go::Schedule::Task sets it - confirm.
has current_coroutine : Go::Coroutine;
# The coroutine that runs the Go::Schedule::Task. It is set only for the
# duration of a schedule call made from outside a goroutine.
has schedule_task_coroutine : Go::Coroutine;
# The Go::Poll instance created in new. It is not used by the methods of this class.
has poll : Go::Poll;
# The ID of the thread that created this object. schedule refuses calls from any other thread.
has thread_id : Thread::ID;
# Class Methods
# Creates a new scheduler with an empty coroutine list and a fresh Go::Poll
# object, and records the ID of the creating thread.
private static method new : Go::Schedule () {
  
  my $schedule = new Go::Schedule;
  
  $schedule->{coroutines} = List->new(new Go::Coroutine[0]);
  
  $schedule->{poll} = Go::Poll->new;
  
  # Remembered so that the schedule method can reject calls from other threads.
  $schedule->{thread_id} = Thread::ThisThread->get_id;
  
  return $schedule;
}
# Returns the current time as a Sys::Time::Timespec object.
#
# CLOCK_MONOTONIC is tried first. If reading it throws an Error::System
# exception, CLOCK_REALTIME is read instead. Any other exception is rethrown
# with its original error ID and message.
private static method clock_gettime : Sys::Time::Timespec () {
  
  my $timespec : Sys::Time::Timespec;
  
  eval { $timespec = Time::HiRes->clock_gettime_timespec(TIME->CLOCK_MONOTONIC); }
  
  if ($@) {
    # Only a system error triggers the fallback; everything else propagates.
    unless (eval_error_id is_error Error::System) {
      die eval_error_id, $@;
    }
    
    $timespec = Time::HiRes->clock_gettime_timespec(TIME->CLOCK_REALTIME);
  }
  
  return $timespec;
}
# Returns 1 if the current time (as returned by clock_gettime) is strictly
# later than $deadline_base plus $after seconds, otherwise returns 0.
#
# Exceptions:
#   $deadline_base must be defined. Otherwise an exception is thrown.
private static method is_over_deadline : int ($deadline_base : Sys::Time::Timespec, $after : double) {
  
  unless ($deadline_base) {
    die "\$deadline_base must be defined.";
  }
  
  # deadline = $deadline_base + $after
  my $offset = Sys::Time::Util->float_seconds_to_timespec($after);
  my $deadline = Sys::Time::Util->add_timespec($deadline_base, $offset);
  
  my $now = &clock_gettime;
  
  # Compare the seconds first; the nanoseconds only matter within the same second.
  my $now_sec = $now->tv_sec;
  my $deadline_sec = $deadline->tv_sec;
  
  if ($now_sec > $deadline_sec) {
    return 1;
  }
  
  if ($now_sec < $deadline_sec) {
    return 0;
  }
  
  if ($now->tv_nsec > $deadline->tv_nsec) {
    return 1;
  }
  
  return 0;
}
# Instance Methods
# Suspends the current goroutine until it is resumed by the scheduler,
# registering $fd as a file descriptor waited on for reading.
#
# This is a shorthand for calling the schedule method with $after = 0 and
# $is_write = 0. $timeout is the IO timeout in seconds; 0 means no timeout is
# armed. See the schedule method for the validation applied and the
# exceptions thrown.
private method schedule_io_read : void ($fd : int, $timeout : double = 0) {
  
  my $after = 0;
  
  my $is_write = 0;
  
  $self->schedule($after, $fd, $is_write, $timeout);
}
# Suspends the current goroutine until it is resumed by the scheduler,
# registering $fd as a file descriptor waited on for writing.
#
# This is a shorthand for calling the schedule method with $after = 0 and
# $is_write = 1. $timeout is the IO timeout in seconds; 0 means no timeout is
# armed. See the schedule method for the validation applied and the
# exceptions thrown.
private method schedule_io_write : void ($fd : int, $timeout : double = 0) {
  
  my $after = 0;
  
  my $is_write = 1;
  
  $self->schedule($after, $fd, $is_write, $timeout);
}
# Transfers control to the scheduler.
#
# The behavior depends on whether the current_coroutine field is set.
#
# If it is set (the call is made on behalf of a goroutine), the wait condition
# is recorded on that coroutine and control is transferred to the schedule
# task coroutine:
#   - $fd >= 0       : IO wait on $fd ($is_write selects read or write), with
#                      an optional IO timeout of $timeout seconds.
#   - $after > 0     : timer wait of $after seconds.
#   - otherwise      : plain yield.
# When control comes back and the io_timeout_occur field of the coroutine is
# set, the flag is cleared and a Go::Error::IOTimeout exception is thrown.
#
# If it is not set, a Go::Schedule::Task is created and run in a new coroutine,
# and the references created for that run are cleared after it transfers back.
#
# Always returns 0.
#
# Exceptions:
#   The method must be called from the thread that created this object.
#   Within a goroutine: $after must be <= Fn->INT_MAX, $fd >= 0 and $after > 0
#   must not both hold, and for an IO wait $timeout must be within
#   0 .. Fn->INT_MAX.
#   Outside a goroutine: $after must not be greater than 0.
private method schedule : int ($after : double = 0, $fd : int = -1, $is_write : int = 0, $timeout : double = 0) {
  
  my $called_from_main_thread = Thread::ID->eq(Thread::ThisThread->get_id, $self->{thread_id});
  
  unless ($called_from_main_thread) {
    die "The schedule method in the Go::Schedule class must be called from the main thread.";
  }
  
  my $current_coroutine = $self->{current_coroutine};
  
  if ($current_coroutine) {
    my $schedule_task_coroutine = $self->{schedule_task_coroutine};
    
    unless ($after <= Fn->INT_MAX) {
      die "\$after must be less than or equal to Fn->INT_MAX.";
    }
    
    if ($fd >= 0 && $after > 0) {
      die "The arguments must not both \$fd is greater than or equal to 0 and \$after is greater than 0.";
    }
    
    # IO
    if ($fd >= 0) {
      
      # Validate before touching the coroutine so that a rejected call does
      # not leave it with a half-updated wait condition.
      unless ($timeout >= 0) {
        die "\$timeout must be greater than or equal to 0.";
      }
      
      unless ($timeout <= Fn->INT_MAX) {
        die "\$timeout must be less than or equal to Fn->INT_MAX.";
      }
      
      $current_coroutine->{fd} = $fd;
      $current_coroutine->{is_write} = (byte)$is_write;
      
      # IO timeout
      # NOTE(review): io_timeout and deadline_base_io_timeout are not cleared
      # when $timeout is 0 - confirm that the consumer resets them.
      if ($timeout > 0) {
        $current_coroutine->{deadline_base_io_timeout} = &clock_gettime;
        $current_coroutine->{io_timeout} = $timeout;
      }
    }
    else {
      $current_coroutine->{fd} = -1;
      
      # Timer
      # NOTE(review): after and deadline_base_timer are not cleared when
      # $after is 0 - confirm that the consumer resets them.
      if ($after > 0) {
        $current_coroutine->{deadline_base_timer} = &clock_gettime;
        $current_coroutine->{after} = $after;
      }
    }
    
    # Suspend this goroutine. Execution continues here once something
    # transfers control back to $current_coroutine.
    Go::Coroutine->transfer($current_coroutine, $schedule_task_coroutine);
    
    if ($current_coroutine->{io_timeout_occur}) {
      $current_coroutine->{io_timeout_occur} = 0;
      die Go::Error::IOTimeout "IO timeout occur";
    }
  }
  else {
    if ($after > 0) {
      die "\$after(or \$timeout) must be given in a goroutine.";
    }
    
    # The coroutine representing this caller; the schedule task coroutine
    # keeps it in its return_back field.
    my $return_back = Go::Coroutine->new;
    
    my $schedule_task = Go::Schedule::Task->new;
    
    $schedule_task->{schedule} = $self;
    
    my $schedule_task_coroutine = Go::Coroutine->new($schedule_task);
    
    $schedule_task_coroutine->{return_back} = $return_back;
    
    $self->{schedule_task_coroutine} = $schedule_task_coroutine;
    
    Go::Coroutine->transfer($return_back, $schedule_task_coroutine);
    
    # Drop the references that were set up for this run.
    $schedule_task->{schedule} = undef;
    
    $self->{schedule_task_coroutine} = undef;
    
    $self->{current_coroutine} = undef;
    
    $schedule_task = undef;
    
    $schedule_task_coroutine->{task} = undef;
  }
  
  return 0;
}
# Wraps $task in a new Go::Coroutine and appends it to the coroutines list.
#
# If Go->ENV_DEBUG is true, a debug line is written to STDERR after the push.
private method add_task : void ($task : Callback) {
  
  $self->{coroutines}->push(Go::Coroutine->new($task));
  
  if (Go->ENV_DEBUG) {
    my $message = Fn->sprintf("[Go Debug]Push new goroutine with the callback %p.", [(object)$task]);
    Fn->say_stderr($message);
  }
}
}
( run in 0.785 second using v1.01-cache-2.11-cpan-39bf76dae61 )