The classic way (as I write this, Acme::Parataxis is 5 days old and already has a 'classic' API...)
```perl
use v5.40;
use Acme::Parataxis;
$|++;
Acme::Parataxis::run(
sub {
say 'Main task started';
# Spawn background workers
my $f1 = Acme::Parataxis->spawn(
sub {
say ' Task 1: Sleeping in a native thread pool...';
Acme::Parataxis->await_sleep(1000);
say ' Task 1: Ah! What a nice nap...';
return 42;
}
);
```
Or do things the more modern way:
```perl
use v5.40;
use Acme::Parataxis qw[:all];
$|++;
async {
say 'Main task started';
# 'fiber' is a shorter alias for 'spawn'
my $f1 = fiber {
say ' Task 1: Sleeping...';
await_sleep(1000);
return 42;
};
my $f2 = fiber {
say ' Task 2: Performing I/O...';
```
Close the browser and clear your history before this does further harm!
# MODERN API
While the classic object-oriented API is always available, `Acme::Parataxis` exports a set of functions (via the
`:all` tag) that provide a more modern, concise way to write concurrent code.
## `async { ... }`
A convenience wrapper around `run( )`. It starts the scheduler, executes the provided block as the main fiber, and
automatically calls `stop( )` when the block completes.
```
async {
say "The scheduler is running!";
};
```
## `fiber { ... }`
An alias for `spawn( )`. It creates a new fiber and returns a Future.
```
async {
my $core = await_core_id( );
say "Background task handled by CPU core: $core";
};
```
# CORE CONCEPTS
## Creating Fibers
All Perl code in this system runs within a fiber. When you start your script or call `Acme::Parataxis::run`, a "main"
fiber is active. You can create new fibers using `spawn` or by manually instantiating an `Acme::Parataxis` object:
```perl
my $fiber = Acme::Parataxis->new(code => sub {
say "I'm in a fiber!";
});
```
Creating a fiber does not run it immediately. It simply prepares the context and waits to be invoked.
## `yield( @args )`
Pauses the current fiber and returns control to the scheduler. If `@args` are provided, they are passed to the context
that next resumes this fiber. Arguments can be of any Perl data type.
## `stop( )`
Tells the scheduler to exit the loop after the current iteration. Note that this does not immediately terminate other
fibers; it simply prevents the scheduler from starting new ones.
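The "exit after the current iteration" behavior can be pictured with a toy round-robin loop. This is only an illustrative model in plain Perl, not the module's scheduler: `run_loop`, `request_stop`, and `make_counter` are hypothetical names, and each "fiber" is just a closure that performs one step per call.

```perl
use strict;
use warnings;
use feature 'say';

# Toy model only: each "fiber" is a closure that performs one step per call
# and returns true once it has finished. None of this is Acme::Parataxis API.
my $stop_requested = 0;
sub request_stop { $stop_requested = 1 }

sub make_counter {
    my ( $name, $steps, $log ) = @_;
    my $done = 0;
    return sub {
        push @$log, "$name step " . ++$done;
        return $done >= $steps;
    };
}

sub run_loop {
    my @tasks = @_;
    while ( @tasks && !$stop_requested ) {
        # One iteration: every pending task gets a single turn
        @tasks = grep { !$_->() } @tasks;
    }
    return scalar @tasks;    # tasks that never ran to completion
}

my @log;
my $main = do {
    my $turns = 0;
    sub {
        push @log, 'main turn ' . ++$turns;
        request_stop() if $turns == 2;    # ask the loop to wind down
        return $turns == 2;
    };
};
my $unfinished = run_loop( $main, make_counter( 'worker', 5, \@log ) );
say for @log;
say "Unfinished tasks left behind: $unfinished";
```

In this model the worker still gets its turn in the iteration where the stop was requested, and it is left unfinished rather than destroyed once the loop exits.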
# THREAD POOL CONFIGURATION
`Acme::Parataxis` uses a native thread pool to handle blocking tasks. While it manages itself automatically, you can
tune its behavior using these functions.
## `set_max_threads( $count )`
Sets the maximum number of worker threads the pool is allowed to spawn. By default, this is set to the number of
logical CPU cores detected on your system (up to a hard limit of 64).
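The default described above amounts to clamping the detected core count to the hard limit. A minimal sketch of that arithmetic in plain Perl, assuming the core count is supplied by the caller (how the module detects it is not shown here); `default_max_threads` and `HARD_LIMIT` are hypothetical names used only for illustration:

```perl
use strict;
use warnings;
use feature 'say';
use List::Util qw[min];

# Assumption: the ceiling of 64 comes from the text above. The helper name
# and the constant are illustrative and not part of Acme::Parataxis.
use constant HARD_LIMIT => 64;

sub default_max_threads {
    my ($logical_cores) = @_;
    return min( $logical_cores, HARD_LIMIT );
}

say "$_ cores -> " . default_max_threads($_) . ' threads' for 4, 64, 128;
```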
```perl
        return $res;
    }
}
{
    package My::HTTP::Handle;
    use parent -norequire, 'HTTP::Tiny::Handle';
    use Time::HiRes qw[time];

    sub _do_timeout {
        my ($self, $type, $timeout) = @_;
        $timeout //= $self->{timeout} // 60;
        my $start = time();
        while (1) {
            # Check for readiness NOW (0 timeout)
            return 1 if $self->SUPER::_do_timeout($type, 0);
            # Check for overall timeout
            my $elapsed = time() - $start;
            return 0 if $elapsed > $timeout;
            # Suspend fiber and wait for background I/O check
            my $wait = ($timeout - $elapsed) > 0.5 ? 0.5 : ($timeout - $elapsed);
            if ($type eq 'read') {
                Acme::Parataxis->await_read($self->{fh}, int($wait * 1000));
            } else {
                Acme::Parataxis->await_write($self->{fh}, int($wait * 1000));
            }
        }
    }
```
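The loop above waits in slices of at most half a second, so the overall timeout is rechecked regularly and the final wait never overshoots the deadline. The slice calculation can be isolated as a small helper. This is a sketch in plain Perl: `wait_slice_ms` and its `$max_slice` parameter are hypothetical names, not part of the module.

```perl
use strict;
use warnings;
use feature 'say';

# Hypothetical helper mirroring the $wait expression in _do_timeout:
# returns the next wait in whole milliseconds, capped at $max_slice seconds.
sub wait_slice_ms {
    my ( $timeout, $elapsed, $max_slice ) = @_;
    $max_slice //= 0.5;    # the 0.5 s cap used in the loop
    my $remaining = $timeout - $elapsed;
    my $wait      = $remaining > $max_slice ? $max_slice : $remaining;
    return int( $wait * 1000 );
}

say wait_slice_ms( 60, 0 );        # plenty of time left: a full slice
say wait_slice_ms( 60, 59.75 );    # near the deadline: only the remainder
```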
# BEST PRACTICES & GOTCHAS
- **Avoid blocking syscalls:** Never call blocking `sleep( )` or `sysread( )` on the main interpreter thread.
Always use the `await_*` equivalents to offload work to the pool.
- **Thread Safety:** While Perl code remains single-threaded, background tasks run on separate OS threads. Shared
C-level data (if accessed via FFI) must be mutex-protected.
- **Stack Limits:** Each fiber is allocated a 512KB stack by default. This is more than sufficient for most
Perl code and allows for high concurrency with a small memory footprint. Extremely deep recursion or massive regex
backtracking might still hit limits.
- **Efficiency:** The native thread pool is initialized dynamically upon the first asynchronous request. It
starts with a small "seed" pool and grows on demand up to the configured limit. Worker threads use condition
variables to sleep efficiently when idle, ensuring near-zero CPU usage when no background tasks are pending.
- **Reference Cycles:** Be careful when a fiber object is captured by its own closure. The object and the closure
then keep each other alive, and Perl's reference counting never frees them, which leaks memory.
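The reference-cycle warning applies to any Perl object that is captured by a closure it also owns. A minimal sketch in plain Perl, using a hash-based stand-in rather than a real fiber object (`My::FakeFiber`, its `code` key, and the `$destroyed` counter are illustrative assumptions), with `weaken` from the core `Scalar::Util` module breaking the cycle:

```perl
use strict;
use warnings;
use feature 'say';
use Scalar::Util qw[weaken];

# Stand-in for a fiber-like object; this is NOT the Acme::Parataxis class.
package My::FakeFiber {
    our $destroyed = 0;
    sub new     { return bless {}, shift }
    sub DESTROY { $destroyed++ }
}

# Leaky: the closure stored inside the object captures a strong reference
# to that same object, so its reference count never drops to zero.
{
    my $leaky = My::FakeFiber->new;
    $leaky->{code} = sub { return ref $leaky };
}
say "After leaky scope:  destroyed = $My::FakeFiber::destroyed";

# Safe: the closure captures a weakened copy instead.
{
    my $safe = My::FakeFiber->new;
    weaken( my $weak = $safe );
    $safe->{code} = sub { return $weak ? ref $weak : undef };
}
say "After weaken scope: destroyed = $My::FakeFiber::destroyed";
```

Only the second object is destroyed when its scope ends. The first one lingers until the interpreter shuts down.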
# GORY TECHNICAL DETAILS
## Architectural Inspiration
The concurrency model in Parataxis is heavily inspired by the **Wren** programming language, specifically its treatment
of fibers as the primary unit of execution and its deterministic cooperative scheduling.
eg/basic.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis;
#
say 'Main thread (TID: ' . Acme::Parataxis->tid . ')';
# Create a worker parataxis
my $worker = Acme::Parataxis->new(
code => sub ($name) {
say "---> Worker '$name' started (FID: " . Acme::Parataxis->current_fid . ")";
for my $i ( 1 .. 3 ) {
say "---> Worker '$name' processing step $i";
my $input = Acme::Parataxis->yield( 'Result from step ' . $i );
say "---> Worker '$name' received: $input";
}
return 'Final success';
}
);
# Drive the worker
eg/http_tiny.pl view on Meta::CPAN
}
}
#
package My::HTTP::Handle {
use parent -norequire, 'HTTP::Tiny::Handle';
use Time::HiRes qw[time];
use Acme::Parataxis qw[await_read await_write];
sub _do_timeout ( $self, $type, $timeout //= $self->{timeout} // 60 ) {
if ( $self->{fh} ) {
my $start = time;
while (1) {
# Check for readiness NOW (0 timeout)
return 1 if $self->SUPER::_do_timeout( $type, 0 );
# Check for overall timeout
my $elapsed = time() - $start;
return 0 if $elapsed > $timeout;
# Suspend fiber and wait for background I/O check.
# This is where the magic happens: the fiber yields control back
# to the scheduler while waiting for the socket to be ready.
my $wait = ( $timeout - $elapsed ) > 0.5 ? 0.5 : ( $timeout - $elapsed );
if ( $type eq 'read' ) {
await_read( $self->{fh}, int( $wait * 1000 ) );
}
else {
eg/http_tiny.pl view on Meta::CPAN
my @urls = qw[
http://www.google.com
http://www.example.com
https://www.perl.org
https://metacpan.org
];
my @futures;
for my $url (@urls) {
push @futures, fiber {
my $start = time();
say sprintf ' [Fiber %d] Fetching %s...', current_fid(), $url;
my $res = $http->get($url);
my $elapsed = time() - $start;
say sprintf ' [Fiber %d] Done %s (Status: %d) in %.2fs', current_fid(), $url, $res->{status}, $elapsed;
return $res;
};
}
# Wait for all fibers to complete and collect results
for my $i ( 0 .. $#urls ) {
my $res = await $futures[$i];
say sprintf 'Final Result for %s: %d %s', $urls[$i], $res->{status}, $res->{reason};
}
eg/modern.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis qw[:all];
#
async {
say 'Main fiber started';
# 'fiber' is the modern alias for 'spawn'
my $f1 = fiber {
say ' Fiber 1: Sleeping...';
await_sleep(1000);
say ' Fiber 1: Woke up!';
return 42;
};
# 'await' is the modern way to wait for results
eg/port_scanner.pl view on Meta::CPAN
use Errno qw[EINPROGRESS EWOULDBLOCK];
use Time::HiRes qw[time];
$|++;
# This demonstrates how fibers can be used to perform concurrent
# network operations using standard Perl modules (IO::Socket) and the
# Parataxis non-blocking I/O scheduler.
my $target = '127.0.0.1';
my @ports = ( 22, 80, 443, 3306, 3389, 5432, 8080, 9999 ); # Common ports + our demo server
# To make the test interesting, let's start a tiny listener on 9999 in a fiber
Acme::Parataxis::run(
sub {
say "Main: Starting parallel scan of $target...";
my $start_time = time;
# Start a dummy listener so we have at least one open port to find
my $server = IO::Socket::INET->new( LocalAddr => $target, LocalPort => 9999, Listen => 5, Reuse => 1, Blocking => 0 );
say 'Main: Local dummy server started on 9999' if $server;
my @results;
my @futures;
# Spawn a fiber for each port we want to scan
for my $port (@ports) {
push @futures, Acme::Parataxis->spawn(
sub {
my $s = IO::Socket::INET->new( PeerAddr => $target, PeerPort => $port, Proto => 'tcp', Blocking => 0 );
# If the socket failed immediately (e.g. invalid target)
eg/port_scanner.pl view on Meta::CPAN
}
# Wait for all scanners to finish
for my $f (@futures) {
my $r = $f->await();
push @results, $r;
if ( $r->{status} eq 'OPEN' ) {
say " [!] Port $r->{port} is OPEN";
}
}
my $elapsed = time() - $start_time;
say 'Main: Scan complete.';
say 'Summary:';
for my $r ( sort { $a->{port} <=> $b->{port} } @results ) {
say sprintf ' Port %-5d: %s', $r->{port}, $r->{status};
}
say sprintf 'Total scan time: %.2fs', $elapsed;
$server->close() if $server;
Acme::Parataxis::stop();
}
);
eg/socket.pl view on Meta::CPAN
Acme::Parataxis::run(
sub {
my $server = IO::Socket::INET->new( LocalAddr => '127.0.0.1', LocalPort => 9999, Proto => 'tcp', Listen => 5, Reuse => 1 ) or
die 'Could not create server: ' . $!;
$server->blocking(0);
$server->autoflush(1);
say 'Server listening on 127.0.0.1:9999';
my $client_done = 0;
Acme::Parataxis->spawn(
sub {
say 'Client starting...';
my $client = IO::Socket::INET->new( PeerAddr => '127.0.0.1', PeerPort => 9999, Proto => 'tcp' ) or
die 'Could not create client: ' . $!;
$client->blocking(0);
$client->autoflush(1);
say 'Client connected, waiting to write...';
Acme::Parataxis->await_write($client);
$client->print("Hello from Parataxis!\n");
say 'Client waiting for response...';
Acme::Parataxis->await_read($client);
my $line = <$client>;
eg/socket.pl view on Meta::CPAN
my $res = Acme::Parataxis->await_read($server);
last if $client_done; # Check immediately after waking up
if ( $res > 0 ) {
my $conn = $server->accept();
if ($conn) {
say 'Server accepted connection!';
Acme::Parataxis->spawn(
sub {
$conn->blocking(0);
$conn->autoflush(1);
say ' Handler started for ' . fileno($conn);
Acme::Parataxis->await_read($conn);
my $line = <$conn>;
if ( defined $line ) {
chomp $line;
say ' Server echoing: ' . $line;
Acme::Parataxis->await_write($conn);
$conn->print("Echo: $line\n");
}
else {
say ' Server got EOF';
eg/stress_fibers.pl view on Meta::CPAN
use blib;
use Acme::Parataxis qw[:all];
use Time::HiRes qw[time];
# Stress Test 1: High Fiber Count & Rapid Lifecycle
# This tests the scheduler's ability to handle many short-lived fibers.
async {
my $total_fibers = 1000; # We increased MAX_FIBERS in the C code to 1024
my $iterations = 5;
say "Starting Stress Test: $total_fibers fibers, $iterations iterations...";
my $start_time = time();
for my $iter ( 1 .. $iterations ) {
my $iter_start = time();
my @futures;
# Spawn a batch of fibers
for my $i ( 1 .. $total_fibers ) {
push @futures, fiber {
# Do a tiny bit of work
my $val = 0;
$val += $_ for 1 .. 100;
no warnings 'recursion';
yield() if $i % 2 == 0; # Mix in some yields
return $val;
};
}
# Wait for all to finish
for my $f (@futures) {
await $f;
}
my $elapsed = time() - $iter_start;
say sprintf ' Iteration %2d completed in %.4fs', $iter, $elapsed;
}
my $total_elapsed = time() - $start_time;
say sprintf 'Total time for %d fibers across %d iterations: %.4fs', $total_fibers, $iterations, $total_elapsed;
say 'Average time per fiber lifecycle: ' . ( $total_elapsed / ( $total_fibers * $iterations ) ) . 's';
};
eg/stress_io.pl view on Meta::CPAN
use Time::HiRes qw[time];
$|++;
# Stress Test 2: Concurrent I/O (Background Thread Pool)
# This tests the ability to handle many simultaneous blocking background tasks.
async {
my $concurrency = 64; # Well under MAX_FIBERS (1024, per eg/stress_fibers.pl)
my $iterations = 5;
my $sleep_ms = 500;
say "Starting I/O Stress Test: $concurrency concurrent sleeps of ${sleep_ms}ms...";
my $start_time = time();
for my $iter ( 1 .. $iterations ) {
my $iter_start = time();
my @futures;
# Spawn concurrent sleep fibers
for my $i ( 1 .. $concurrency ) {
push @futures, fiber {
no warnings 'recursion';
# This offloads to the C-level background thread pool
await_sleep($sleep_ms);
return "Worker $i done";
};
}
# Wait for all to finish
for my $f (@futures) {
await $f;
}
my $elapsed = time() - $iter_start;
# Since they are concurrent and $concurrency > CPU count (usually),
# it should take roughly (ceil(concurrency / num_cpus) * sleep_ms)
say sprintf ' Iteration %2d completed in %.4fs', $iter, $elapsed;
}
my $total_elapsed = time() - $start_time;
say sprintf 'Total time for %d concurrent sleeps across %d iterations: %.4fs', $concurrency, $iterations, $total_elapsed;
};
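The estimate in the comment inside the loop, ceil(concurrency / num_cpus) * sleep_ms, can be worked through with a few numbers. This is a back-of-the-envelope sketch in plain Perl: `estimated_batch_ms` is a hypothetical helper, and `$workers` stands in for however many sleeps the pool can actually run at once, which is an assumption and not a value read from the module.

```perl
use strict;
use warnings;
use feature 'say';
use POSIX qw[ceil];

# If only $workers sleeps can run at once, $concurrency sleeps finish in
# ceil($concurrency / $workers) "waves" of $sleep_ms each.
sub estimated_batch_ms {
    my ( $concurrency, $workers, $sleep_ms ) = @_;
    return ceil( $concurrency / $workers ) * $sleep_ms;
}

say estimated_batch_ms( 64, 8,  500 );     # 8 waves of 500ms
say estimated_batch_ms( 64, 64, 500 );     # a single wave
say estimated_batch_ms( 3,  2,  1000 );    # 3 tasks on 2 threads need 2 waves
```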
eg/stress_preempt.pl view on Meta::CPAN
# This tests the integrity of the stack teleportation during deep recursion
# and the effectiveness of the preemption threshold.
# Configuration
my $recursion_depth = 50; # Not too deep to overflow 4MB C-stack, but deep enough to test pads
my $preempt_at = 100; # Operations before forced yield
my $num_fibers = 10; # Number of competing CPU-bound fibers
Acme::Parataxis::set_preempt_threshold($preempt_at);
async {
say 'Starting Preemption Stress Test...';
say "Recursion depth: $recursion_depth, Preempt threshold: $preempt_at, Fibers: $num_fibers";
my $start_time = time();
my @futures;
# Recursive function to stress pads and preemption
sub recursive_math( $depth_in, $fiber_id ) {
no warnings 'uninitialized';
my $depth = $depth_in; # Localize
return 1 if $depth <= 0;
# Do some CPU work and potential preemption
my $acc = 0;
eg/stress_preempt.pl view on Meta::CPAN
my $val = recursive_math( $recursion_depth, $i );
say " [Fiber $i] Done with result: $val";
return $val;
};
}
# Wait for all to finish
for my $f (@futures) {
await $f;
}
my $total_elapsed = time() - $start_time;
say sprintf 'Total time for %d competing recursive fibers: %.4fs', $num_fibers, $total_elapsed;
# Reset preemption threshold for the rest of the script/environment
Acme::Parataxis::set_preempt_threshold(0);
};
eg/symmetric.pl view on Meta::CPAN
}
say ' Consumer: Shutting down.';
Acme::Parataxis->root->transfer(); # Return to main
}
);
say 'Main: Starting the dance...';
# We need to prime the consumer so it hits its yield()
$consumer->call();
# Now start the producer
$producer->call();
say 'Main: The dance is over.';
eg/synopsis.pl view on Meta::CPAN
{
use v5.40;
use blib;
use Acme::Parataxis;
$|++;
# Basic usage with the integrated scheduler
Acme::Parataxis::run(
sub {
say 'Main fiber started (TID: ' . Acme::Parataxis->tid . ')';
# Spawn background workers
my $f1 = Acme::Parataxis->spawn(
sub {
say ' Task 1: Sleeping (non-blocking for others)...';
Acme::Parataxis->await_sleep(1000);
return 'Coffee is ready!';
}
);
my $f2 = Acme::Parataxis->spawn(
eg/thread_pool.pl view on Meta::CPAN
# Demonstrate dynamic thread pool growth and modern API
async {
say 'Main: Initial thread pool size is ' . Acme::Parataxis::get_thread_pool_size();
my @futures;
for my $id ( 1 .. 5 ) {
push @futures, fiber {
my $fid = current_fid();
my $sleep_time = int( rand(1000) ) + 500;
say " [Worker $id] Fiber #$fid will sleep for ${sleep_time}ms in background pool";
my $start = time();
# This yields to main, background thread sleeps, scheduler resumes us
await_sleep($sleep_time);
my $elapsed = int( ( time() - $start ) * 1000 );
say " [Worker $id] Fiber #$fid woke up after ${elapsed}ms!";
return "Done $id";
};
}
say 'Main: All workers spawned. Waiting for completion...';
say 'Main: Thread pool size during load: ' . Acme::Parataxis::get_thread_pool_size();
my $start_time = time();
# Wait for all workers to finish via futures using modern 'await'
await $_ for @futures;
say 'Main: All workers finished!';
printf "Total wallclock time: %.2fs\n", time() - $start_time;
say 'Main: Final thread pool size: ' . Acme::Parataxis::get_thread_pool_size();
};
eg/worker_pool.pl view on Meta::CPAN
sub {
my @jobs = (
{ id => 1, task => 'Fetch User Data', delay => 800 },
{ id => 2, task => 'Process Payment', delay => 1200 },
{ id => 3, task => 'Send Email', delay => 500 },
{ id => 4, task => 'Update Inventory', delay => 1500 },
{ id => 5, task => 'Generate Report', delay => 900 },
{ id => 6, task => 'Sync Analytics', delay => 1100 }
);
say 'Main: Starting worker pool with 3 fibers...';
my $start_time = time;
# Simple shared queue
my @queue = @jobs;
my @results;
# Spawn 3 worker fibers
my @workers;
for my $w_id ( 1 .. 3 ) {
push @workers, Acme::Parataxis->spawn(
sub {
say " [Worker $w_id] Fiber started.";
while ( my $job = shift @queue ) {
say " [Worker $w_id] Processing Job $job->{id}: $job->{task}...";
# Simulate a blocking I/O call (DB query, API request, idk...)
# This yields to the scheduler while the native thread pool sleeps.
Acme::Parataxis->await_sleep( $job->{delay} );
say " [Worker $w_id] Finished Job $job->{id}.";
push @results, "Job $job->{id} complete";
}
return "Worker $w_id finished.";
}
);
}
# Wait for all workers to complete
say 'Main: Waiting for pool to drain...';
$_->await for @workers;
my $elapsed = time() - $start_time;
my $sum_delays = 0;
$sum_delays += $_->{delay} for @jobs;
say 'Main: All tasks finished!';
say 'Total Results: ' . scalar(@results);
say sprintf 'Total Wallclock Time: %.2fs', $elapsed;
say sprintf 'Sum of individual delays: %.2fs', $sum_delays / 1000;
say sprintf 'Speedup: ~%.2fx', ( $sum_delays / 1000 ) / $elapsed;
}
);
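For a rough idea of the speedup this example should report, the shared queue can be simulated without any fibers at all. The sketch below is an idealized model in plain Perl: `simulate_pool` is a hypothetical helper, it assumes each worker takes the next job the moment it becomes free, and it ignores scheduling overhead, so real runs will be somewhat slower.

```perl
use strict;
use warnings;
use feature 'say';
use List::Util qw[sum max];

# Idealized model: jobs are handed out in queue order to whichever worker
# becomes idle first, and switching between workers costs nothing.
sub simulate_pool {
    my ( $workers, @delays_ms ) = @_;
    my @free_at = (0) x $workers;    # time at which each worker becomes idle
    for my $delay (@delays_ms) {
        # Pick the worker that frees up first (lowest index wins ties)
        my ($next) = sort { $free_at[$a] <=> $free_at[$b] || $a <=> $b } 0 .. $#free_at;
        $free_at[$next] += $delay;
    }
    return max(@free_at);
}

my @delays   = ( 800, 1200, 500, 1500, 900, 1100 );    # delays from @jobs
my $makespan = simulate_pool( 3, @delays );
say sprintf 'Ideal wallclock: %.2fs', $makespan / 1000;
say sprintf 'Ideal speedup:   ~%.2fx', sum(@delays) / $makespan;
```

Under these assumptions the six jobs (6.0s of delay in total) finish in 2.3s, so a reported speedup of a little under 2.6x would be consistent with the model.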
lib/Acme/Parataxis.c view on Meta::CPAN
#endif
/*
* Perl Interpreter State Pointers.
* These must be saved and restored during every context switch.
*/
PERL_SI * si; /**< Current Stack Info (tracks recursion and eval frames) */
AV * curstack; /**< The active Argument Stack (AV*) */
SSize_t stack_sp_offset; /**< Stack Pointer offset from stack base */
I32 * markstack; /**< Base of the Mark Stack (tracks list start points) */
I32 * markstack_ptr; /**< Current pointer into the Mark Stack */
I32 * markstack_max; /**< Limit of the Mark Stack */
I32 * scopestack; /**< Base of the Scope Stack (tracks block nesting) */
I32 scopestack_ix; /**< Current index in the Scope Stack */
I32 scopestack_max; /**< Limit of the Scope Stack */
ANY * savestack; /**< Base of the Save Stack (tracks local/my variables for cleanup) */
I32 savestack_ix; /**< Current index in the Save Stack */
I32 savestack_max; /**< Limit of the Save Stack */
lib/Acme/Parataxis.c view on Meta::CPAN
if (idx < 0 || idx >= MAX_JOBS)
return;
LOCK(queue_lock);
job_slots[idx].status = JOB_FREE;
UNLOCK(queue_lock);
}
/**
* @brief Resets the call depth of a Perl CV to zero.
*
* Used to ensure that a newly created fiber starts its coderef with a
* clean execution state.
*
* @param cv_ref SV reference to the coderef.
*/
DLLEXPORT void force_depth_zero(SV * cv_ref) {
dTHX;
CV * cv = NULL;
if (SvROK(cv_ref))
cv = (CV *)SvRV(cv_ref);
else if (SvTYPE(cv_ref) == SVt_PVCV)
lib/Acme/Parataxis.c view on Meta::CPAN
return res;
}
/**
* @brief Entry point function for all new fibers.
*
* Sets up the Perl environment (ENTER/SAVETMPS), unpacks arguments,
* calls the user coderef, handles results/errors, and manages the
* fiber's completion lifecycle.
*
* @param c Pointer to the fiber context being started.
*/
static void entry_point(para_fiber_t * c) {
dTHX;
ENTER;
SAVETMPS;
dSP;
PUSHMARK(SP);
/* Unpack arguments passed during coro_call */
if (c->transfer_data && SvROK(c->transfer_data) && SvTYPE(SvRV(c->transfer_data)) == SVt_PVAV) {
lib/Acme/Parataxis.pod view on Meta::CPAN
=head1 SYNOPSIS
The classic way (as I write this, Acme::Parataxis is 5 days old and already has a 'classic' API...)
use v5.40;
use Acme::Parataxis;
$|++;
Acme::Parataxis::run(
sub {
say 'Main task started';
# Spawn background workers
my $f1 = Acme::Parataxis->spawn(
sub {
say ' Task 1: Sleeping in a native thread pool...';
Acme::Parataxis->await_sleep(1000);
say ' Task 1: Ah! What a nice nap...';
return 42;
}
);
lib/Acme/Parataxis.pod view on Meta::CPAN
}
);
Or do things the more modern way:
use v5.40;
use Acme::Parataxis qw[:all];
$|++;
async {
say 'Main task started';
# 'fiber' is a shorter alias for 'spawn'
my $f1 = fiber {
say ' Task 1: Sleeping...';
await_sleep(1000);
return 42;
};
my $f2 = fiber {
say ' Task 2: Performing I/O...';
lib/Acme/Parataxis.pod view on Meta::CPAN
Close the browser and clear your history before this does further harm!
=head1 MODERN API
While the classic object-oriented API is always available, C<Acme::Parataxis> exports a set of functions (via the
C<:all> tag) that provide a more modern, concise way to write concurrent code.
=head2 C<async { ... }>
A convenience wrapper around C<run( )>. It starts the scheduler, executes the provided block as the main fiber, and
automatically calls C<stop( )> when the block completes.
async {
say "The scheduler is running!";
};
=head2 C<fiber { ... }>
An alias for C<spawn( )>. It creates a new fiber and returns a L<Future|/"Acme::Parataxis::Future OBJECT METHODS">.
lib/Acme/Parataxis.pod view on Meta::CPAN
async {
my $core = await_core_id( );
say "Background task handled by CPU core: $core";
};
=head1 CORE CONCEPTS
=head2 Creating Fibers
All Perl code in this system runs within a fiber. When you start your script or call C<Acme::Parataxis::run>, a "main"
fiber is active. You can create new fibers using C<spawn> or by manually instantiating an C<Acme::Parataxis> object:
my $fiber = Acme::Parataxis->new(code => sub {
say "I'm in a fiber!";
});
Creating a fiber does not run it immediately. It simply prepares the context and waits to be invoked.
=head2 Invoking Fibers
lib/Acme/Parataxis.pod view on Meta::CPAN
});
=head2 C<yield( @args )>
Pauses the current fiber and returns control to the scheduler. If C<@args> are provided, they are passed to the context
that next resumes this fiber. Arguments can be of any Perl data type.
=head2 C<stop( )>
Tells the scheduler to exit the loop after the current iteration. Note that this does not immediately terminate other
fibers; it simply prevents the scheduler from starting new ones.
=head1 THREAD POOL CONFIGURATION
C<Acme::Parataxis> uses a native thread pool to handle blocking tasks. While it manages itself automatically, you can
tune its behavior using these functions.
=head2 C<set_max_threads( $count )>
Sets the maximum number of worker threads the pool is allowed to spawn. By default, this is set to the number of
logical CPU cores detected on your system (up to a hard limit of 64).
lib/Acme/Parataxis.pod view on Meta::CPAN
return $res;
}
}
{
package My::HTTP::Handle;
use parent -norequire, 'HTTP::Tiny::Handle';
use Time::HiRes qw[time];
sub _do_timeout {
my ($self, $type, $timeout) = @_;
$timeout //= $self->{timeout} // 60;
my $start = time;
while (1) {
# Check for readiness NOW (0 timeout)
return 1 if $self->SUPER::_do_timeout($type, 0);
# Check for overall timeout
my $elapsed = time() - $start;
return 0 if $elapsed > $timeout;
# Suspend fiber and wait for background I/O check
my $wait = ($timeout - $elapsed) > 0.5 ? 0.5 : ($timeout - $elapsed);
if ($type eq 'read') {
Acme::Parataxis->await_read($self->{fh}, int($wait * 1000));
} else {
Acme::Parataxis->await_write($self->{fh}, int($wait * 1000));
}
}
}
lib/Acme/Parataxis.pod view on Meta::CPAN
Always use the C<await_*> equivalents to offload work to the pool.
=item * B<Thread Safety:> While Perl code remains single-threaded, background tasks run on separate OS threads. Shared
C-level data (if accessed via FFI) must be mutex-protected.
=item * B<Stack Limits:> Each fiber is allocated a 512KB stack by default. This is more than sufficient for most
Perl code and allows for high concurrency with a small memory footprint. Extremely deep recursion or massive regex
backtracking might still hit limits.
=item * B<Efficiency:> The native thread pool is initialized dynamically upon the first asynchronous request. It
starts with a small "seed" pool and grows on demand up to the configured limit. Worker threads use condition
variables to sleep efficiently when idle, ensuring near-zero CPU usage when no background tasks are pending.
=item * B<Reference Cycles:> Be careful when a fiber object is captured by its own closure. The object and the closure
then keep each other alive, and Perl's reference counting never frees them, which leaks memory.
=back
=head1 GORY TECHNICAL DETAILS
=head2 Architectural Inspiration
t/004_synopsis.t view on Meta::CPAN
use v5.40;
use Test2::V1 -ipP;
use blib;
use Acme::Parataxis;
$|++;
# This is the synopsis for Acme::Parataxis but verbose
Acme::Parataxis::run(
sub {
diag 'Main fiber started (FID: ' . Acme::Parataxis->current_fid . ')';
# Spawn background workers
diag 'Spawning Task 1 (Sleep)...';
my $f1 = Acme::Parataxis->spawn(
sub {
diag 'Task 1 started (FID: ' . Acme::Parataxis->current_fid . ')';
diag 'Task 1: Sleeping in a native thread pool (1000ms)...';
Acme::Parataxis->await_sleep(1000);
diag 'Task 1: Ah! What a nice nap...';
return 42;
}
);
diag 'Spawning Task 2 (I/O dummy)...';
my $f2 = Acme::Parataxis->spawn(
sub {
diag 'Task 2 started (FID: ' . Acme::Parataxis->current_fid . ')';
diag 'Task 2: Performing I/O simulation...';
# await_read/write for non-blocking socket handling
return 'I/O Done';
}
);
# Block current fiber until results are ready (without blocking the thread)
diag 'Main: Waiting for Task 1 result...';
my $res1 = $f1->await();
t/006_parallel.t view on Meta::CPAN
use v5.40;
use Test2::V1 -ipP;
use blib;
use Acme::Parataxis;
use Time::HiRes qw[time];
$|++;
Acme::Parataxis::run(
sub {
my $start_time = time();
diag "Starting parallel sleeps at $start_time...";
# Check thread pool size
my $pool_size = Acme::Parataxis::get_thread_pool_size();
diag "Detected thread pool size: $pool_size";
# Spawn only as many tasks as we have threads (up to 3) to ensure they run in parallel.
# If pool size is 2, running 3 tasks takes 2 seconds, failing the 1.8s test.
my $num_tasks = $pool_size;
$num_tasks = 3 if $num_tasks > 3;
$num_tasks = 1 if $num_tasks < 1;
diag "Spawning $num_tasks parallel tasks...";
my @futures;
for my $i ( 1 .. $num_tasks ) {
push @futures, Acme::Parataxis->spawn(
sub {
my $id = $i; # Closure capture
diag "Fiber $id started (FID: " . Acme::Parataxis->current_fid . ')';
diag "Fiber $id: Sleeping for 1000ms...";
Acme::Parataxis->await_sleep(1000);
diag "Fiber $id: Woke up!";
return $id;
}
);
}
# Wait for all
diag "Main: Waiting for $num_tasks fibers to finish...";
my @results;
push @results, $_->await() for @futures;
my $elapsed = time() - $start_time;
diag "Total wallclock time: $elapsed seconds";
diag 'Results: ' . join( ', ', @results );
my $expected = [ 1 .. $num_tasks ];
is \@results, $expected, 'Fibers returned correct individual results';
# If we have at least 1 task, it should take ~1s.
ok $elapsed < 1.8, "$num_tasks tasks ran in parallel (elapsed < 1.8s)";
}
);
done_testing();
t/009_http_tiny.t view on Meta::CPAN
}
{
package Acme::Parataxis::Test::HTTP::Handle;
use parent -norequire, 'HTTP::Tiny::Handle';
sub _do_timeout {
my ( $self, $type, $timeout ) = @_;
$timeout //= $self->{timeout};
if ( $self->{fh} ) {
my $start = time();
while (1) {
# Immediate check using original select (0 timeout)
return 1 if $self->SUPER::_do_timeout( $type, 0 );
# Check for overall timeout
return 0 if ( time() - $start ) > $timeout;
# Suspend fiber and wait for background I/O check.
# await_* submits a job and yields 'WAITING'.
if ( $type eq 'read' ) {
Acme::Parataxis->await_read( $self->{fh}, 500 );
}
else {
Acme::Parataxis->await_write( $self->{fh}, 500 );
}
}
t/013_real_http.t view on Meta::CPAN
}
{
package Acme::Parataxis::Test::RealHTTP::Handle;
use parent -norequire, 'HTTP::Tiny::Handle';
sub _do_timeout {
my ( $self, $type, $timeout ) = @_;
$timeout //= $self->{timeout} // 60;
if ( $self->{fh} ) {
my $start = time();
while (1) {
return 1 if $self->SUPER::_do_timeout( $type, 0 );
my $elapsed = time() - $start;
return 0 if $elapsed > $timeout;
my $wait = ( $timeout - $elapsed ) > 0.5 ? 0.5 : ( $timeout - $elapsed );
if ( $type eq 'read' ) {
Acme::Parataxis->await_read( $self->{fh}, int( $wait * 1000 ) );
}
else {
Acme::Parataxis->await_write( $self->{fh}, int( $wait * 1000 ) );
}
}
}
t/014_http_pool.t view on Meta::CPAN
}
{
package Acme::Parataxis::Test::PoolHTTP::Handle;
use parent -norequire, 'HTTP::Tiny::Handle';
sub _do_timeout {
my ( $self, $type, $timeout ) = @_;
$timeout //= $self->{timeout} // 60;
if ( $self->{fh} ) {
my $start = time();
while (1) {
return 1 if $self->SUPER::_do_timeout( $type, 0 );
my $elapsed = time() - $start;
return 0 if $elapsed > $timeout;
my $wait = ( $timeout - $elapsed ) > 0.5 ? 0.5 : ( $timeout - $elapsed );
if ( $type eq 'read' ) {
Acme::Parataxis->await_read( $self->{fh}, int( $wait * 1000 ) );
}
else {
Acme::Parataxis->await_write( $self->{fh}, int( $wait * 1000 ) );
}
}
}
t/015_http_mock_pool.t view on Meta::CPAN
}
}
package Acme::Parataxis::Test::MockPoolHTTP::Handle {
use parent -norequire, 'HTTP::Tiny::Handle';
sub _do_timeout {
my ( $self, $type, $timeout ) = @_;
$timeout //= $self->{timeout} // 60;
if ( $self->{fh} ) {
my $start = time();
while (1) {
return 1 if $self->SUPER::_do_timeout( $type, 0 );
my $elapsed = time() - $start;
return 0 if $elapsed > $timeout;
my $wait = ( $timeout - $elapsed ) > 0.5 ? 0.5 : ( $timeout - $elapsed );
if ( $type eq 'read' ) {
Acme::Parataxis->await_read( $self->{fh}, int( $wait * 1000 ) );
}
else {
Acme::Parataxis->await_write( $self->{fh}, int( $wait * 1000 ) );
}
}
}