Acme-Parataxis


README.md  view on Meta::CPAN


```perl
use v5.40;
use Acme::Parataxis;
$|++;

Acme::Parataxis::run(
    sub {
        say 'Main task started';

        # Spawn background workers
        my $f1 = Acme::Parataxis->spawn(
            sub {
                say '  Task 1: Sleeping in a native thread pool...';
                Acme::Parataxis->await_sleep(1000);
                say '  Task 1: Ah! What a nice nap...';
                return 42;
            }
        );
        my $f2 = Acme::Parataxis->spawn(
            sub {

README.md  view on Meta::CPAN


```
async {
    await_write($socket);
    syswrite($socket, $message);
};
```

## `await_core_id( )`

Returns the ID of the CPU core that executed the background task. The call does not block the interpreter: it offloads
the request to the thread pool and suspends only the calling fiber until the result is ready.

```perl
async {
    my $core = await_core_id( );
    say "Background task handled by CPU core: $core";
};
```

# CORE CONCEPTS

README.md  view on Meta::CPAN

current fiber is suspended, and the actual blocking work is performed on a **different** OS thread in a native pool.
Once the task completes, your fiber is automatically queued for resumption on the main thread.
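
The hand-off described above (suspend the fiber, run the blocking work on another OS thread, then queue the fiber for resumption) can be sketched in plain C with pthreads. This is a simplified illustration under stated assumptions, not the module's actual implementation: `JOB_NEW` and `JOB_DONE` appear in the module's source, but `JOB_FREE`, `JOB_RUNNING`, and every `demo_*` name here are hypothetical, and the "blocking work" is just a sleep.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <time.h>

/* Hypothetical, simplified job states (JOB_FREE must be 0 so that
 * zero-initialized slots start out free). */
enum { JOB_FREE = 0, JOB_NEW, JOB_RUNNING, JOB_DONE };

typedef struct {
    int fiber_id; /* fiber to resume once the job is done */
    int sleep_ms; /* stand-in for the blocking work */
    int status;   /* one of the JOB_* states above */
} demo_job_t;

#define DEMO_JOBS 8
static demo_job_t demo_slots[DEMO_JOBS];
static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t demo_cond = PTHREAD_COND_INITIALIZER;
static int demo_shutdown = 0;

static void demo_sleep_ms(int ms) {
    struct timespec ts;
    ts.tv_sec = ms / 1000;
    ts.tv_nsec = (long)(ms % 1000) * 1000000L;
    nanosleep(&ts, NULL);
}

/* Worker: waits on the condition variable until a JOB_NEW slot appears
 * or shutdown is requested. The blocking call runs with the lock released. */
static void *demo_worker(void *unused) {
    (void)unused;
    pthread_mutex_lock(&demo_lock);
    for (;;) {
        int idx = -1;
        for (int i = 0; i < DEMO_JOBS; i++)
            if (demo_slots[i].status == JOB_NEW) { idx = i; break; }
        if (idx < 0) {
            if (demo_shutdown) break;
            pthread_cond_wait(&demo_cond, &demo_lock);
            continue;
        }
        demo_slots[idx].status = JOB_RUNNING;
        int ms = demo_slots[idx].sleep_ms;
        pthread_mutex_unlock(&demo_lock);
        demo_sleep_ms(ms); /* blocks this worker, not the main thread */
        pthread_mutex_lock(&demo_lock);
        demo_slots[idx].status = JOB_DONE;
    }
    pthread_mutex_unlock(&demo_lock);
    return NULL;
}

/* Main thread: claim a free slot and wake one worker.
 * Returns the slot index, or -1 if every slot is in use. */
static int demo_submit(int fiber_id, int sleep_ms) {
    int idx = -1;
    assert(sleep_ms >= 0);
    pthread_mutex_lock(&demo_lock);
    for (int i = 0; i < DEMO_JOBS; i++)
        if (demo_slots[i].status == JOB_FREE) { idx = i; break; }
    if (idx >= 0) {
        demo_slots[idx].fiber_id = fiber_id;
        demo_slots[idx].sleep_ms = sleep_ms;
        demo_slots[idx].status = JOB_NEW;
        pthread_cond_signal(&demo_cond);
    }
    pthread_mutex_unlock(&demo_lock);
    return idx;
}

/* Main thread: returns the fiber id of one finished job and frees its
 * slot, or -1 if nothing has finished yet. */
static int demo_poll_done(void) {
    int fid = -1;
    pthread_mutex_lock(&demo_lock);
    for (int i = 0; i < DEMO_JOBS; i++) {
        if (demo_slots[i].status == JOB_DONE) {
            fid = demo_slots[i].fiber_id;
            demo_slots[i].status = JOB_FREE;
            break;
        }
    }
    pthread_mutex_unlock(&demo_lock);
    return fid;
}
```

In this sketch a scheduler loop on the main thread would call `demo_poll_done()` between fiber switches and resume whichever fiber id it returns. Because the worker re-checks the slots under the lock before waiting, a signal sent before the worker starts waiting is not lost.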

# SCHEDULER FUNCTIONS

The following functions are the primary interface for the integrated cooperative scheduler.

## `run( $code )`

Starts the event loop and executes `$code` as the initial fiber. The loop continues to run as long as there are active
fibers or pending background tasks.

```perl
Acme::Parataxis::run(sub {
    say "The scheduler is running!";
});
```

## `spawn( $code )`

Creates a new fiber and adds it to the scheduler's queue. Returns a [Future](#acme-parataxis-future-object-methods)

README.md  view on Meta::CPAN

## `max_threads( )`

Returns the currently configured maximum thread pool size.

# BLOCKING & I/O FUNCTIONS

These functions **suspend** the current fiber and offload the actual blocking work to the native thread pool.

## `await_sleep( $ms )`

Suspends the fiber for `$ms` milliseconds. While the background thread sleeps, other fibers can continue to execute.

## `await_read( $fh, $timeout = 5000 )`

Suspends the fiber until the filehandle `$fh` is ready for reading, or the `$timeout` (in milliseconds) is reached.
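
The module's C-level readiness check is not shown in this excerpt. As a rough sketch of how a worker thread could wait for a descriptor to become readable with a millisecond timeout on POSIX systems, the following uses `poll(2)`. The helper names `wait_readable` and `read_when_ready` are hypothetical and are not part of Acme::Parataxis; the return convention (positive when ready, `0` on timeout) is chosen to match the `$status > 0` check used from Perl.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical helper: wait until fd is readable or timeout_ms elapses.
 * Returns 1 if readable (or at EOF), 0 on timeout, -1 on error. */
static int wait_readable(int fd, int timeout_ms) {
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;

    int rc;
    do {
        /* Note: a retry after EINTR restarts the full timeout. */
        rc = poll(&pfd, 1, timeout_ms);
    } while (rc < 0 && errno == EINTR);

    if (rc < 0) return -1;
    if (rc == 0) return 0;
    /* POLLHUP counts as readable: the next read() reports EOF. */
    return (pfd.revents & (POLLIN | POLLHUP)) ? 1 : -1;
}

/* Hypothetical helper: read only once the descriptor is ready.
 * Returns bytes read (0 means EOF), -1 on error, -2 on timeout. */
static long read_when_ready(int fd, void *buf, size_t len, int timeout_ms) {
    assert(buf != NULL);
    int st = wait_readable(fd, timeout_ms);
    if (st == 0) return -2;
    if (st < 0) return -1;
    return (long)read(fd, buf, len);
}
```

Run on a pool thread, a call like this blocks only that worker. The module's own Perl-level usage follows.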

```perl
my $status = Acme::Parataxis->await_read($socket);
if ($status > 0) {
    my $data = <$socket>;
}

README.md  view on Meta::CPAN

    sub _do_timeout {
        my ($self, $type, $timeout) = @_;
        $timeout //= $self->{timeout} // 60;
        my $start = time;
        while (1) {
            # Check for readiness NOW (0 timeout)
            return 1 if $self->SUPER::_do_timeout($type, 0);
            # Check for overall timeout
            my $elapsed = time - $start;
            return 0 if $elapsed > $timeout;
            # Suspend fiber and wait for background I/O check
            my $wait = ($timeout - $elapsed) > 0.5 ? 0.5 : ($timeout - $elapsed);
            if ($type eq 'read') {
                Acme::Parataxis->await_read($self->{fh}, int($wait * 1000));
            } else {
                Acme::Parataxis->await_write($self->{fh}, int($wait * 1000));
            }
        }
    }
}
```
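
The per-iteration wait in `_do_timeout` is the remaining time capped at half a second, converted to whole milliseconds. Capping each slice bounds how long the fiber stays suspended before it rechecks readiness, and shrinking the last slice keeps the loop from overshooting the overall timeout by a full slice. The same arithmetic as a standalone C helper, for illustration only (`next_wait_ms` is a hypothetical name, and the clamp to zero is an added safety net that the Perl loop does not need because it returns first):

```c
#include <assert.h>

/* Hypothetical helper mirroring the Perl expression
 *   ($timeout - $elapsed) > 0.5 ? 0.5 : ($timeout - $elapsed)
 * followed by int($wait * 1000). Times are in seconds. */
static int next_wait_ms(double timeout_s, double elapsed_s) {
    assert(timeout_s >= 0.0);
    double remaining = timeout_s - elapsed_s;
    if (remaining < 0.0)
        remaining = 0.0; /* added clamp; see note above */
    double wait = remaining > 0.5 ? 0.5 : remaining;
    return (int)(wait * 1000.0); /* truncates toward zero, like Perl's int() */
}
```

With a 60-second timeout, a fresh call waits 500 ms, while a call made 59.75 seconds in waits only the remaining 250 ms.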

README.md  view on Meta::CPAN

});

$c->call( ); # Prime consumer
$p->call( ); # Start producer
```

# BEST PRACTICES & GOTCHAS

- **Avoid blocking syscalls:** Never call blocking `sleep( )` or `sysread( )` on the main interpreter thread.
Always use the `await_*` functions to offload the wait to the pool.
- **Thread Safety:** While Perl code remains single-threaded, background tasks run on separate OS threads. Shared
C-level data (if accessed via FFI) must be mutex-protected.
- **Stack Limits:** Each fiber is allocated a 512KB stack by default. This is more than sufficient for most
Perl code and allows for high concurrency with a small memory footprint. Extremely deep recursion or massive regex
backtracking might still hit limits.
- **Efficiency:** The native thread pool is initialized dynamically upon the first asynchronous request. It
starts with a small "seed" pool and grows on demand up to the configured limit. Worker threads use condition
variables to sleep efficiently when idle, ensuring near-zero CPU usage when no background tasks are pending.
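- **Reference Cycles:** A closure that captures its own fiber object creates a reference cycle, which Perl's
reference counting cannot reclaim, so the memory leaks. If the closure needs the fiber, weaken the captured
reference with `Scalar::Util::weaken`.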
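
To illustrate the **Thread Safety** point in the list above: any C-level state that both the main thread and pool workers touch needs a lock around every access. A minimal pthreads sketch follows; the `shared_stats_t` type and all `stats_*` names are hypothetical and exist only for this example.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical shared state touched by the main thread and by workers. */
typedef struct {
    pthread_mutex_t lock;
    long completed; /* number of finished background tasks */
} shared_stats_t;

static shared_stats_t stats = { PTHREAD_MUTEX_INITIALIZER, 0 };

/* Every write goes through the mutex. */
static void stats_add(long n) {
    pthread_mutex_lock(&stats.lock);
    stats.completed += n;
    pthread_mutex_unlock(&stats.lock);
}

/* Reads take the lock too, so they never observe a torn update. */
static long stats_read(void) {
    pthread_mutex_lock(&stats.lock);
    long v = stats.completed;
    pthread_mutex_unlock(&stats.lock);
    return v;
}

static void *stats_worker(void *arg) {
    long rounds = *(long *)arg;
    for (long i = 0; i < rounds; i++)
        stats_add(1);
    return NULL;
}

/* Runs nthreads workers, each performing `rounds` increments,
 * and returns the total once all of them have been joined. */
static long stats_demo(int nthreads, long rounds) {
    pthread_t tids[16];
    assert(nthreads > 0 && nthreads <= 16);
    for (int i = 0; i < nthreads; i++) {
        int rc = pthread_create(&tids[i], NULL, stats_worker, &rounds);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < nthreads; i++)
        pthread_join(tids[i], NULL);
    return stats_read();
}
```

Without the mutex, concurrent `completed += n` updates could be lost; with it, four workers doing 10000 increments each always total 40000.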

# GORY TECHNICAL DETAILS

## Architectural Inspiration

The concurrency model in Parataxis is heavily inspired by the **Wren** programming language, specifically its treatment
of fibers as the primary unit of execution and its deterministic cooperative scheduling.

eg/http_tiny.pl  view on Meta::CPAN

            my $start = time;
            while (1) {

                # Check for readiness NOW (0 timeout)
                return 1 if $self->SUPER::_do_timeout( $type, 0 );

                # Check for overall timeout
                my $elapsed = time() - $start;
                return 0 if $elapsed > $timeout;

                # Suspend fiber and wait for background I/O check.
                # This is where the magic happens: the fiber yields control back
                # to the scheduler while waiting for the socket to be ready.
                my $wait = ( $timeout - $elapsed ) > 0.5 ? 0.5 : ( $timeout - $elapsed );
                if ( $type eq 'read' ) {
                    await_read( $self->{fh}, int( $wait * 1000 ) );
                }
                else {
                    await_write( $self->{fh}, int( $wait * 1000 ) );
                }
            }

eg/stress_io.pl  view on Meta::CPAN

use v5.40;
use blib;
use Acme::Parataxis qw[:all];
use Time::HiRes     qw[time];
$|++;

# Stress Test 2: Concurrent I/O (Background Thread Pool)
# This tests the ability to handle many simultaneous blocking background tasks.
async {
    my $concurrency = 64;    # Well below MAX_FIBERS (1024 in Parataxis.c)
    my $iterations  = 5;
    my $sleep_ms    = 500;
    say "Starting I/O Stress Test: $concurrency concurrent sleeps of ${sleep_ms}ms...";
    my $start_time = time();
    for my $iter ( 1 .. $iterations ) {
        my $iter_start = time();
        my @futures;

        # Spawn concurrent sleep fibers
        for my $i ( 1 .. $concurrency ) {
            push @futures, fiber {
                no warnings 'recursion';

                # This offloads to the C-level background thread pool
                await_sleep($sleep_ms);
                return "Worker $i done";
            };
        }

        # Wait for all to finish
        for my $f (@futures) {
            await $f;
        }
        my $elapsed = time() - $iter_start;

eg/synopsis.pl  view on Meta::CPAN

    use v5.40;
    use blib;
    use Acme::Parataxis;
    $|++;

    # Basic usage with the integrated scheduler
    Acme::Parataxis::run(
        sub {
            say 'Main fiber started (TID: ' . Acme::Parataxis->tid . ')';

            # Spawn background workers
            my $f1 = Acme::Parataxis->spawn(
                sub {
                    say '  Task 1: Sleeping (non-blocking for others)...';
                    Acme::Parataxis->await_sleep(1000);
                    return 'Coffee is ready!';
                }
            );
            my $f2 = Acme::Parataxis->spawn(
                sub {
                    say '  Task 2: Calculating... (simulated CPU work)';

eg/thread_pool.pl  view on Meta::CPAN

$|++;

# Demonstrate dynamic thread pool growth and modern API
async {
    say 'Main: Initial thread pool size is ' . Acme::Parataxis::get_thread_pool_size();
    my @futures;
    for my $id ( 1 .. 5 ) {
        push @futures, fiber {
            my $fid        = current_fid();
            my $sleep_time = int( rand(1000) ) + 500;
            say "  [Worker $id] Fiber #$fid will sleep for ${sleep_time}ms in background pool";
            my $start = time();

            # This yields to main, background thread sleeps, scheduler resumes us
            await_sleep($sleep_time);
            my $elapsed = int( ( time() - $start ) * 1000 );
            say "  [Worker $id] Fiber #$fid woke up after ${elapsed}ms!";
            return "Done $id";
        };
    }
    say 'Main: All workers spawned. Waiting for completion...';
    say 'Main: Thread pool size during load: ' . Acme::Parataxis::get_thread_pool_size();
    my $start_time = time();

lib/Acme/Parataxis.c  view on Meta::CPAN

/**
 * @file Parataxis.c
 * @brief Low-level Green Threads (Fibers) and Hybrid Thread Pool for Perl.
 *
 * @section Overview
 * This file implements a cooperative multitasking system (Fibers) integrated
 * with a preemptive native thread pool. It allows Perl to run thousands of
 * user-mode fibers that can offload blocking C-level tasks to background
 * OS threads without stalling the main interpreter.
 *
 * @section Architecture
 * - **Fibers**: The primitive unit of execution. Each fiber has its own OS context
 *   and a complete set of Perl interpreter stacks (Argument, Mark, Scope, Save, Mortal).
 * - **Coroutines**: The execution pattern (yield/call/transfer) used by fibers to
 *   pass control.
 * - **Thread Pool**: A pool of worker threads, seeded small and grown on demand up
 *   to a configured maximum, that services a job queue of blocking operations like
 *   sleep, I/O, or heavy computation.
 * - **Context Switching**: The `swap_perl_state` function manually saves and restores

lib/Acme/Parataxis.c  view on Meta::CPAN

#include <string.h>

// Forward declarations
DLLEXPORT SV * coro_yield(SV * ret_val);
DLLEXPORT SV * coro_transfer(int fiber_id, SV * args);
DLLEXPORT void destroy_coro(int fiber_id);

/**
 * @brief Get the Operating System's unique Thread ID.
 *
 * Useful for debugging to prove that background tasks are running on
 * different OS threads than the main Perl interpreter.
 *
 * @return int The TID (Windows) or LWP ID (Linux/BSD/macOS).
 */
int get_os_thread_id() {
#ifdef _WIN32
    return (int)GetCurrentThreadId();
#elif defined(__APPLE__)
    uint64_t tid;
    pthread_threadid_np(NULL, &tid);

lib/Acme/Parataxis.c  view on Meta::CPAN

 * @brief Generic container for task input/output data.
 */
typedef union {
    int64_t i; /**< Integer/Pointer storage */
    double d;  /**< Floating point storage */
    char * s;  /**< String storage */
} value_t;

/**
 * @struct job_t
 * @brief Represents a task in the background thread pool queue.
 */
typedef struct {
    int fiber_id;      /**< ID of the Fiber that submitted this task */
    int target_thread; /**< Index of the assigned worker thread */
    int type;          /**< Type of task to perform (TASK_*) */
    value_t input;     /**< Input data for the task */
    value_t output;    /**< Result data populated by the worker */
    int timeout_ms;    /**< Timeout duration for I/O tasks */
    int status;        /**< Current lifecycle state (JOB_*) */
} job_t;

lib/Acme/Parataxis.c  view on Meta::CPAN


/** @brief Maximum number of concurrent fibers allowed */
#define MAX_FIBERS 1024
/** @brief Array of active fiber structures */
static para_fiber_t * fibers[MAX_FIBERS];
/** @brief The context representing the main Perl thread */
static para_fiber_t main_context;
/** @brief ID of the currently executing fiber (-1 for Main) */
static int current_fiber_id = -1;

/** @brief Size of the background job queue */
#define MAX_JOBS 1024
/** @brief Fixed-size array for background tasks */
static job_t job_slots[MAX_JOBS];
/** @brief Mutex protecting access to the job queue */
static para_mutex_t queue_lock;

#ifdef _WIN32
static CONDITION_VARIABLE queue_cond;
#else
static pthread_cond_t queue_cond;
#endif

lib/Acme/Parataxis.c  view on Meta::CPAN

            Sleep(1);
#else
            usleep(1000);
#endif
        }
    }
    return 0;
}

/**
 * @brief Initializes the background thread pool.
 *
 * Automatically detects the CPU count and spawns worker threads. This function
 * is called automatically by `init_system` and `submit_c_job`.
 */
DLLEXPORT void init_threads() {
    dTHX;
    if (threads_initialized)
        return;
    LOCK_INIT(queue_lock);
    PARA_COND_INIT(queue_cond);

lib/Acme/Parataxis.c  view on Meta::CPAN

            max_thread_pool_size = MAX_THREADS;
    }

    /* Start with a small "seed" pool of 2 threads */
    _spawn_workers(2);

    threads_initialized = 1;
}

/**
 * @brief Submits a C-level task to the background pool.
 *
 * @param type The task type constant (TASK_*).
 * @param arg Input integer or pointer data.
 * @param timeout_ms Timeout for I/O operations.
 * @return int The index of the submitted job, or -1 if the queue is full.
 */
DLLEXPORT int submit_c_job(int type, int64_t arg, int timeout_ms) {
    if (!threads_initialized)
        init_threads();
    int idx = -1;

lib/Acme/Parataxis.c  view on Meta::CPAN

        job_slots[idx].input.i = arg;
        job_slots[idx].timeout_ms = timeout_ms;
        job_slots[idx].status = JOB_NEW;
        PARA_COND_SIGNAL(queue_cond);
    }
    UNLOCK(queue_lock);
    return idx;
}

/**
 * @brief Polls the queue for any completed background jobs.
 *
 * @return int Index of a finished job, or -1 if none are ready.
 */
DLLEXPORT int check_for_completion() {
    if (!threads_initialized)
        init_threads();
    int job_idx = -1;
    LOCK(queue_lock);
    for (int i = 0; i < MAX_JOBS; i++) {
        if (job_slots[i].status == JOB_DONE) {

lib/Acme/Parataxis.pod  view on Meta::CPAN

The classic way (as I write this, Acme::Parataxis is 5 days old and already has a 'classic' API...)

    use v5.40;
    use Acme::Parataxis;
    $|++;

    Acme::Parataxis::run(
        sub {
            say 'Main task started';

            # Spawn background workers
            my $f1 = Acme::Parataxis->spawn(
                sub {
                    say '  Task 1: Sleeping in a native thread pool...';
                    Acme::Parataxis->await_sleep(1000);
                    say '  Task 1: Ah! What a nice nap...';
                    return 42;
                }
            );
            my $f2 = Acme::Parataxis->spawn(
                sub {

lib/Acme/Parataxis.pod  view on Meta::CPAN


Suspends the current fiber until the provided filehandle is ready for writing, or the timeout is reached.

    async {
        await_write($socket);
        syswrite($socket, $message);
    };

=head2 C<await_core_id( )>

Returns the ID of the CPU core that executed the background task. The call does not block the interpreter: it offloads
the request to the thread pool and suspends only the calling fiber until the result is ready.

    async {
        my $core = await_core_id( );
        say "Background task handled by CPU core: $core";
    };

=head1 CORE CONCEPTS

=head2 Creating Fibers

lib/Acme/Parataxis.pod  view on Meta::CPAN

current fiber is suspended, and the actual blocking work is performed on a B<different> OS thread in a native pool.
Once the task completes, your fiber is automatically queued for resumption on the main thread.

=head1 SCHEDULER FUNCTIONS

The following functions are the primary interface for the integrated cooperative scheduler.

=head2 C<run( $code )>

Starts the event loop and executes C<$code> as the initial fiber. The loop continues to run as long as there are active
fibers or pending background tasks.

    Acme::Parataxis::run(sub {
        say "The scheduler is running!";
    });

=head2 C<spawn( $code )>

Creates a new fiber and adds it to the scheduler's queue. Returns a L<Future|/"Acme::Parataxis::Future OBJECT METHODS">
that will eventually contain the fiber's return value.

lib/Acme/Parataxis.pod  view on Meta::CPAN

=head2 C<max_threads( )>

Returns the currently configured maximum thread pool size.

=head1 BLOCKING & I/O FUNCTIONS

These functions B<suspend> the current fiber and offload the actual blocking work to the native thread pool.

=head2 C<await_sleep( $ms )>

Suspends the fiber for C<$ms> milliseconds. While the background thread sleeps, other fibers can continue to execute.

=head2 C<await_read( $fh, $timeout = 5000 )>

Suspends the fiber until the filehandle C<$fh> is ready for reading, or the C<$timeout> (in milliseconds) is reached.

    my $status = Acme::Parataxis->await_read($socket);
    if ($status > 0) {
        my $data = <$socket>;
    }

lib/Acme/Parataxis.pod  view on Meta::CPAN

        sub _do_timeout {
            my ($self, $type, $timeout) = @_;
            $timeout //= $self->{timeout} // 60;
            my $start = time;
            while (1) {
                # Check for readiness NOW (0 timeout)
                return 1 if $self->SUPER::_do_timeout($type, 0);
                # Check for overall timeout
                my $elapsed = time - $start;
                return 0 if $elapsed > $timeout;
                # Suspend fiber and wait for background I/O check
                my $wait = ($timeout - $elapsed) > 0.5 ? 0.5 : ($timeout - $elapsed);
                if ($type eq 'read') {
                    Acme::Parataxis->await_read($self->{fh}, int($wait * 1000));
                } else {
                    Acme::Parataxis->await_write($self->{fh}, int($wait * 1000));
                }
            }
        }
    }

lib/Acme/Parataxis.pod  view on Meta::CPAN

    $c->call( ); # Prime consumer
    $p->call( ); # Start producer

=head1 BEST PRACTICES & GOTCHAS

=over

=item * B<Avoid blocking syscalls:> Never call blocking C<sleep( )> or C<sysread( )> on the main interpreter thread.
Always use the C<await_*> functions to offload the wait to the pool.

=item * B<Thread Safety:> While Perl code remains single-threaded, background tasks run on separate OS threads. Shared
C-level data (if accessed via FFI) must be mutex-protected.

=item * B<Stack Limits:> Each fiber is allocated a 512KB stack by default. This is more than sufficient for most
Perl code and allows for high concurrency with a small memory footprint. Extremely deep recursion or massive regex
backtracking might still hit limits.

=item * B<Efficiency:> The native thread pool is initialized dynamically upon the first asynchronous request. It
starts with a small "seed" pool and grows on demand up to the configured limit. Worker threads use condition
variables to sleep efficiently when idle, ensuring near-zero CPU usage when no background tasks are pending.

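=item * B<Reference Cycles:> A closure that captures its own fiber object creates a reference cycle, which Perl's
reference counting cannot reclaim, so the memory leaks. If the closure needs the fiber, weaken the captured
reference with C<Scalar::Util::weaken>.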

=back

=head1 GORY TECHNICAL DETAILS

=head2 Architectural Inspiration

t/004_synopsis.t  view on Meta::CPAN

use Test2::V1 -ipP;
use blib;
use Acme::Parataxis;
$|++;

# This is the synopsis for Acme::Parataxis but verbose
Acme::Parataxis::run(
    sub {
        diag 'Main fiber started (FID: ' . Acme::Parataxis->current_fid . ')';

        # Spawn background workers
        diag 'Spawning Task 1 (Sleep)...';
        my $f1 = Acme::Parataxis->spawn(
            sub {
                diag 'Task 1 started (FID: ' . Acme::Parataxis->current_fid . ')';
                diag 'Task 1: Sleeping in a native thread pool (1000ms)...';
                Acme::Parataxis->await_sleep(1000);
                diag 'Task 1: Ah! What a nice nap...';
                return 42;
            }
        );

t/009_http_tiny.t  view on Meta::CPAN

        if ( $self->{fh} ) {
            my $start = time();
            while (1) {

                # Immediate check using original select (0 timeout)
                return 1 if $self->SUPER::_do_timeout( $type, 0 );

                # Check for overall timeout
                return 0 if ( time() - $start ) > $timeout;

                # Suspend fiber and wait for background I/O check.
                # await_* submits a job and yields 'WAITING'.
                if ( $type eq 'read' ) {
                    Acme::Parataxis->await_read( $self->{fh}, 500 );
                }
                else {
                    Acme::Parataxis->await_write( $self->{fh}, 500 );
                }
            }
        }
        return $self->SUPER::_do_timeout( $type, 0 );


