```perl
use v5.40;
use Acme::Parataxis;
$|++;
Acme::Parataxis::run(
    sub {
        say 'Main task started';
        # Spawn background workers
        my $f1 = Acme::Parataxis->spawn(
            sub {
                say ' Task 1: Sleeping in a native thread pool...';
                Acme::Parataxis->await_sleep(1000);
                say ' Task 1: Ah! What a nice nap...';
                return 42;
            }
        );
        my $f2 = Acme::Parataxis->spawn(
            sub {
```

```perl
async {
    await_write($socket);
    syswrite($socket, $message);
};
```
## `await_core_id( )`
Returns the ID of the CPU core currently executing the background task. This is a non-blocking operation that offloads
the request to the thread pool and suspends the fiber until the result is ready.
```perl
async {
    my $core = await_core_id( );
    say "Background task handled by CPU core: $core";
};
```
# CORE CONCEPTS
current fiber is suspended, and the actual blocking work is performed on a **different** OS thread in a native pool.
Once the task completes, your fiber is automatically queued for resumption on the main thread.
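
The suspend, offload, and resume cycle can be pictured as a small table of job slots. The following is a minimal single-threaded sketch in C, loosely modeled on the `job_slots` array and the `JOB_NEW`/`JOB_DONE` states visible in `Parataxis.c`. Every `demo_*` and `DEMO_*` name is hypothetical, the `DEMO_FREE` state is an assumption, and the locking that a real multi-threaded queue needs is left out on purpose.

```c
#include <stddef.h>

#define DEMO_MAX_JOBS 8 /* hypothetical size, chosen small for the sketch */

/* Assumed lifecycle: FREE -> NEW (submitted) -> DONE (worker finished) -> FREE */
enum demo_status { DEMO_FREE = 0, DEMO_NEW, DEMO_DONE };

typedef struct {
    int  fiber_id; /* fiber that is suspended while this job runs */
    int  status;   /* one of enum demo_status */
    long result;   /* value handed back to the fiber on resumption */
} demo_job_t;

/* Static storage is zero-initialized, so every slot starts as DEMO_FREE */
static demo_job_t demo_slots[DEMO_MAX_JOBS];

/* Fiber side: claim a free slot. Returns the slot index, or -1 if the table is full. */
int demo_submit(int fiber_id) {
    for (int i = 0; i < DEMO_MAX_JOBS; i++) {
        if (demo_slots[i].status == DEMO_FREE) {
            demo_slots[i].fiber_id = fiber_id;
            demo_slots[i].result   = 0;
            demo_slots[i].status   = DEMO_NEW;
            return i;
        }
    }
    return -1;
}

/* Worker side: record the result and mark the job as finished. */
void demo_finish(int idx, long result) {
    demo_slots[idx].result = result;
    demo_slots[idx].status = DEMO_DONE;
}

/* Scheduler side: find one finished job and release its slot.
 * Returns the slot index (fiber_id and result stay readable), or -1 if none is done. */
int demo_reap(void) {
    for (int i = 0; i < DEMO_MAX_JOBS; i++) {
        if (demo_slots[i].status == DEMO_DONE) {
            demo_slots[i].status = DEMO_FREE;
            return i;
        }
    }
    return -1;
}
```

In this picture, a fiber calls `demo_submit` and yields, a pool thread eventually calls `demo_finish`, and the scheduler calls `demo_reap` on each loop pass to learn which fiber to queue for resumption.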
# SCHEDULER FUNCTIONS
The following functions are the primary interface for the integrated cooperative scheduler.
## `run( $code )`
Starts the event loop and executes `$code` as the initial fiber. The loop continues to run as long as there are active
fibers or pending background tasks.
```perl
Acme::Parataxis::run(sub {
    say "The scheduler is running!";
});
```
## `spawn( $code )`
Creates a new fiber and adds it to the scheduler's queue. Returns a [Future](#acme-parataxis-future-object-methods)
that will eventually contain the fiber's return value.
## `max_threads( )`
Returns the currently configured maximum thread pool size.
# BLOCKING & I/O FUNCTIONS
These functions **suspend** the current fiber and offload the actual blocking work to the native thread pool.
## `await_sleep( $ms )`
Suspends the fiber for `$ms` milliseconds. While the background thread sleeps, other fibers can continue to execute.
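
For intuition about the worker side, here is a hedged C sketch of a millisecond sleep built on POSIX `nanosleep`. The helper names `ms_to_timespec` and `sleep_ms` are hypothetical, and the module's actual sleep task is not shown in this excerpt, so this only illustrates the unit conversion and the retry on signal interruption.

```c
#include <errno.h>
#include <time.h>

/* Convert a millisecond count into the seconds/nanoseconds pair nanosleep expects.
 * Negative input is clamped to zero. */
struct timespec ms_to_timespec(long ms) {
    struct timespec ts;
    if (ms < 0)
        ms = 0;
    ts.tv_sec  = ms / 1000;
    ts.tv_nsec = (ms % 1000) * 1000000L;
    return ts;
}

/* Sleep for roughly `ms` milliseconds on the calling thread.
 * If a signal interrupts the sleep, continue with the remaining time.
 * Returns 0 on success, -1 on any other error. */
int sleep_ms(long ms) {
    struct timespec req = ms_to_timespec(ms);
    struct timespec rem;
    while (nanosleep(&req, &rem) == -1) {
        if (errno != EINTR)
            return -1;
        req = rem; /* resume with whatever time is left */
    }
    return 0;
}
```

Because only the pool thread blocks inside `sleep_ms`, the interpreter thread stays free to run other fibers in the meantime.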
## `await_read( $fh, $timeout = 5000 )`
Suspends the fiber until the filehandle `$fh` is ready for reading, or the `$timeout` (in milliseconds) is reached.
```perl
my $status = Acme::Parataxis->await_read($socket);
if ($status > 0) {
    my $data = <$socket>;
}
```

```perl
    sub _do_timeout {
        my ($self, $type, $timeout) = @_;
        $timeout //= $self->{timeout} // 60;
        my $start = time;
        while (1) {
            # Check for readiness NOW (0 timeout)
            return 1 if $self->SUPER::_do_timeout($type, 0);
            # Check for overall timeout
            my $elapsed = time - $start;
            return 0 if $elapsed > $timeout;
            # Suspend fiber and wait for background I/O check
            my $wait = ($timeout - $elapsed) > 0.5 ? 0.5 : ($timeout - $elapsed);
            if ($type eq 'read') {
                Acme::Parataxis->await_read($self->{fh}, int($wait * 1000));
            } else {
                Acme::Parataxis->await_write($self->{fh}, int($wait * 1000));
            }
        }
    }
}
```

```perl
});
$c->call( );    # Prime consumer
$p->call( );    # Start producer
```
# BEST PRACTICES & GOTCHAS
- **Avoid blocking syscalls:** Never call blocking `sleep( )` or `sysread( )` on the main interpreter thread.
Always use the `await_*` equivalents to offload work to the pool.
- **Thread Safety:** While Perl code remains single-threaded, background tasks run on separate OS threads. Shared
C-level data (if accessed via FFI) must be mutex-protected.
- **Stack Limits:** Each fiber is allocated a 512KB stack by default. This is more than sufficient for most
Perl code and allows for high concurrency with a small memory footprint. Extremely deep recursion or massive regex
backtracking might still hit limits.
- **Efficiency:** The native thread pool is initialized dynamically upon the first asynchronous request. It
starts with a small "seed" pool and grows on demand up to the configured limit. Worker threads use condition
variables to sleep efficiently when idle, ensuring near-zero CPU usage when no background tasks are pending.
- **Reference Cycles:** Be careful when passing fiber objects into their own closures, as this can create
memory leaks.
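
The thread-safety and efficiency points above can be illustrated with a small pthread sketch: shared state is touched only while holding a mutex, and an idle worker sleeps on a condition variable instead of spinning. This is a simplified illustration under stated assumptions, not the module's implementation. All `demo_*` names are hypothetical, and a "job" here is just a counter increment.

```c
#include <pthread.h>

typedef struct {
    pthread_mutex_t lock;      /* protects every field below */
    pthread_cond_t  cond;      /* signaled when work arrives or shutdown is requested */
    int             pending;   /* jobs waiting to be processed */
    int             processed; /* jobs the worker has completed */
    int             shutdown;  /* set to 1 to ask the worker to exit */
} demo_queue_t;

/* Worker loop: sleeps on the condition variable while there is nothing to do. */
static void *demo_worker(void *arg) {
    demo_queue_t *q = (demo_queue_t *)arg;
    pthread_mutex_lock(&q->lock);
    for (;;) {
        /* The while loop guards against spurious wakeups */
        while (q->pending == 0 && !q->shutdown)
            pthread_cond_wait(&q->cond, &q->lock);
        if (q->pending == 0)
            break; /* only reachable once shutdown has been requested */
        q->pending--;
        q->processed++;
    }
    pthread_mutex_unlock(&q->lock);
    return NULL;
}

/* Submit `njobs` jobs to one worker, shut it down, and return how many it processed.
 * Returns -1 if the worker thread could not be created. */
int demo_run_jobs(int njobs) {
    demo_queue_t q;
    pthread_t    tid;

    pthread_mutex_init(&q.lock, NULL);
    pthread_cond_init(&q.cond, NULL);
    q.pending = q.processed = q.shutdown = 0;

    if (pthread_create(&tid, NULL, demo_worker, &q) != 0)
        return -1;

    for (int i = 0; i < njobs; i++) {
        pthread_mutex_lock(&q.lock);
        q.pending++;
        pthread_cond_signal(&q.cond); /* wake the worker if it is idle */
        pthread_mutex_unlock(&q.lock);
    }

    pthread_mutex_lock(&q.lock);
    q.shutdown = 1;
    pthread_cond_signal(&q.cond);
    pthread_mutex_unlock(&q.lock);

    pthread_join(tid, NULL);
    pthread_cond_destroy(&q.cond);
    pthread_mutex_destroy(&q.lock);
    return q.processed;
}
```

The worker drains any remaining jobs before honoring the shutdown flag, so `demo_run_jobs(n)` reports exactly `n` processed jobs. Depending on the toolchain, building this may require the `-pthread` compiler flag.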
# GORY TECHNICAL DETAILS
## Architectural Inspiration
The concurrency model in Parataxis is heavily inspired by the **Wren** programming language, specifically its treatment
of fibers as the primary unit of execution and its deterministic cooperative scheduling.
eg/http_tiny.pl view on Meta::CPAN
my $start = time;
while (1) {
# Check for readiness NOW (0 timeout)
return 1 if $self->SUPER::_do_timeout( $type, 0 );
# Check for overall timeout
my $elapsed = time() - $start;
return 0 if $elapsed > $timeout;
# Suspend fiber and wait for background I/O check.
# This is where the magic happens: the fiber yields control back
# to the scheduler while waiting for the socket to be ready.
my $wait = ( $timeout - $elapsed ) > 0.5 ? 0.5 : ( $timeout - $elapsed );
if ( $type eq 'read' ) {
await_read( $self->{fh}, int( $wait * 1000 ) );
}
else {
await_write( $self->{fh}, int( $wait * 1000 ) );
}
}
eg/stress_io.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis qw[:all];
use Time::HiRes qw[time];
$|++;
# Stress Test 2: Concurrent I/O (Background Thread Pool)
# This tests the ability to handle many simultaneous blocking background tasks.
async {
my $concurrency = 64;    # Well below MAX_FIBERS (1024 in Parataxis.c)
my $iterations = 5;
my $sleep_ms = 500;
say "Starting I/O Stress Test: $concurrency concurrent sleeps of ${sleep_ms}ms...";
my $start_time = time();
for my $iter ( 1 .. $iterations ) {
my $iter_start = time();
my @futures;
# Spawn concurrent sleep fibers
for my $i ( 1 .. $concurrency ) {
push @futures, fiber {
no warnings 'recursion';
# This offloads to the C-level background thread pool
await_sleep($sleep_ms);
return "Worker $i done";
};
}
# Wait for all to finish
for my $f (@futures) {
await $f;
}
my $elapsed = time() - $iter_start;
eg/synopsis.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis;
$|++;
# Basic usage with the integrated scheduler
Acme::Parataxis::run(
sub {
say 'Main fiber started (TID: ' . Acme::Parataxis->tid . ')';
# Spawn background workers
my $f1 = Acme::Parataxis->spawn(
sub {
say ' Task 1: Sleeping (non-blocking for others)...';
Acme::Parataxis->await_sleep(1000);
return 'Coffee is ready!';
}
);
my $f2 = Acme::Parataxis->spawn(
sub {
say ' Task 2: Calculating... (simulated CPU work)';
eg/thread_pool.pl view on Meta::CPAN
$|++;
# Demonstrate dynamic thread pool growth and modern API
async {
say 'Main: Initial thread pool size is ' . Acme::Parataxis::get_thread_pool_size();
my @futures;
for my $id ( 1 .. 5 ) {
push @futures, fiber {
my $fid = current_fid();
my $sleep_time = int( rand(1000) ) + 500;
say " [Worker $id] Fiber #$fid will sleep for ${sleep_time}ms in background pool";
my $start = time();
# This yields to main, background thread sleeps, scheduler resumes us
await_sleep($sleep_time);
my $elapsed = int( ( time() - $start ) * 1000 );
say " [Worker $id] Fiber #$fid woke up after ${elapsed}ms!";
return "Done $id";
};
}
say 'Main: All workers spawned. Waiting for completion...';
say 'Main: Thread pool size during load: ' . Acme::Parataxis::get_thread_pool_size();
my $start_time = time();
lib/Acme/Parataxis.c view on Meta::CPAN
/**
* @file Parataxis.c
* @brief Low-level Green Threads (Fibers) and Hybrid Thread Pool for Perl.
*
* @section Overview
* This file implements a cooperative multitasking system (Fibers) integrated
* with a preemptive native thread pool. It allows Perl to run thousands of
* user-mode fibers that can offload blocking C-level tasks to background
* OS threads without stalling the main interpreter.
*
* @section Architecture
* - **Fibers**: The primitive unit of execution. Each fiber has its own OS context
* and a complete set of Perl interpreter stacks (Argument, Mark, Scope, Save, Mortal).
* - **Coroutines**: The execution pattern (yield/call/transfer) used by fibers to
* pass control.
* - **Thread Pool**: A pool of worker threads that starts small, grows on demand, and waits on a
* job queue for blocking operations like sleep, I/O, or heavy computation.
* - **Context Switching**: The `swap_perl_state` function manually saves and restores
lib/Acme/Parataxis.c view on Meta::CPAN
#include <string.h>
// Forward declarations
DLLEXPORT SV * coro_yield(SV * ret_val);
DLLEXPORT SV * coro_transfer(int fiber_id, SV * args);
DLLEXPORT void destroy_coro(int fiber_id);
/**
* @brief Get the Operating System's unique Thread ID.
*
* Useful for debugging to prove that background tasks are running on
* different OS threads than the main Perl interpreter.
*
* @return int The TID (Windows) or LWP ID (Linux/BSD/macOS).
*/
int get_os_thread_id() {
#ifdef _WIN32
return (int)GetCurrentThreadId();
#elif defined(__APPLE__)
uint64_t tid;
pthread_threadid_np(NULL, &tid);
lib/Acme/Parataxis.c view on Meta::CPAN
* @brief Generic container for task input/output data.
*/
typedef union {
int64_t i; /**< Integer/Pointer storage */
double d; /**< Floating point storage */
char * s; /**< String storage */
} value_t;
/**
* @struct job_t
* @brief Represents a task in the background thread pool queue.
*/
typedef struct {
int fiber_id; /**< ID of the Fiber that submitted this task */
int target_thread; /**< Index of the assigned worker thread */
int type; /**< Type of task to perform (TASK_*) */
value_t input; /**< Input data for the task */
value_t output; /**< Result data populated by the worker */
int timeout_ms; /**< Timeout duration for I/O tasks */
int status; /**< Current lifecycle state (JOB_*) */
} job_t;
lib/Acme/Parataxis.c view on Meta::CPAN
/** @brief Maximum number of concurrent fibers allowed */
#define MAX_FIBERS 1024
/** @brief Array of active fiber structures */
static para_fiber_t * fibers[MAX_FIBERS];
/** @brief The context representing the main Perl thread */
static para_fiber_t main_context;
/** @brief ID of the currently executing fiber (-1 for Main) */
static int current_fiber_id = -1;
/** @brief Size of the background job queue */
#define MAX_JOBS 1024
/** @brief Fixed-size array for background tasks */
static job_t job_slots[MAX_JOBS];
/** @brief Mutex protecting access to the job queue */
static para_mutex_t queue_lock;
#ifdef _WIN32
static CONDITION_VARIABLE queue_cond;
#else
static pthread_cond_t queue_cond;
#endif
lib/Acme/Parataxis.c view on Meta::CPAN
Sleep(1);
#else
usleep(1000);
#endif
}
}
return 0;
}
/**
* @brief Initializes the background thread pool.
*
* Automatically detects the CPU count and spawns worker threads. This function
* is called automatically by `init_system` and `submit_c_job`.
*/
DLLEXPORT void init_threads() {
dTHX;
if (threads_initialized)
return;
LOCK_INIT(queue_lock);
PARA_COND_INIT(queue_cond);
lib/Acme/Parataxis.c view on Meta::CPAN
max_thread_pool_size = MAX_THREADS;
}
/* Start with a small "seed" pool of 2 threads */
_spawn_workers(2);
threads_initialized = 1;
}
/**
* @brief Submits a C-level task to the background pool.
*
* @param type The task type constant (TASK_*).
* @param arg Input integer or pointer data.
* @param timeout_ms Timeout for I/O operations.
* @return int The index of the submitted job, or -1 if the queue is full.
*/
DLLEXPORT int submit_c_job(int type, int64_t arg, int timeout_ms) {
if (!threads_initialized)
init_threads();
int idx = -1;
lib/Acme/Parataxis.c view on Meta::CPAN
job_slots[idx].input.i = arg;
job_slots[idx].timeout_ms = timeout_ms;
job_slots[idx].status = JOB_NEW;
PARA_COND_SIGNAL(queue_cond);
}
UNLOCK(queue_lock);
return idx;
}
/**
* @brief Polls the queue for any completed background jobs.
*
* @return int Index of a finished job, or -1 if none are ready.
*/
DLLEXPORT int check_for_completion() {
if (!threads_initialized)
init_threads();
int job_idx = -1;
LOCK(queue_lock);
for (int i = 0; i < MAX_JOBS; i++) {
if (job_slots[i].status == JOB_DONE) {
lib/Acme/Parataxis.pod view on Meta::CPAN
The classic way (as I write this, Acme::Parataxis is 5 days old and already has a 'classic' API...)
use v5.40;
use Acme::Parataxis;
$|++;
Acme::Parataxis::run(
sub {
say 'Main task started';
# Spawn background workers
my $f1 = Acme::Parataxis->spawn(
sub {
say ' Task 1: Sleeping in a native thread pool...';
Acme::Parataxis->await_sleep(1000);
say ' Task 1: Ah! What a nice nap...';
return 42;
}
);
my $f2 = Acme::Parataxis->spawn(
sub {
lib/Acme/Parataxis.pod view on Meta::CPAN
Suspends the current fiber until the provided filehandle is ready for writing, or the timeout is reached.
    async {
        await_write($socket);
        syswrite($socket, $message);
    };
=head2 C<await_core_id( )>
Returns the ID of the CPU core currently executing the background task. This is a non-blocking operation that offloads
the request to the thread pool and suspends the fiber until the result is ready.
    async {
        my $core = await_core_id( );
        say "Background task handled by CPU core: $core";
    };
=head1 CORE CONCEPTS
=head2 Creating Fibers
lib/Acme/Parataxis.pod view on Meta::CPAN
current fiber is suspended, and the actual blocking work is performed on a B<different> OS thread in a native pool.
Once the task completes, your fiber is automatically queued for resumption on the main thread.
=head1 SCHEDULER FUNCTIONS
The following functions are the primary interface for the integrated cooperative scheduler.
=head2 C<run( $code )>
Starts the event loop and executes C<$code> as the initial fiber. The loop continues to run as long as there are active
fibers or pending background tasks.
    Acme::Parataxis::run(sub {
        say "The scheduler is running!";
    });
=head2 C<spawn( $code )>
Creates a new fiber and adds it to the scheduler's queue. Returns a L<Future|/"Acme::Parataxis::Future OBJECT METHODS">
that will eventually contain the fiber's return value.
lib/Acme/Parataxis.pod view on Meta::CPAN
=head2 C<max_threads( )>
Returns the currently configured maximum thread pool size.
=head1 BLOCKING & I/O FUNCTIONS
These functions B<suspend> the current fiber and offload the actual blocking work to the native thread pool.
=head2 C<await_sleep( $ms )>
Suspends the fiber for C<$ms> milliseconds. While the background thread sleeps, other fibers can continue to execute.
=head2 C<await_read( $fh, $timeout = 5000 )>
Suspends the fiber until the filehandle C<$fh> is ready for reading, or the C<$timeout> (in milliseconds) is reached.
    my $status = Acme::Parataxis->await_read($socket);
    if ($status > 0) {
        my $data = <$socket>;
    }
lib/Acme/Parataxis.pod view on Meta::CPAN
sub _do_timeout {
my ($self, $type, $timeout) = @_;
$timeout //= $self->{timeout} // 60;
my $start = time;
while (1) {
# Check for readiness NOW (0 timeout)
return 1 if $self->SUPER::_do_timeout($type, 0);
# Check for overall timeout
my $elapsed = time - $start;
return 0 if $elapsed > $timeout;
# Suspend fiber and wait for background I/O check
my $wait = ($timeout - $elapsed) > 0.5 ? 0.5 : ($timeout - $elapsed);
if ($type eq 'read') {
Acme::Parataxis->await_read($self->{fh}, int($wait * 1000));
} else {
Acme::Parataxis->await_write($self->{fh}, int($wait * 1000));
}
}
}
}
lib/Acme/Parataxis.pod view on Meta::CPAN
$c->call( ); # Prime consumer
$p->call( ); # Start producer
=head1 BEST PRACTICES & GOTCHAS
=over
=item * B<Avoid blocking syscalls:> Never call blocking C<sleep( )> or C<sysread( )> on the main interpreter thread.
Always use the C<await_*> equivalents to offload work to the pool.
=item * B<Thread Safety:> While Perl code remains single-threaded, background tasks run on separate OS threads. Shared
C-level data (if accessed via FFI) must be mutex-protected.
=item * B<Stack Limits:> Each fiber is allocated a 512KB stack by default. This is more than sufficient for most
Perl code and allows for high concurrency with a small memory footprint. Extremely deep recursion or massive regex
backtracking might still hit limits.
=item * B<Efficiency:> The native thread pool is initialized dynamically upon the first asynchronous request. It
starts with a small "seed" pool and grows on demand up to the configured limit. Worker threads use condition
variables to sleep efficiently when idle, ensuring near-zero CPU usage when no background tasks are pending.
=item * B<Reference Cycles:> Be careful when passing fiber objects into their own closures, as this can create
memory leaks.
=back
=head1 GORY TECHNICAL DETAILS
=head2 Architectural Inspiration
t/004_synopsis.t view on Meta::CPAN
use Test2::V1 -ipP;
use blib;
use Acme::Parataxis;
$|++;
# This is the synopsis for Acme::Parataxis but verbose
Acme::Parataxis::run(
sub {
diag 'Main fiber started (FID: ' . Acme::Parataxis->current_fid . ')';
# Spawn background workers
diag 'Spawning Task 1 (Sleep)...';
my $f1 = Acme::Parataxis->spawn(
sub {
diag 'Task 1 started (FID: ' . Acme::Parataxis->current_fid . ')';
diag 'Task 1: Sleeping in a native thread pool (1000ms)...';
Acme::Parataxis->await_sleep(1000);
diag 'Task 1: Ah! What a nice nap...';
return 42;
}
);
t/009_http_tiny.t view on Meta::CPAN
if ( $self->{fh} ) {
my $start = time();
while (1) {
# Immediate check using original select (0 timeout)
return 1 if $self->SUPER::_do_timeout( $type, 0 );
# Check for overall timeout
return 0 if ( time() - $start ) > $timeout;
# Suspend fiber and wait for background I/O check.
# await_* submits a job and yields 'WAITING'.
if ( $type eq 'read' ) {
Acme::Parataxis->await_read( $self->{fh}, 500 );
}
else {
Acme::Parataxis->await_write( $self->{fh}, 500 );
}
}
}
return $self->SUPER::_do_timeout( $type, 0 );