Acme-Parataxis
view release on metacpan or search on metacpan
lib/Acme/Parataxis.pm view on Meta::CPAN
use v5.40;
use experimental qw[class try];
package Acme::Parataxis v0.0.10 {
use Affix;
use Config;
use File::Spec;
use File::Basename qw[dirname];
use Time::HiRes qw[usleep];
use Exporter qw[import];
use Carp qw[croak];
# Exporter configuration.
# @EXPORT_OK (names importable on request) is declared and assigned inline, and
# the result of that assignment is what populates the 'all' tag, so ':all'
# always carries exactly the same names as @EXPORT_OK.
our %EXPORT_TAGS = (
all => [
our @EXPORT_OK
= qw[
run spawn yield await stop async fiber
await_sleep await_read await_write await_core_id
current_fid tid root maybe_yield
set_max_threads max_threads
]
]
);
#
# Package variable, reachable from outside as @Acme::Parataxis::IPC_BUFFER.
# NOTE(review): nothing in this part of the file reads or writes it - confirm its role.
our @IPC_BUFFER;
# Handle of the native parataxis library; assigned by the BEGIN block below
# from Affix::load_library().
my $lib;
# Fibers waiting to be scheduled; spawn() appends every new fiber here.
my @SCHEDULER_QUEUE;
# Starts at 0. Presumably a flag for "scheduler loop is active" - TODO confirm
# against the code that sets it (not visible in this part of the file).
my $IS_RUNNING = 0;
# Bind the native parataxis entry points with Affix and initialise the native
# side.
#
# $handle - library handle as returned by Affix::load_library().
#
# Every `affix` statement names a C symbol, its argument types and its return
# type; once bound, the symbol is callable by name from this file (as
# init_system() is, right after the bindings). The statements are kept in their
# original order. On Windows the perl DLL itself is loaded as well, so that
# win32_get_osfhandle can be bound from it.
sub _bind_functions ($handle) {
    affix $handle, 'init_system',              [], Int;
    affix $handle, 'create_fiber',             [ Pointer [SV], Pointer [SV] ], Int;
    affix $handle, 'coro_call',                [ Int, Pointer [SV] ], Pointer [SV];
    affix $handle, 'coro_transfer',            [ Int, Pointer [SV] ], Pointer [SV];
    affix $handle, 'coro_yield',               [ Pointer [SV] ], Pointer [SV];
    affix $handle, 'is_finished',              [Int], Int;
    affix $handle, 'destroy_coro',             [Int], Void;
    affix $handle, 'force_depth_zero',         [ Pointer [SV] ], Void;
    affix $handle, 'cleanup',                  [], Void;
    affix $handle, 'get_os_thread_id_export',  [], Int;
    affix $handle, 'get_current_parataxis_id', [], Int;
    affix $handle, 'submit_c_job',             [ Int, LongLong, Int ], Int;
    affix $handle, 'check_for_completion',     [], Int;
    affix $handle, 'get_job_result',           [Int], Pointer [SV];
    affix $handle, 'get_job_coro_id',          [Int], Int;
    affix $handle, 'free_job_slot',            [Int], Void;
    affix $handle, 'get_thread_pool_size',     [], Int;
    affix $handle, 'get_max_thread_pool_size', [], Int;
    affix $handle, 'set_max_threads',          [Int], Void;
    affix $handle, 'set_preempt_threshold',    [LongLong], Void;

    # The C symbol maybe_yield is bound under the Perl name _maybe_yield.
    # NOTE(review): presumably so a Perl-level maybe_yield wrapper can be
    # defined elsewhere in the file - confirm.
    affix $handle, [ 'maybe_yield' => '_maybe_yield' ], [], Pointer [SV];
    affix $handle, 'get_preempt_count', [], LongLong;

    # Capture the main interpreter context eagerly
    init_system();
    if ( $^O eq 'MSWin32' ) {

        # Derive the perl DLL name from the library name recorded in %Config.
        # MinGW builds record something like 'libperl540.a', MSVC builds
        # 'perl540.lib'; both must end up as 'perl540.<so>'. Stripping only
        # '.a' turned the MSVC form into 'perl540.lib.dll'.
        my $perl_dll = $Config{libperl};
        $perl_dll =~ s/^lib//;
        $perl_dll =~ s/\.(?:a|lib)\z//i;
        $perl_dll .= '.' . $Config{so};
        my $p = Affix::load_library($perl_dll);
        affix $p, 'win32_get_osfhandle', [Int], LongLong;
    }
}
# Locate and load the native parataxis shared library at compile time, then
# bind its functions via _bind_functions().
#
# Candidates are tried in order; the first one that exists and loads wins:
#   1. next to this .pm file,
#   2. arch/auto/Acme/Parataxis one and two directories above this file,
#   3. auto/Acme/Parataxis beneath this file's directory,
#   4. auto/Acme/Parataxis beneath every plain-string @INC entry,
#   5. the current working directory (development fallback).
#
# Dies with 'Could not find or load <name>' when no candidate could be loaded.
BEGIN {
    my $lib_name = ( $^O eq 'MSWin32' ? '' : 'lib' ) . 'parataxis.' . $Config{so};
    my $here     = dirname(__FILE__);
    my @paths    = (
        File::Spec->catfile( $here, $lib_name ),
        File::Spec->catfile( $here, '..', 'arch', 'auto', 'Acme', 'Parataxis', $lib_name ),
        File::Spec->catfile( $here, '..', '..', 'arch', 'auto', 'Acme', 'Parataxis', $lib_name ),
        File::Spec->catfile( $here, 'auto', 'Acme', 'Parataxis', $lib_name ),
    );
    for my $inc (@INC) {
        next if ref $inc;    # @INC may hold hooks (references); only directory strings are usable
        push @paths, File::Spec->catfile( $inc, 'auto', 'Acme', 'Parataxis', $lib_name );
    }

    # XXX - Local dir check (This is temporary)
    # Tried last on purpose: native code found in whatever directory the
    # process happens to run from must never take precedence over the library
    # that ships with the module or is installed under @INC.
    push @paths, File::Spec->catfile( '.', $lib_name );
    for my $path (@paths) {
        next unless -e $path;
        $lib = Affix::load_library($path);
        last if $lib;
    }
    die 'Could not find or load ' . $lib_name unless $lib;
    _bind_functions($lib);
}
# API aliases and wrappers
# Block-syntax shortcut for spawn(): `fiber { ... }` passes the block to
# spawn() with 'Acme::Parataxis' as the class argument and returns whatever
# spawn() returns.
sub fiber : prototype(&) ($code) {
    return spawn( 'Acme::Parataxis', $code );
}
# Block-syntax entry point: `async { ... }` passes the block to run(), then
# calls stop(), and returns run()'s result.
#
# run() is evaluated in scalar context because its result is held in a single
# scalar. stop() is only reached when run() returns normally.
sub async : prototype(&) ($code) {
    my $outcome = run($code);
    stop();
    return $outcome;
}
# Wait on an awaitable object and return its result.
#
# $thing - a blessed object that provides an `await` method or, failing that,
#          a `wait` method.
#
# Returns whatever the chosen method returns (called without arguments, in the
# caller's context). Croaks when $thing is not blessed or offers neither method.
sub await {
    my $thing = shift;
    if ( builtin::blessed($thing) ) {

        # For an object blessed into this package (or a subclass of it),
        # ->can('await') resolves to this very function; dispatching to it
        # would recurse without end, so that case is treated as "no await
        # method" and falls through to `wait`.
        my $own_ref     = \&await;
        my $await_impl  = $thing->can('await');
        my $is_this_sub = ref $await_impl && $await_impl == $own_ref;
        return $thing->await if $await_impl && !$is_this_sub;
        return $thing->wait  if $thing->can('wait');
    }
    croak 'await() requires a Future or Fiber object';
}
# Suspend the current fiber, handing the given values to coro_yield().
#
# Callable as a plain function or as a class/instance method. A leading
# argument that is the string 'Acme::Parataxis', an object of that class, or
# anything that ->isa('Acme::Parataxis') is treated as the invocant and
# dropped; any other defined leading argument is kept as data. An undefined
# leading argument is discarded.
#
# Returns nothing when coro_yield() gives back undef. When it gives back an
# array reference, returns its elements in list context and its last element
# otherwise. Any other value is returned as is.
sub yield {
    my $first = shift;
    if ( defined $first ) {
        my $is_invocant = ( ref $first || $first ) eq 'Acme::Parataxis'
            || eval { $first->isa('Acme::Parataxis') };

        # Not an invocant: it was a real argument, so put it back.
        unshift @_, $first unless $is_invocant;
    }
    my $result = coro_yield( \@_ );
    return unless defined $result;
    return $result unless ref $result eq 'ARRAY';
    return wantarray ? @$result : $result->[-1];
}
# Create a fiber for a code reference, queue it for scheduling and return the
# future associated with it.
#
# Accepts either ( $class, $code ) or just ( $code ); in the latter form the
# class defaults to 'Acme::Parataxis'. The fiber itself is always constructed
# through Acme::Parataxis->new, with a fresh Acme::Parataxis::Future attached.
#
# Returns the Acme::Parataxis::Future object.
sub spawn {
    my ( $class, $code ) = @_;

    # Function-style call: the first argument is the code reference itself.
    ( $class, $code ) = ( 'Acme::Parataxis', $class ) if ref $class eq 'CODE';
    my $future    = Acme::Parataxis::Future->new();
    my $new_fiber = Acme::Parataxis->new( code => $code, future => $future );
    push @SCHEDULER_QUEUE, $new_fiber;
    return $future;
}
# Submit a sleep job to the native side and suspend until it is resumed.
#
# Callable as a function or as a class/instance method; a leading
# 'Acme::Parataxis' class name or object is dropped as the invocant.
#
# The next argument is the duration handed to submit_c_job() (named $ms in the
# original, so presumably milliseconds - TODO confirm); it defaults to 0.
#
# Returns the string 'Queue Full' when submit_c_job() reports a negative
# value; otherwise returns whatever yield('WAITING') returns.
sub await_sleep {
    my $first = shift;
    if ( defined $first ) {
        my $is_invocant = ( ref $first || $first ) eq 'Acme::Parataxis'
            || eval { $first->isa('Acme::Parataxis') };

        # Not an invocant: it is the duration argument, so put it back.
        unshift @_, $first unless $is_invocant;
    }
    my $duration = shift;
    $duration = 0 unless defined $duration;
    if ( submit_c_job( 0, $duration, 0 ) < 0 ) {
        return 'Queue Full';
    }
    return yield('WAITING');
}
sub await_core_id {
my $invocant = shift;
if ( !defined $invocant || ( ( ref $invocant || $invocant ) ne 'Acme::Parataxis' && !eval { $invocant->isa('Acme::Parataxis') } ) ) {
unshift @_, $invocant if defined $invocant;
}
return 'Queue Full' if submit_c_job( 1, 0, 0 ) < 0;
return yield('WAITING');
( run in 1.622 second using v1.01-cache-2.11-cpan-2398b32b56e )