SPVM-Go
view release on metacpan or search on metacpan
lib/SPVM/Go/Channel.spvm view on Meta::CPAN
# Number of values currently buffered. "write" increments it when it queues a
# value without blocking, and "read" decrements it when it consumes a queued
# value whose wait has no coroutine attached.
has length : int;
# Non-zero once "close" has been called on this channel.
has closed : int;
# Queue of waits registered by readers that are blocked inside "read".
has read_waits : List of Go::Channel::Wait;
# Queue of pending writes in arrival order. A wait without a coroutine is a
# buffered value; a wait with a coroutine belongs to a writer blocked in "write".
has write_waits : List of Go::Channel::Wait;
# Scheduler that supplies the current coroutine and whose "schedule" method is
# called repeatedly while a reader or writer waits.
has schedule : Go::Schedule;
# Class Methods
# Creates a channel from the given options.
#
# Options:
#   capacity : int, defaults to 0, must not be negative.
#   schedule : Go::Schedule, required.
# The option names are validated by Fn->check_option_names.
#
# Dies if "capacity" is negative or "schedule" is not given.
private static method new : Go::Channel ($options : object[] = undef) {
  
  Fn->check_option_names($options, ["capacity", "schedule"]);
  
  my $opts = Hash->new($options);
  
  my $channel = new Go::Channel;
  
  # Validate the buffer size before storing it.
  my $buffer_size = $opts->delete_or_default_int("capacity", 0);
  if ($buffer_size < 0) {
    die "The \"capacity\" option must be greater than or equal to 0.";
  }
  $channel->{capacity} = $buffer_size;
  
  # A channel cannot block or wake coroutines without a scheduler.
  my $scheduler = (Go::Schedule)$opts->delete_or_default("schedule", undef);
  if (!$scheduler) {
    die "The \"schedule\" option must be defined.";
  }
  $channel->{schedule} = $scheduler;
  
  # Both wait queues start out empty.
  $channel->{read_waits} = List->new(new Go::Channel::Wait[0]);
  $channel->{write_waits} = List->new(new Go::Channel::Wait[0]);
  
  return $channel;
}
# Instance Methods
# Receives one value from the channel.
#
# $ok_ref : output flag. Set to 1 when a value was received. Set to 0 when the
#           channel is closed and no pending write is left, or to the "ok"
#           value stored in this reader's wait when the wait was finished by
#           someone else (for example 0 by "close").
#
# Returns the received value, or undef when nothing was received.
#
# If no write is pending, the current coroutine is marked as disabled, a wait
# is queued in read_waits, and the scheduler is called in a loop until the wait
# is finished.
#
# Dies with "There is no currently running goroutine." when the call has to
# wait and the scheduler has no current coroutine.
method read : object ($ok_ref : int*) {
  $$ok_ref = 1;
  
  # A closed channel still hands out writes that were queued before the close.
  my $closed = $self->{closed};
  if ($closed && $self->{write_waits}->length == 0) {
    $$ok_ref = 0;
    return undef;
  }
  
  if ($self->{write_waits}->length) {
    # Take the oldest pending write and mark it finished, which re-enables the
    # writer's coroutine if it has one.
    my $write_wait = (Go::Channel::Wait)$self->{write_waits}->shift;
    my $ok = 1;
    &finish_wait($write_wait, $ok);
    
    # A wait without a coroutine is a buffered value, so one buffer slot is freed.
    my $write_wait_coroutine = $write_wait->{coroutine};
    unless ($write_wait_coroutine) {
      $self->{length}--;
    }
    
    my $value = $write_wait->{value};
    return $value;
  }
  else {
    my $read_wait = Go::Channel::Wait->new;
    
    my $schedule = $self->{schedule};
    my $current_coroutine = $schedule->{current_coroutine};
    unless ($current_coroutine) {
      die "There is no currently running goroutine.";
    }
    
    # Mark this coroutine as waiting and register the wait so that "write" or
    # "close" can finish it.
    $current_coroutine->{disable} = 1;
    $read_wait->{coroutine} = $current_coroutine;
    $self->{read_waits}->push($read_wait);
    
    while (1) {
      # Finished by "write" (value delivered) or by "close" (ok is 0).
      my $finish = $read_wait->{finish};
      if ($finish) {
        my $value = $read_wait->{value};
        $$ok_ref = $read_wait->{ok};
        return $value;
      }
      
      # A write became pending while this reader is first in line: consume it
      # directly instead of waiting to be finished.
      if ($self->{write_waits}->length) {
        if ($read_wait == $self->{read_waits}->get(0)) {
          my $write_wait = (Go::Channel::Wait)$self->{write_waits}->shift;
          my $ok = 1;
          &finish_wait($write_wait, $ok);
          my $value = $write_wait->{value};
          my $write_wait_coroutine = $write_wait->{coroutine};
          unless ($write_wait_coroutine) {
            $self->{length}--;
          }
          $self->{read_waits}->shift;
          
          # This reader dequeued its own wait, so nobody else will ever call
          # finish_wait on it. Clear the flag set above; otherwise the
          # coroutine would return from read while still marked as disabled.
          $current_coroutine->{disable} = 0;
          
          return $value;
        }
      }
      
      $schedule->schedule;
    }
  }
  
  return undef;
}
# Sends a value into the channel.
#
# $value : the object to send.
#
# Behavior, in order:
#   1. If a reader is waiting, the value is handed to the oldest waiting reader
#      and that reader's wait is finished with ok = 1.
#   2. Otherwise, if the buffer has room (length < capacity), the value is
#      queued in write_waits without a coroutine and length is incremented.
#   3. Otherwise the current coroutine is marked as disabled, a wait carrying
#      the value and the coroutine is queued, and the scheduler is called in a
#      loop until a reader finishes the wait.
#
# Dies with "This channel is closed." when the channel is closed, and with
# "There is no currently running goroutine." when the call has to wait and the
# scheduler has no current coroutine.
method write : void ($value : object) {
  
  my $closed = $self->{closed};
  if ($closed) {
    die "This channel is closed.";
  }
  
  # Direct hand-off. This is the same whether or not the buffer has room, so
  # it is handled once, and the wake-up goes through finish_wait like every
  # other place that finishes a wait.
  if ($self->{read_waits}->length) {
    my $read_wait = (Go::Channel::Wait)$self->{read_waits}->shift;
    $read_wait->{value} = $value;
    my $ok = 1;
    &finish_wait($read_wait, $ok);
    return;
  }
  
  my $write_wait = Go::Channel::Wait->new;
  $write_wait->{value} = $value;
  
  # Room in the buffer: queue the value and return without waiting.
  my $length = $self->{length};
  my $capacity = $self->{capacity};
  if ($length < $capacity) {
    $self->{write_waits}->push($write_wait);
    $self->{length}++;
    return;
  }
  
  # Buffer full (or capacity 0): wait until a reader takes the value.
  my $schedule = $self->{schedule};
  my $current_coroutine = $schedule->{current_coroutine};
  unless ($current_coroutine) {
    die "There is no currently running goroutine.";
  }
  
  $current_coroutine->{disable} = 1;
  $write_wait->{coroutine} = $current_coroutine;
  $self->{write_waits}->push($write_wait);
  
  while (1) {
    my $finish = $write_wait->{finish};
    if ($finish) {
      return;
    }
    $schedule->schedule;
  }
}
# Closes the channel.
#
# Every reader currently queued in read_waits is removed from the queue and
# its wait is finished with ok = 0. Afterwards the closed flag is set to 1.
#
# Dies with "This channel is already closed." if the channel was closed before.
method close : void () {
  
  if ($self->{closed}) {
    die "This channel is already closed.";
  }
  
  # Drain the reader queue from the front, finishing each wait with ok = 0.
  my $blocked_readers = $self->{read_waits};
  while ($blocked_readers->length > 0) {
    my $reader_wait = (Go::Channel::Wait)$blocked_readers->shift;
    my $not_ok = 0;
    &finish_wait($reader_wait, $not_ok);
  }
  
  $self->{closed} = 1;
}
# Returns the value of the capacity field.
method cap : int () {
  my $capacity = $self->{capacity};
  return $capacity;
}
# Returns the value of the length field.
method len : int () {
  my $length = $self->{length};
  return $length;
}
# Marks a wait as finished and records its result.
#
# $wait : the wait to finish.
# $ok   : result flag stored (as a byte) in the wait's "ok" field.
#
# If the wait has a coroutine attached, that coroutine's "disable" flag is
# reset to 0.
private static method finish_wait : void ($wait : Go::Channel::Wait, $ok : int) {
  
  # Re-enable the coroutine attached to this wait, if any.
  my $waiting_coroutine = $wait->{coroutine};
  if ($waiting_coroutine) {
    $waiting_coroutine->{disable} = 0;
  }
  
  $wait->{ok} = (byte)$ok;
  $wait->{finish} = 1;
}
}
( run in 0.723 second using v1.01-cache-2.11-cpan-39bf76dae61 )