Acme-Parataxis

 view release on metacpan or  search on metacpan

lib/Acme/Parataxis.pm  view on Meta::CPAN

                    my $yield_val = $fiber->call($res);
                    if ( defined $fiber && !$fiber->is_done ) {
                        if ( defined $yield_val && $yield_val eq 'WAITING' ) { }
                        else {
                            push @SCHEDULER_QUEUE, $fiber;
                        }
                    }
                }
            }
            if (@SCHEDULER_QUEUE) {
                my $current = shift @SCHEDULER_QUEUE;
                next unless $current;
                next if $current->is_done;
                my $res = $current->call();
                if ( defined $current && !$current->is_done ) {
                    if ( defined $res && $res eq 'WAITING' ) { }
                    else {
                        push @SCHEDULER_QUEUE, $current;
                    }
                }
            }
            my $active_count = scalar keys %Acme::Parataxis::REGISTRY;
            if ( defined $main_fiber && $main_fiber->is_done && $active_count == 0 && !@SCHEDULER_QUEUE ) {
                $IS_RUNNING = 0;
            }
            if ( $IS_RUNNING && !@SCHEDULER_QUEUE && !@ready ) {
                usleep(1000);
            }
        }
    }
    # Request scheduler shutdown by clearing the $IS_RUNNING flag that the
    # scheduler loop above consults. Evaluates to the assigned value (0).
    sub stop () {
        $IS_RUNNING = 0;
    }
    # Fiber object: wraps one native fiber (identified by $fid) created from
    # $code, and tracks its completion flag, result and error.
    #
    # Live instances are recorded in %REGISTRY (keyed by fiber id, weak
    # references) so they can be looked up again with by_id().
    class    #
        Acme::Parataxis {
        use Carp qw[croak];
        field $code : reader : param;     # passed to create_fiber() in ADJUST
        field $is_done = 0;               # cached completion flag; only is_done() updates it
        field $error  : reader;           # set through set_error()
        field $result : reader;           # set through set_result()
        field $fid    : reader;           # native fiber id; becomes -1 once released
        field $future : param = undef;    # optional object mirrored by set_result/set_error

        # Store $val as the fiber's result and forward it to $future when one
        # was supplied.
        method set_result ($val) {
            $result = $val;
            $future->set_result($val) if $future;
        }

        # Store $err as the fiber's error and forward it to $future when one
        # was supplied.
        method set_error ($err) {
            $error = $err;
            $future->set_error($err) if $future;
        }

        # Reset both the stored result and the stored error to undef.
        method _clear_result () {
            $result = undef;
            $error  = undef;
        }
        our %REGISTRY;
        ADJUST {
            Acme::Parataxis::force_depth_zero($code);
            $fid = Acme::Parataxis::create_fiber( $code, $self );
            $REGISTRY{$fid} = $self;

            # The registry must not keep the fiber object alive on its own.
            builtin::weaken $REGISTRY{$fid};
        }

        # Resume the fiber through coro_call(), handing it @args as an array
        # reference.
        #
        # Returns nothing when coro_call() yields undef. When it yields an
        # array reference, returns the whole list in list context or the last
        # element in scalar context. Any other value is returned as is.
        #
        # Croaks if the fiber has already finished. Dies with the stored error
        # if the fiber is finished after the call and an error was recorded.
        method call (@args) {

            # Ask is_done() instead of trusting the cached flag: the flag is
            # only refreshed inside is_done(), so a fiber that finished without
            # being polled would otherwise slip past this guard. transfer()
            # performs the same check.
            croak 'Cannot call a finished fiber' if $self->is_done;
            my $rv = Acme::Parataxis::coro_call( $fid, \@args );

            # Defensive guard kept from the original implementation.
            # NOTE(review): presumably protects against state being lost
            # across the context switch - confirm against the native code.
            return unless defined $self;
            if ( $self->is_done ) {
                my $err = $error;
                die $err if defined $err;
            }
            return unless defined $rv;
            return ( ref $rv eq 'ARRAY' ) ? ( wantarray ? @$rv : $rv->[-1] ) : $rv;
        }

        # Same contract as call(), but switches through coro_transfer().
        # NOTE(review): how transfer differs from call at the native level is
        # not visible in this file section.
        method transfer (@args) {
            croak 'Cannot transfer to a finished fiber' if $self->is_done;
            my $rv = Acme::Parataxis::coro_transfer( $fid, \@args );
            if ( $self->is_done ) {
                my $err = $error;
                die $err if defined $err;
            }
            return unless defined $rv;
            return ( ref $rv eq 'ARRAY' ) ? ( wantarray ? @$rv : $rv->[-1] ) : $rv;
        }

        # Report whether the fiber has finished (1) or not (0).
        #
        # The first time the native layer reports the fiber as finished, this
        # also caches the flag, drops the registry entry, marks $fid as -1 and
        # calls destroy_coro() on the old id.
        method is_done () {
            return 1 if $is_done;
            if ( defined $fid && $fid >= 0 && Acme::Parataxis::is_finished($fid) ) {
                $is_done = 1;

                # Invalidate $fid before destroying so no later code path can
                # reuse the released id.
                my $old_fid = $fid;
                $fid = -1;
                delete $REGISTRY{$old_fid};
                Acme::Parataxis::destroy_coro($old_fid);
                return 1;
            }
            return 0;
        }

        # Yield 'WAITING_FOR_CHILD' repeatedly until this fiber is finished,
        # then return its stored result.
        method wait () {
            while ( !$self->is_done ) {
                Acme::Parataxis->yield('WAITING_FOR_CHILD');
            }
            return $self->result;
        }

        # Release the native fiber if it is still held. Nothing is done during
        # global destruction.
        method DESTROY {
            return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
            if ( defined $fid && $fid >= 0 ) {
                delete $REGISTRY{$fid};
                Acme::Parataxis::destroy_coro($fid);
                $fid = -1;
            }
        }

        # Class-level lookup of a fiber object by id. Registry entries are
        # weak references, so the result may be undef.
        sub by_id ( $class, $fid ) { $REGISTRY{$fid} }
    }
    class    #
        Acme::Parataxis::Root {

        method transfer (@args) {
            my $rv = Acme::Parataxis::coro_transfer( -1, \@args );



( run in 0.916 second using v1.01-cache-2.11-cpan-5511b514fd6 )