Acme-Parataxis

 view release on metacpan or  search on metacpan

lib/Acme/Parataxis.pm  view on Meta::CPAN

use v5.40;
use experimental qw[class try];

package Acme::Parataxis v0.0.10 {
    use Affix;
    use Config;
    use File::Spec;
    use File::Basename qw[dirname];
    use Time::HiRes    qw[usleep];
    use Exporter       qw[import];
    use Carp           qw[croak];
    # Names callers may import on request (nothing is exported by default).
    our @EXPORT_OK = qw[
        run spawn yield await stop async fiber
        await_sleep await_read await_write await_core_id
        current_fid tid root maybe_yield
        set_max_threads max_threads
    ];

    # ':all' pulls in every exportable name. The tag holds its own copy of the
    # list rather than a reference to @EXPORT_OK.
    our %EXPORT_TAGS = ( all => [@EXPORT_OK] );
    #
    # Package variable, so it is also reachable from outside this file as
    # @Acme::Parataxis::IPC_BUFFER. Its use is not visible in this part of the
    # file.
    our @IPC_BUFFER;

    # Handle of the native parataxis library. It is assigned by the BEGIN block
    # further down, which runs at compile time; this declaration must therefore
    # stay without an initializer, because a run-time assignment here would
    # overwrite the handle.
    my $lib;

    # Fibers created by spawn() are pushed onto this queue.
    my @SCHEDULER_QUEUE;

    # Starts out false. NOTE(review): presumably toggled by the scheduler
    # loop - confirm where it is set.
    my $IS_RUNNING = 0;

    # Binds the native entry points of the already-loaded library handle $l
    # with Affix and then initialises the native side.
    #
    # Every `affix` line reads: library handle, symbol name, list of argument
    # types, return type. Nothing is returned; the useful effect is the set of
    # bound functions (init_system, coro_yield, submit_c_job, ...) that the rest
    # of this package calls.
    sub _bind_functions ($l) {
        affix $l, 'init_system',                       [],                             Int;
        affix $l, 'create_fiber',                      [ Pointer [SV], Pointer [SV] ], Int;
        affix $l, 'coro_call',                         [ Int, Pointer [SV] ],          Pointer [SV];
        affix $l, 'coro_transfer',                     [ Int, Pointer [SV] ],          Pointer [SV];
        affix $l, 'coro_yield',                        [ Pointer [SV] ],               Pointer [SV];
        affix $l, 'is_finished',                       [Int],                          Int;
        affix $l, 'destroy_coro',                      [Int],                          Void;
        affix $l, 'force_depth_zero',                  [ Pointer [SV] ],               Void;
        affix $l, 'cleanup',                           [],                             Void;
        affix $l, 'get_os_thread_id_export',           [],                             Int;
        affix $l, 'get_current_parataxis_id',          [],                             Int;
        affix $l, 'submit_c_job',                      [ Int, LongLong, Int ],         Int;
        affix $l, 'check_for_completion',              [],                             Int;
        affix $l, 'get_job_result',                    [Int],                          Pointer [SV];
        affix $l, 'get_job_coro_id',                   [Int],                          Int;
        affix $l, 'free_job_slot',                     [Int],                          Void;
        affix $l, 'get_thread_pool_size',              [],                             Int;
        affix $l, 'get_max_thread_pool_size',          [],                             Int;
        affix $l, 'set_max_threads',                   [Int],                          Void;
        affix $l, 'set_preempt_threshold',             [LongLong],                     Void;

        # Pair form: presumably binds the native symbol 'maybe_yield' under the
        # Perl name '_maybe_yield', leaving the public name free for a Perl-side
        # wrapper - TODO confirm against the Affix documentation.
        affix $l, [ 'maybe_yield' => '_maybe_yield' ], [],                             Pointer [SV];
        affix $l, 'get_preempt_count',                 [],                             LongLong;

        # Capture the main interpreter context eagerly
        init_system();
        if ( $^O eq 'MSWin32' ) {

            # Derive the perl DLL file name from the configured libperl name:
            # drop a leading 'lib' and a trailing '.a', then append the shared
            # object extension (e.g. 'libperl540.a' becomes 'perl540.dll' when
            # $Config{so} is 'dll').
            # NOTE(review): a libperl value ending in '.lib' would keep that
            # suffix and yield 'perlNNN.lib.dll' - confirm whether such builds
            # need to be supported. The load result is not checked either.
            my $perl_dll = $Config{libperl};
            $perl_dll =~ s/^lib//;
            $perl_dll =~ s/\.a$//;
            $perl_dll .= '.' . $Config{so};
            my $p = Affix::load_library($perl_dll);

            # Bound from the perl DLL itself: takes an Int, returns a LongLong.
            affix $p, 'win32_get_osfhandle', [Int], LongLong;
        }
    }

    # Locate and load the native parataxis library at compile time, then bind
    # its functions. Candidates are tried in order and the first one that both
    # exists and loads is kept in $lib; compilation aborts if none does.
    BEGIN {
        my $lib_name = ( $^O eq 'MSWin32' ? '' : 'lib' ) . 'parataxis.' . $Config{so};
        my $here     = dirname(__FILE__);
        my @auto     = ( 'auto', 'Acme', 'Parataxis', $lib_name );
        my @paths    = (

            # Next to this file, then the arch trees one and two levels up,
            # then an auto/ tree below this file's directory
            File::Spec->catfile( $here, $lib_name ),
            File::Spec->catfile( $here, '..', 'arch', @auto ),
            File::Spec->catfile( $here, '..', '..', 'arch', @auto ),
            File::Spec->catfile( $here, @auto ),

            # XXX - Local dir check (This is temporary)
            File::Spec->catfile( '.', $lib_name ),
        );

        # Finally every plain directory in @INC (hooks and objects are skipped)
        push @paths, File::Spec->catfile( $_, @auto ) for grep { !ref $_ } @INC;
    CANDIDATE:
        foreach my $candidate (@paths) {
            next CANDIDATE unless -e $candidate;
            $lib = Affix::load_library($candidate);
            last CANDIDATE if $lib;
        }
        $lib or die 'Could not find or load ' . $lib_name;
        _bind_functions($lib);
    }

    # API aliases and wrappers
    # Block-syntax shorthand for spawn(): `fiber { ... }` queues the block and
    # returns whatever spawn() returns for it.
    sub fiber : prototype(&) ($code) {
        return spawn( 'Acme::Parataxis', $code );
    }

    # Block-syntax convenience: hands the block to run(), calls stop() once
    # run() has returned, and gives back run()'s (scalar) result.
    sub async : prototype(&) ($code) {
        my $outcome = run($code);    # evaluated in scalar context
        stop();
        return $outcome;
    }

    # Waits on an awaitable object and returns its result.
    #
    # $thing must be a blessed object offering an `await` method or, failing
    # that, a `wait` method; the first one found is called without arguments in
    # the caller's context and its return value is passed through.
    #
    # Croaks with 'await() requires a Future or Fiber object' for anything else.
    sub await {
        my $thing = shift;
        if ( builtin::blessed($thing) ) {
            my $this_sub = builtin::refaddr( \&await );
            for my $method (qw[await wait]) {
                my $impl = $thing->can($method) or next;

                # An object blessed into this very package resolves `await` back
                # to this wrapper; dispatching to it would recurse without end,
                # so treat that as "no such method" and keep looking.
                next if ref($impl) && builtin::refaddr($impl) == $this_sub;
                return $thing->$method();
            }
        }
        croak 'await() requires a Future or Fiber object';
    }

    # Hands the given values to the native coro_yield() and returns whatever it
    # comes back with (presumably the values passed in when this fiber is
    # resumed - confirm against the native side).
    #
    # May be called as a plain function, as a class method or as an instance
    # method: a leading argument that is the string 'Acme::Parataxis', or that
    # isa Acme::Parataxis, is treated as the invocant and is not passed on.
    #
    # Returns nothing when coro_yield() returns undef. An array reference is
    # flattened in list context and reduced to its last element in scalar
    # context; any other value is returned unchanged.
    sub yield {
        if (@_) {
            my $first       = shift;
            my $is_invocant = defined $first && (
                ( ref $first || $first ) eq 'Acme::Parataxis' || do {
                    local $@;    # the probe must not clobber the caller's $@
                    eval { $first->isa('Acme::Parataxis') };
                }
            );

            # Anything that is not the invocant is payload and goes back in
            # place. That includes an explicit undef, so yield(undef, 1) passes
            # two values on instead of silently shifting the list.
            unshift @_, $first unless $is_invocant;
        }
        my $result = coro_yield( \@_ );
        return unless defined $result;
        return $result unless ref($result) eq 'ARRAY';
        return wantarray ? @$result : $result->[-1];
    }

    # Queues a new fiber for $code and returns the future tied to it.
    #
    # Accepts spawn($code) as well as the method forms Class->spawn($code) and
    # $obj->spawn($code); the invocant itself is not used. The fiber is always
    # built as an Acme::Parataxis object and appended to the scheduler queue.
    sub spawn {

        # A code ref in first position means there was no invocant
        my $code = ref( $_[0] ) eq 'CODE' ? $_[0] : $_[1];
        my $future = Acme::Parataxis::Future->new();
        my $task   = Acme::Parataxis->new( code => $code, future => $future );
        push @SCHEDULER_QUEUE, $task;
        return $future;
    }

    # Submits a native job of type 0 with $ms as its argument (going by the
    # names, a sleep of $ms milliseconds - confirm against the native side) and
    # then yields with 'WAITING'.
    #
    # Callable as a function or as a class/instance method. $ms defaults to 0.
    # Returns the string 'Queue Full' when submit_c_job() reports a negative
    # value; otherwise returns whatever yield() returns.
    sub await_sleep {
        my $first = shift;

        # Put the first argument back unless it was only the invocant; an
        # undefined first argument is dropped either way.
        if ( defined $first ) {
            my $named   = ( ref $first || $first ) eq 'Acme::Parataxis';
            my $is_ours = $named || eval { $first->isa('Acme::Parataxis') };
            unshift @_, $first unless $is_ours;
        }
        my $ms = shift // 0;
        if ( submit_c_job( 0, $ms, 0 ) < 0 ) {
            return 'Queue Full';
        }
        return yield('WAITING');
    }

    sub await_core_id {
        my $invocant = shift;
        if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
            unshift @_, $invocant if defined $invocant;
        }
        return 'Queue Full' if submit_c_job( 1, 0, 0 ) < 0;
        return yield('WAITING');



( run in 1.622 second using v1.01-cache-2.11-cpan-2398b32b56e )