Acme-Parataxis

 view release on metacpan or  search on metacpan

lib/Acme/Parataxis.pm  view on Meta::CPAN

use v5.40;
use experimental qw[class try];

package Acme::Parataxis v0.0.10 {
    use Affix;
    use Config;
    use File::Spec;
    use File::Basename qw[dirname];
    use Time::HiRes    qw[usleep];
    use Exporter       qw[import];
    use Carp           qw[croak];
    our %EXPORT_TAGS = (
        all => [
            our @EXPORT_OK
                = qw[
                run spawn yield await stop async fiber
                await_sleep await_read await_write await_core_id
                current_fid tid root maybe_yield
                set_max_threads max_threads
                ]
        ]
    );
    #
    our @IPC_BUFFER;
    my $lib;
    my @SCHEDULER_QUEUE;
    my $IS_RUNNING = 0;

    sub _bind_functions ($l) {
        affix $l, 'init_system',                       [],                             Int;
        affix $l, 'create_fiber',                      [ Pointer [SV], Pointer [SV] ], Int;
        affix $l, 'coro_call',                         [ Int, Pointer [SV] ],          Pointer [SV];
        affix $l, 'coro_transfer',                     [ Int, Pointer [SV] ],          Pointer [SV];
        affix $l, 'coro_yield',                        [ Pointer [SV] ],               Pointer [SV];
        affix $l, 'is_finished',                       [Int],                          Int;
        affix $l, 'destroy_coro',                      [Int],                          Void;
        affix $l, 'force_depth_zero',                  [ Pointer [SV] ],               Void;
        affix $l, 'cleanup',                           [],                             Void;
        affix $l, 'get_os_thread_id_export',           [],                             Int;
        affix $l, 'get_current_parataxis_id',          [],                             Int;
        affix $l, 'submit_c_job',                      [ Int, LongLong, Int ],         Int;
        affix $l, 'check_for_completion',              [],                             Int;
        affix $l, 'get_job_result',                    [Int],                          Pointer [SV];
        affix $l, 'get_job_coro_id',                   [Int],                          Int;
        affix $l, 'free_job_slot',                     [Int],                          Void;
        affix $l, 'get_thread_pool_size',              [],                             Int;
        affix $l, 'get_max_thread_pool_size',          [],                             Int;
        affix $l, 'set_max_threads',                   [Int],                          Void;
        affix $l, 'set_preempt_threshold',             [LongLong],                     Void;
        affix $l, [ 'maybe_yield' => '_maybe_yield' ], [],                             Pointer [SV];
        affix $l, 'get_preempt_count',                 [],                             LongLong;

        # Capture the main interpreter context eagerly
        init_system();
        if ( $^O eq 'MSWin32' ) {
            my $perl_dll = $Config{libperl};
            $perl_dll =~ s/^lib//;
            $perl_dll =~ s/\.a$//;
            $perl_dll .= '.' . $Config{so};
            my $p = Affix::load_library($perl_dll);
            affix $p, 'win32_get_osfhandle', [Int], LongLong;
        }
    }

    BEGIN {
        my $lib_name = ( $^O eq 'MSWin32' ? '' : 'lib' ) . 'parataxis.' . $Config{so};
        my @paths;
        push @paths, File::Spec->catfile( dirname(__FILE__), $lib_name );
        push @paths, File::Spec->catfile( dirname(__FILE__), '..',   'arch', 'auto',      'Acme', 'Parataxis', $lib_name );
        push @paths, File::Spec->catfile( dirname(__FILE__), '..',   '..',   'arch',      'auto', 'Acme', 'Parataxis', $lib_name );
        push @paths, File::Spec->catfile( dirname(__FILE__), 'auto', 'Acme', 'Parataxis', $lib_name );

        # XXX - Local dir check (This is temporary)
        push @paths, File::Spec->catfile( '.', $lib_name );
        for my $inc (@INC) {
            next if ref $inc;
            push @paths, File::Spec->catfile( $inc, 'auto', 'Acme', 'Parataxis', $lib_name );
        }
        for my $path (@paths) {
            if ( -e $path ) {
                $lib = Affix::load_library($path);
                last if $lib;
            }
        }
        die 'Could not find or load ' . $lib_name unless $lib;
        _bind_functions($lib);
    }

    # API aliases and wrappers
    sub fiber : prototype(&) ($code) { spawn( 'Acme::Parataxis', $code ) }

    sub async : prototype(&) ($code) {
        my $ret = run($code);
        stop();
        return $ret;
    }

    sub await {
        my $thing = shift;
        if ( builtin::blessed($thing) ) {
            return $thing->await if $thing->can('await');
            return $thing->wait  if $thing->can('wait');
        }
        croak 'await() requires a Future or Fiber object';
    }

    sub yield {
        my $invocant = shift;
        if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
            unshift @_, $invocant if defined $invocant;
            $invocant = 'Acme::Parataxis';
        }
        my $result = coro_yield( \@_ );
        return unless defined $result;
        return ( ref $result eq 'ARRAY' ) ? ( wantarray ? @$result : $result->[-1] ) : $result;
    }

    sub spawn {
        my ( $class, $code ) = @_;
        if ( ref $class eq 'CODE' ) {
            $code  = $class;
            $class = 'Acme::Parataxis';
        }
        my $future = Acme::Parataxis::Future->new();
        my $fiber  = Acme::Parataxis->new( code => $code, future => $future );
        push @SCHEDULER_QUEUE, $fiber;
        return $future;
    }

    sub await_sleep {
        my $invocant = shift;
        if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
            unshift @_, $invocant if defined $invocant;
        }
        my $ms = shift // 0;
        return 'Queue Full' if submit_c_job( 0, $ms, 0 ) < 0;
        return yield('WAITING');
    }

    sub await_core_id {
        my $invocant = shift;
        if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
            unshift @_, $invocant if defined $invocant;
        }
        return 'Queue Full' if submit_c_job( 1, 0, 0 ) < 0;
        return yield('WAITING');
    }

    sub await_read {
        my $invocant = shift;
        if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
            unshift @_, $invocant if defined $invocant;
        }
        my ( $fh, $timeout ) = @_;
        $timeout //= 5000;
        my $fileno = fileno($fh);
        die 'Not a valid filehandle' unless defined $fileno;
        my $handle = $^O eq 'MSWin32' ? win32_get_osfhandle($fileno) : $fileno;
        return 'Queue Full' if submit_c_job( 2, $handle, $timeout ) < 0;
        return yield('WAITING');
    }

    sub await_write {
        my $invocant = shift;
        if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
            unshift @_, $invocant if defined $invocant;
        }
        my ( $fh, $timeout ) = @_;
        $timeout //= 5000;
        my $fileno = fileno($fh);
        die 'Not a valid filehandle' unless defined $fileno;
        my $handle = $^O eq 'MSWin32' ? win32_get_osfhandle($fileno) : $fileno;
        return 'Queue Full' if submit_c_job( 3, $handle, $timeout ) < 0;
        return yield('WAITING');
    }

    sub maybe_yield {
        my $invocant = shift;
        if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
            unshift @_, $invocant if defined $invocant;
        }
        my $result = Acme::Parataxis::_maybe_yield();
        return unless defined $result;
        return wantarray ? @$result : $result->[-1];
    }
    sub tid            { get_os_thread_id_export() }
    sub current_fid    { get_current_parataxis_id() }
    sub root           { state $root //= Acme::Parataxis::Root->new() }
    sub max_threads () { Acme::Parataxis::get_max_thread_pool_size() }

    # Scheduler internals
    sub _scheduler_enqueue_by_id ($fid) {
        if ( my $fiber = Acme::Parataxis->by_id($fid) ) {
            push @SCHEDULER_QUEUE, $fiber;
        }
    }

    sub poll_io {
        my @ready;
        while (1) {
            my $job_idx = check_for_completion();
            last if $job_idx == -1;
            my $fid = get_job_coro_id($job_idx);
            my $res = get_job_result($job_idx);
            push @ready, [ $fid, $res ];
            free_job_slot($job_idx);
        }
        return @ready;
    }

    sub run ($code) {
        @SCHEDULER_QUEUE = ();
        $IS_RUNNING      = 1;
        my $main_fiber = Acme::Parataxis->new( code => $code );
        push @SCHEDULER_QUEUE, $main_fiber;
        while ($IS_RUNNING) {
            my @ready = poll_io();
            for my $ready (@ready) {
                my ( $fid, $res ) = @$ready;
                my $fiber = Acme::Parataxis->by_id($fid);
                if ($fiber) {
                    my $yield_val = $fiber->call($res);
                    if ( defined $fiber && !$fiber->is_done ) {
                        if ( defined $yield_val && $yield_val eq 'WAITING' ) { }
                        else {
                            push @SCHEDULER_QUEUE, $fiber;
                        }
                    }
                }
            }
            if (@SCHEDULER_QUEUE) {
                my $current = shift @SCHEDULER_QUEUE;
                next unless $current;
                next if $current->is_done;
                my $res = $current->call();
                if ( defined $current && !$current->is_done ) {
                    if ( defined $res && $res eq 'WAITING' ) { }
                    else {
                        push @SCHEDULER_QUEUE, $current;
                    }
                }
            }
            my $active_count = scalar keys %Acme::Parataxis::REGISTRY;
            if ( defined $main_fiber && $main_fiber->is_done && $active_count == 0 && !@SCHEDULER_QUEUE ) {
                $IS_RUNNING = 0;
            }
            if ( $IS_RUNNING && !@SCHEDULER_QUEUE && !@ready ) {
                usleep(1000);
            }
        }
    }
    sub stop () { $IS_RUNNING = 0 }
    class    #
        Acme::Parataxis {
        use Carp qw[croak];
        field $code : reader : param;
        field $is_done = 0;
        field $error  : reader;
        field $result : reader;
        field $fid    : reader;
        field $future : param = undef;

        method set_result ($val) {
            $result = $val;
            $future->set_result($val) if $future;
        }

        method set_error ($err) {
            $error = $err;
            $future->set_error($err) if $future;
        }

        method _clear_result () {
            $result = undef;
            $error  = undef;
        }
        our %REGISTRY;
        ADJUST {
            Acme::Parataxis::force_depth_zero($code);
            $fid = Acme::Parataxis::create_fiber( $code, $self );
            $REGISTRY{$fid} = $self;
            builtin::weaken $REGISTRY{$fid};
        }

        method call (@args) {
            croak 'Cannot call a finished fiber' if $is_done;
            my $rv = Acme::Parataxis::coro_call( $fid, \@args );
            return unless defined $self;
            if ( $self->is_done ) {
                my $err = $error;
                die $err if defined $err;
            }
            return unless defined $rv;
            return ( ref $rv eq 'ARRAY' ) ? ( wantarray ? @$rv : $rv->[-1] ) : $rv;
        }

        method transfer (@args) {
            croak 'Cannot transfer to a finished fiber' if $self->is_done;
            my $rv = Acme::Parataxis::coro_transfer( $fid, \@args );
            if ( $self->is_done ) {
                my $err = $error;
                die $err if defined $err;
            }
            return unless defined $rv;
            return ( ref $rv eq 'ARRAY' ) ? ( wantarray ? @$rv : $rv->[-1] ) : $rv;
        }

        method is_done () {
            return 1 if $is_done;
            if ( defined $fid && $fid >= 0 && Acme::Parataxis::is_finished($fid) ) {
                $is_done = 1;
                my $old_fid = $fid;
                $fid = -1;
                delete $REGISTRY{$old_fid};
                Acme::Parataxis::destroy_coro($old_fid);
                return 1;
            }
            return 0;
        }

        method wait () {
            while ( !$self->is_done ) {
                Acme::Parataxis->yield('WAITING_FOR_CHILD');
            }
            return $self->result;
        }

        method DESTROY {
            return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
            if ( defined $fid && $fid >= 0 ) {
                delete $REGISTRY{$fid};
                Acme::Parataxis::destroy_coro($fid);
                $fid = -1;
            }
        }
        sub by_id ( $class, $fid ) { $REGISTRY{$fid} }
    }
    class    #
        Acme::Parataxis::Root {

        method transfer (@args) {
            my $rv = Acme::Parataxis::coro_transfer( -1, \@args );
            return unless defined $rv;
            return ( ref $rv eq 'ARRAY' ) ? ( wantarray ? @$rv : $rv->[-1] ) : $rv;
        }
        method fid () {-1}
    }
    class    #
        Acme::Parataxis::Future {
        use Carp qw[croak];
        field $is_ready : reader = 0;
        field $result;
        field $error;
        field @callbacks;

        method result () {
            croak 'Future not ready' unless $is_ready;
            return $result;
        }

        method set_result ($val) {
            die 'Future already ready' if $is_ready;
            $result   = $val;
            $is_ready = 1;
            $_->($self) for @callbacks;
        }

        method set_error ($err) {
            die 'Future already ready' if $is_ready;
            $error    = $err;
            $is_ready = 1;
            $_->($self) for @callbacks;
        }

        method clear_result () {
            $result = undef;
            $error  = undef;
        }

        method on_ready ($cb) {
            if   ($is_ready) { $cb->($self) }
            else             { push @callbacks, $cb }
        }

        method await () {
            return $self->result if $is_ready;
            my $fid = Acme::Parataxis->current_fid;
            $self->on_ready(
                sub ($f) {
                    Acme::Parataxis::_scheduler_enqueue_by_id($fid);
                }
            );
            Acme::Parataxis->yield('WAITING');
            $self->result;
        }
    }
    END { cleanup() unless ${^GLOBAL_PHASE} eq 'DESTRUCT' }
}
1;



( run in 1.004 second using v1.01-cache-2.11-cpan-2398b32b56e )