SPVM-Go

 view release on metacpan or  search on metacpan

lib/SPVM/Go/Channel.spvm  view on Meta::CPAN

  
  # Number of buffered values: incremented by write when a value is queued
  # without blocking, decremented by read when such a value is consumed.
  has length : int;
  
  # Set to 1 by close; read and write check it before doing anything else.
  has closed : int;
  
  # Queue of wait objects for readers that are waiting for a value.
  has read_waits : List of Go::Channel::Wait;
  
  # Queue of pending writes. An entry without a coroutine is a buffered value;
  # an entry with a coroutine belongs to a writer waiting for a reader.
  has write_waits : List of Go::Channel::Wait;
  
  # Scheduler used to find the current coroutine and to yield while waiting.
  has schedule : Go::Schedule;
  
  # Class Methods
  # Builds a channel from the options "capacity" (int, defaults to 0) and
  # "schedule" (a Go::Schedule, required).
  #
  # Option names are validated with Fn->check_option_names before use.
  # Dies if "capacity" is negative or if "schedule" is not defined.
  private static method new : Go::Channel ($options : object[] = undef) {
    
    Fn->check_option_names($options, ["capacity", "schedule"]);
    
    my $opts = Hash->new($options);
    
    my $channel = new Go::Channel;
    
    # Buffer size: validated first, stored only once it is known to be valid.
    my $capacity = $opts->delete_or_default_int("capacity", 0);
    if ($capacity < 0) {
      die "The \"capacity\" option must be greater than or equal to 0.";
    }
    $channel->{capacity} = $capacity;
    
    # The scheduler is mandatory because read and write use it to wait.
    my $schedule = (Go::Schedule)$opts->delete_or_default("schedule", undef);
    if (!$schedule) {
      die "The \"schedule\" option must be defined.";
    }
    $channel->{schedule} = $schedule;
    
    # Both wait queues start out empty.
    $channel->{read_waits} = List->new(new Go::Channel::Wait[0]);
    $channel->{write_waits} = List->new(new Go::Channel::Wait[0]);
    
    return $channel;
  }
  
  # Instance Methods
  # Reads the next value from this channel.
  #
  # $ok_ref receives 1 when a value was received. It receives 0 when the
  # channel is closed and no write is pending (undef is returned in that
  # case), or when a waiting reader is released by close (the "ok" flag of
  # its wait object is copied into $ok_ref).
  #
  # When no write is pending, the current coroutine is flagged as disabled,
  # a wait object is appended to read_waits, and the scheduler is run until
  # the wait is finished.
  #
  # Dies with "There is no currently running goroutine." when the call would
  # have to wait but the schedule has no current coroutine.
  method read : object ($ok_ref : int*) {
    
    $$ok_ref = 1;
    
    # Closed and fully drained: report "no value".
    if ($self->{closed} && $self->{write_waits}->length == 0) {
      $$ok_ref = 0;
      return undef;
    }
    
    # A write is already pending: consume it without waiting.
    if ($self->{write_waits}->length) {
      return &take_pending_write($self);
    }
    
    # Nothing to read yet: register as a waiting reader.
    my $read_wait = Go::Channel::Wait->new;
    
    my $schedule = $self->{schedule};
    
    my $current_coroutine = $schedule->{current_coroutine};
    
    unless ($current_coroutine) {
      die "There is no currently running goroutine.";
    }
    
    $current_coroutine->{disable} = 1;
    
    $read_wait->{coroutine} = $current_coroutine;
    
    $self->{read_waits}->push($read_wait);
    
    while (1) {
      
      # A writer (or close) completed this wait and filled in value and ok.
      if ($read_wait->{finish}) {
        $$ok_ref = $read_wait->{ok};
        
        return $read_wait->{value};
      }
      
      # A write became pending while this reader is first in line: take it.
      if ($self->{write_waits}->length) {
        if ($read_wait == $self->{read_waits}->get(0)) {
          my $value = &take_pending_write($self);
          
          $self->{read_waits}->shift;
          
          # Finish our own wait so the "disable" flag set above is cleared,
          # exactly as on every other path that completes a wait.
          &finish_wait($read_wait, 1);
          
          return $value;
        }
      }
      
      $schedule->schedule;
    }
    
    return undef;
  }
  
  # Removes the oldest entry of write_waits, marks it finished (which
  # re-enables its coroutine when it has one) and returns its value.
  # An entry without a coroutine is a buffered value, so consuming it also
  # decrements the channel's length.
  private static method take_pending_write : object ($channel : Go::Channel) {
    
    my $write_wait = (Go::Channel::Wait)$channel->{write_waits}->shift;
    
    &finish_wait($write_wait, 1);
    
    unless ($write_wait->{coroutine}) {
      $channel->{length}--;
    }
    
    return $write_wait->{value};
  }
  
  # Writes $value to this channel.
  #
  # - If a reader is waiting, the value is handed to the oldest one and that
  #   reader's wait is finished; the call returns immediately.
  # - Otherwise, if length < capacity, the value is queued in write_waits as
  #   a buffered entry (no coroutine) and the call returns immediately.
  # - Otherwise the current coroutine is flagged as disabled, the write is
  #   queued with that coroutine, and the scheduler is run until a reader
  #   finishes the wait.
  #
  # Dies with "This channel is closed." when the channel has been closed, and
  # with "There is no currently running goroutine." when the call would have
  # to wait but the schedule has no current coroutine.
  method write : void ($value : object) {
    
    if ($self->{closed}) {
      die "This channel is closed.";
    }
    
    # A waiting reader receives the value directly, whatever the buffer state.
    if ($self->{read_waits}->length) {
      my $read_wait = (Go::Channel::Wait)$self->{read_waits}->shift;
      
      &finish_wait($read_wait, 1);
      
      $read_wait->{value} = $value;
      
      return;
    }
    
    my $write_wait = Go::Channel::Wait->new;
    
    $write_wait->{value} = $value;
    
    # Room in the buffer: queue the value and return without waiting.
    if ($self->{length} < $self->{capacity}) {
      $self->{write_waits}->push($write_wait);
      
      $self->{length}++;
      
      return;
    }
    
    # Buffer full (or capacity 0): wait until a reader takes the value.
    my $schedule = $self->{schedule};
    
    my $current_coroutine = $schedule->{current_coroutine};
    
    unless ($current_coroutine) {
      die "There is no currently running goroutine.";
    }
    
    $current_coroutine->{disable} = 1;
    
    $write_wait->{coroutine} = $current_coroutine;
    
    $self->{write_waits}->push($write_wait);
    
    while (1) {
      
      # Set by the reader that consumed this write.
      if ($write_wait->{finish}) {
        return;
      }
      
      $schedule->schedule;
    }
  }
  
  # Closes this channel.
  #
  # Every reader currently queued in read_waits is removed and its wait is
  # finished with ok = 0; afterwards the channel is flagged as closed.
  #
  # Dies with "This channel is already closed." on a second call.
  method close : void () {
    
    if ($self->{closed}) {
      die "This channel is already closed.";
    }
    
    my $pending_readers = $self->{read_waits};
    
    # Release waiting readers one by one, oldest first.
    while ($pending_readers->length > 0) {
      my $reader_wait = (Go::Channel::Wait)$pending_readers->shift;
      
      &finish_wait($reader_wait, 0);
    }
    
    $self->{closed} = 1;
  }
  
  # Returns the value of the capacity field.
  method cap : int () {
    my $capacity = $self->{capacity};
    
    return $capacity;
  }
  
  # Returns the value of the length field.
  method len : int () {
    my $length = $self->{length};
    
    return $length;
  }
  
  # Marks $wait as finished and records $ok (stored as a byte) in it.
  # When the wait has a coroutine attached, that coroutine's "disable" flag
  # is cleared.
  private static method finish_wait : void ($wait : Go::Channel::Wait, $ok : int) {
    
    $wait->{finish} = 1;
    $wait->{ok} = (byte)$ok;
    
    # Waits without a coroutine have nothing to re-enable.
    my $coroutine = $wait->{coroutine};
    if ($coroutine) {
      $coroutine->{disable} = 0;
    }
  }
  
}



( run in 0.723 second using v1.01-cache-2.11-cpan-39bf76dae61 )