Acme-Parataxis
view release on metacpan or search on metacpan
lib/Acme/Parataxis.pm view on Meta::CPAN
my $yield_val = $fiber->call($res);
if ( defined $fiber && !$fiber->is_done ) {
if ( defined $yield_val && $yield_val eq 'WAITING' ) { }
else {
push @SCHEDULER_QUEUE, $fiber;
}
}
}
}
if (@SCHEDULER_QUEUE) {
my $current = shift @SCHEDULER_QUEUE;
next unless $current;
next if $current->is_done;
my $res = $current->call();
if ( defined $current && !$current->is_done ) {
if ( defined $res && $res eq 'WAITING' ) { }
else {
push @SCHEDULER_QUEUE, $current;
}
}
}
my $active_count = scalar keys %Acme::Parataxis::REGISTRY;
if ( defined $main_fiber && $main_fiber->is_done && $active_count == 0 && !@SCHEDULER_QUEUE ) {
$IS_RUNNING = 0;
}
if ( $IS_RUNNING && !@SCHEDULER_QUEUE && !@ready ) {
usleep(1000);
}
}
}
sub stop () { $IS_RUNNING = 0 }
class #
Acme::Parataxis {
use Carp qw[croak];
field $code : reader : param;
field $is_done = 0;
field $error : reader;
field $result : reader;
field $fid : reader;
field $future : param = undef;
method set_result ($val) {
$result = $val;
$future->set_result($val) if $future;
}
method set_error ($err) {
$error = $err;
$future->set_error($err) if $future;
}
method _clear_result () {
$result = undef;
$error = undef;
}
our %REGISTRY;
ADJUST {
Acme::Parataxis::force_depth_zero($code);
$fid = Acme::Parataxis::create_fiber( $code, $self );
$REGISTRY{$fid} = $self;
builtin::weaken $REGISTRY{$fid};
}
method call (@args) {
croak 'Cannot call a finished fiber' if $is_done;
my $rv = Acme::Parataxis::coro_call( $fid, \@args );
return unless defined $self;
if ( $self->is_done ) {
my $err = $error;
die $err if defined $err;
}
return unless defined $rv;
return ( ref $rv eq 'ARRAY' ) ? ( wantarray ? @$rv : $rv->[-1] ) : $rv;
}
method transfer (@args) {
croak 'Cannot transfer to a finished fiber' if $self->is_done;
my $rv = Acme::Parataxis::coro_transfer( $fid, \@args );
if ( $self->is_done ) {
my $err = $error;
die $err if defined $err;
}
return unless defined $rv;
return ( ref $rv eq 'ARRAY' ) ? ( wantarray ? @$rv : $rv->[-1] ) : $rv;
}
method is_done () {
return 1 if $is_done;
if ( defined $fid && $fid >= 0 && Acme::Parataxis::is_finished($fid) ) {
$is_done = 1;
my $old_fid = $fid;
$fid = -1;
delete $REGISTRY{$old_fid};
Acme::Parataxis::destroy_coro($old_fid);
return 1;
}
return 0;
}
method wait () {
while ( !$self->is_done ) {
Acme::Parataxis->yield('WAITING_FOR_CHILD');
}
return $self->result;
}
method DESTROY {
return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
if ( defined $fid && $fid >= 0 ) {
delete $REGISTRY{$fid};
Acme::Parataxis::destroy_coro($fid);
$fid = -1;
}
}
sub by_id ( $class, $fid ) { $REGISTRY{$fid} }
}
class #
Acme::Parataxis::Root {
method transfer (@args) {
my $rv = Acme::Parataxis::coro_transfer( -1, \@args );
( run in 0.916 second using v1.01-cache-2.11-cpan-5511b514fd6 )