Acme-Parataxis


README.md  view on Meta::CPAN


A generic await function. It accepts either an `Acme::Parataxis` fiber object or an `Acme::Parataxis::Future` and
suspends the current fiber until the target is ready.

```perl
my $result = await($f);
```

## `await_sleep( $ms )`

Suspends the current fiber for `$ms` milliseconds. This is a non-blocking operation that allows other fibers to run
while the current one is paused.

```perl
async {
    say "Taking a nap...";
    await_sleep(1000);
    say "I'm awake!";
};
```

README.md  view on Meta::CPAN


```perl
$fiber->call( );
```

When the called fiber finishes, control returns to the fiber that called it. It is an error to call a fiber that is
already done.

## Yielding

Yielding is the "secret sauce" of fibers.

A fiber that yields passes control back to its caller but keeps its exact state, including all variables and the
current instruction pointer. The next time it's called, it resumes exactly where it left off.

```perl
Acme::Parataxis->yield( );
```
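
To make the "resumes exactly where it left off" behaviour concrete, here is a minimal C sketch of yield and call built on POSIX `ucontext`. This is an illustration under stated assumptions, not the module's implementation: the names `fiber_init`, `fiber_call`, `fiber_yield`, and `fiber_body` are hypothetical, and `Parataxis.c` may use a different context-switching mechanism and also swaps Perl interpreter state, which this sketch omits.

```c
#include <ucontext.h>

/* Hypothetical names throughout; a sketch only, not Acme::Parataxis code. */
static ucontext_t caller_ctx;              /* where fiber_call() was invoked   */
static ucontext_t fiber_ctx;               /* saved state of the fiber         */
static char fiber_stack[64 * 1024];        /* the fiber's private C stack      */

int fiber_progress = 0;                    /* last value published by the fiber */
int fiber_finished = 0;                    /* set once the fiber body returns   */

/* Save the fiber's state and switch back to whoever called it. */
void fiber_yield(void) {
    swapcontext(&fiber_ctx, &caller_ctx);
}

/* The fiber body: `local` lives on the fiber's own stack and survives yields. */
static void fiber_body(void) {
    int local = 0;
    local++;
    fiber_progress = local;
    fiber_yield();                         /* first suspension point  */
    local++;
    fiber_progress = local;
    fiber_yield();                         /* second suspension point */
    fiber_finished = 1;                    /* returning follows uc_link */
}

/* Prepare the fiber so the first fiber_call() enters fiber_body(). */
void fiber_init(void) {
    getcontext(&fiber_ctx);
    fiber_ctx.uc_stack.ss_sp = fiber_stack;
    fiber_ctx.uc_stack.ss_size = sizeof fiber_stack;
    fiber_ctx.uc_link = &caller_ctx;       /* resume the caller when the body ends */
    makecontext(&fiber_ctx, fiber_body, 0);
}

/* Suspend the caller and run the fiber until it yields or finishes. */
void fiber_call(void) {
    swapcontext(&caller_ctx, &fiber_ctx);
}
```

After `fiber_init()`, each `fiber_call()` advances the body to its next `fiber_yield()`; the third call lets the body return, at which point control goes back to the caller through `uc_link`.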

## Communication (Passing Values)

README.md  view on Meta::CPAN

## `max_threads( )`

Returns the currently configured maximum thread pool size.

# BLOCKING & I/O FUNCTIONS

These functions **suspend** the current fiber and offload the actual blocking work to the native thread pool.
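
As a rough picture of what offloading means, the C source in this distribution defines four job states (`JOB_FREE`, `JOB_NEW`, `JOB_BUSY`, `JOB_DONE`). The sketch below walks one slot through that lifecycle in a single thread. Everything except the four constants is an assumption made for illustration: `job_t`, `MAX_JOBS`, `submit_job`, `worker_step`, and `collect_job` are hypothetical, and a real pool would need locking or atomics around the status field.

```c
/* Job states as defined in lib/Acme/Parataxis.c */
#define JOB_FREE 0
#define JOB_NEW  1
#define JOB_BUSY 2
#define JOB_DONE 3

#define MAX_JOBS 4              /* hypothetical queue size */

/* Hypothetical slot layout; the real struct is not shown in the excerpt. */
typedef struct {
    int status;
    int input;
    int output;
} job_t;

job_t jobs[MAX_JOBS];           /* zero-initialised, so every slot is JOB_FREE */

/* Fiber side: claim a free slot and mark it as submitted.
 * Returns the slot index, or -1 if the queue is full. */
int submit_job(int input) {
    for (int i = 0; i < MAX_JOBS; i++) {
        if (jobs[i].status == JOB_FREE) {
            jobs[i].input = input;
            jobs[i].status = JOB_NEW;
            return i;
        }
    }
    return -1;
}

/* Worker side: pick up one submitted job and finish it.
 * Doubling the input stands in for the blocking work.
 * Returns 1 if a job was processed, 0 if nothing was pending. */
int worker_step(void) {
    for (int i = 0; i < MAX_JOBS; i++) {
        if (jobs[i].status == JOB_NEW) {
            jobs[i].status = JOB_BUSY;
            jobs[i].output = jobs[i].input * 2;
            jobs[i].status = JOB_DONE;
            return 1;
        }
    }
    return 0;
}

/* Scheduler side: if the job is done, hand back its result and free the slot.
 * Returns 1 on success, 0 if the job has not completed yet. */
int collect_job(int slot, int *out) {
    if (jobs[slot].status != JOB_DONE)
        return 0;
    *out = jobs[slot].output;
    jobs[slot].status = JOB_FREE;
    return 1;
}
```

In this model the suspended fiber would be resumed once `collect_job` succeeds for its slot; until then, other fibers are free to run.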

## `await_sleep( $ms )`

Suspends the fiber for `$ms` milliseconds. While the background thread sleeps, other fibers can continue to execute.

## `await_read( $fh, $timeout = 5000 )`

Suspends the fiber until the filehandle `$fh` is ready for reading or the `$timeout` (in milliseconds) expires.

```perl
my $status = Acme::Parataxis->await_read($socket);
if ($status > 0) {
    my $data = <$socket>;
}
```
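
The worker excerpt from `Parataxis.c` polls `select` in 10 ms slices and falls back to a 5000 ms timeout. The following standalone C sketch shows that slicing pattern for the POSIX case. It is an approximation, not the module's code: `wait_readable` is a hypothetical helper, and its return convention (positive when ready, 0 on timeout, negative on error) is an assumption chosen to match the `$status > 0` check above.

```c
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>

/* Hypothetical helper, not part of Acme::Parataxis.
 * Waits until `fd` is readable or roughly `timeout_ms` milliseconds pass.
 * Returns >0 if readable, 0 on timeout, <0 if select() failed. */
int wait_readable(int fd, int timeout_ms) {
    const int slice_ms = 10;    /* same slice length as the excerpt's tv_usec */
    int elapsed_ms = 0;

    while (elapsed_ms < timeout_ms) {
        fd_set fds;
        struct timeval tv;

        /* select() may modify both arguments, so rebuild them every pass. */
        FD_ZERO(&fds);
        FD_SET(fd, &fds);
        tv.tv_sec = 0;
        tv.tv_usec = slice_ms * 1000;

        int res = select(fd + 1, &fds, NULL, NULL, &tv);
        if (res != 0)
            return res;         /* ready (>0) or error (<0) */

        elapsed_ms += slice_ms; /* slice expired with nothing to read */
    }
    return 0;                   /* overall timeout */
}
```

Polling in short slices rather than issuing one long `select` lets a worker notice a shutdown flag promptly, which is presumably why the original loop is guarded by `threads_keep_running`.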

## `await_write( $fh, $timeout = 5000 )`

lib/Acme/Parataxis.c  view on Meta::CPAN

/**
 * @file Parataxis.c
 * @brief Low-level Green Threads (Fibers) and Hybrid Thread Pool for Perl.
 *
 * @section Overview
 * This file implements a cooperative multitasking system (Fibers) integrated
 * with a preemptive native thread pool. It allows Perl to run thousands of
 * user-mode fibers that can offload blocking C-level tasks to background
 * OS threads without stalling the main interpreter.
 *
 * @section Architecture
 * - **Fibers**: The primitive unit of execution. Each fiber has its own OS context
 *   and a complete set of Perl interpreter stacks (Argument, Mark, Scope, Save, Mortal).
 * - **Coroutines**: The execution pattern (yield/call/transfer) used by fibers to
 *   pass control.
 * - **Thread Pool**: A fixed pool of worker threads that poll a job queue for
 *   blocking operations like sleep, I/O, or heavy computation.
 * - **Context Switching**: The `swap_perl_state` function manually saves and restores
 *   the global state of the Perl interpreter (`PL_*` variables) to allow disjoint
 *   execution flows.
 *
 * @section Caveats
 * Shared subroutines (CVs) with re-entrant yielding calls are handled by a
 * specialized pad-clearing mechanism in `_activate_current_depths` to satisfy
 * Perl's internal `AvFILLp` assertions in debug builds.
 */

#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0601
#endif

lib/Acme/Parataxis.c  view on Meta::CPAN

/** @name Job Status Constants */
///@{
#define JOB_FREE 0 /**< Slot is available for new tasks */
#define JOB_NEW 1  /**< Task is submitted but not yet picked up by a worker */
#define JOB_BUSY 2 /**< Task is currently being processed by a worker thread */
#define JOB_DONE 3 /**< Task has completed and results are ready */
///@}

/** @name Task Type Constants */
///@{
#define TASK_SLEEP 0   /**< Sleep for N milliseconds */
#define TASK_GET_CPU 1 /**< Retrieve current core ID */
#define TASK_READ 2    /**< Wait for read-readiness on a file descriptor */
#define TASK_WRITE 3   /**< Wait for write-readiness on a file descriptor */
///@}

/**
 * @union value_t
 * @brief Generic container for task input/output data.
 */
typedef union {

lib/Acme/Parataxis.c  view on Meta::CPAN

#else
                int fd = (int)job->input.i;
                FD_SET(fd, &fds);
#endif
                struct timeval tv;
                int res;
                int elapsed_ms = 0;
                int timeout = job->timeout_ms > 0 ? job->timeout_ms : 5000;

                while (threads_keep_running) {
                    tv.tv_sec = 0;
                    tv.tv_usec = 10000;

                    fd_set work_fds = fds;
                    if (job->type == TASK_READ)
#ifdef _WIN32
                        res = select(0, &work_fds, NULL, NULL, &tv);
#else
                        res = select(fd + 1, &work_fds, NULL, NULL, &tv);
#endif
                    else
#ifdef _WIN32

lib/Acme/Parataxis.pod  view on Meta::CPAN


=head2 C<await( $thing )>

A generic await function. It accepts either an C<Acme::Parataxis> fiber object or an C<Acme::Parataxis::Future> and
suspends the current fiber until the target is ready.

    my $result = await($f);

=head2 C<await_sleep( $ms )>

Suspends the current fiber for C<$ms> milliseconds. This is a non-blocking operation that allows other fibers to run
while the current one is paused.

    async {
        say "Taking a nap...";
        await_sleep(1000);
        say "I'm awake!";
    };

=head2 C<await_read( $fh, $timeout = 5000 )>

lib/Acme/Parataxis.pod  view on Meta::CPAN


To run a fiber, you "call" it. This suspends the current fiber and executes the called one until it finishes or yields.

    $fiber->call( );

When the called fiber finishes, control returns to the fiber that called it. It is an error to call a fiber that is
already done.

=head2 Yielding

Yielding is the "secret sauce" of fibers.

A fiber that yields passes control back to its caller but keeps its exact state, including all variables and the
current instruction pointer. The next time it's called, it resumes exactly where it left off.

    Acme::Parataxis->yield( );

=head2 Communication (Passing Values)

Fibers can pass data back and forth through C<call> and C<yield>:

lib/Acme/Parataxis.pod  view on Meta::CPAN

=head2 C<max_threads( )>

Returns the currently configured maximum thread pool size.

=head1 BLOCKING & I/O FUNCTIONS

These functions B<suspend> the current fiber and offload the actual blocking work to the native thread pool.

=head2 C<await_sleep( $ms )>

Suspends the fiber for C<$ms> milliseconds. While the background thread sleeps, other fibers can continue to execute.

=head2 C<await_read( $fh, $timeout = 5000 )>

Suspends the fiber until the filehandle C<$fh> is ready for reading or the C<$timeout> (in milliseconds) expires.

    my $status = Acme::Parataxis->await_read($socket);
    if ($status > 0) {
        my $data = <$socket>;
    }

=head2 C<await_write( $fh, $timeout = 5000 )>

Suspends the fiber until the filehandle C<$fh> is ready for writing.

t/002_exceptions.t  view on Meta::CPAN

use Test2::V1 -ipP;
use blib;
use Acme::Parataxis;
use experimental 'class';
$|++;
#
diag 'Testing exception handling in Acme::Parataxis fibers...';
subtest 'Die inside coroutine, catch outside' => sub {
    my $fiber = Acme::Parataxis->new(
        code => sub {
            pass 'Inside the sub. Hang on a sec while I die...';
            die 'Death in coro';
        }
    );
    like dies { $fiber->call(); }, qr/Death in coro/, 'Caught exception from coroutine';
    ok $fiber->is_done, 'Coroutine is marked as done after die';
};
subtest 'eval inside coroutine' => sub {
    my $fiber = Acme::Parataxis->new(
        code => sub {
            pass 'Inside the sub. Hang on a sec while I die inside an eval...';
            eval { die 'Inner death' };
            my $err = $@;
            return 'Survived: ' . $err;
        }
    );
    like $fiber->call(), qr/Survived: Inner death/, 'Inner eval caught the death';
    ok $fiber->is_done, 'Coroutine finished normally';
};
subtest 'try/catch inside coroutine' => sub {
    my $fiber = Acme::Parataxis->new(

t/006_parallel.t  view on Meta::CPAN

Acme::Parataxis::run(
    sub {
        my $start_time = time();
        diag "Starting parallel sleeps at $start_time...";

        # Check thread pool size
        my $pool_size = Acme::Parataxis::get_thread_pool_size();
        diag "Detected thread pool size: $pool_size";

        # Spawn only as many tasks as we have threads (up to 3) to ensure they run in parallel.
        # If pool size is 2, running 3 tasks takes 2 seconds, failing the 1.8s test.
        my $num_tasks = $pool_size;
        $num_tasks = 3 if $num_tasks > 3;
        $num_tasks = 1 if $num_tasks < 1;
        diag "Spawning $num_tasks parallel tasks...";
        my @futures;
        for my $i ( 1 .. $num_tasks ) {
            push @futures, Acme::Parataxis->spawn(
                sub {
                    my $id = $i;    # Closure capture
                    diag "Fiber $id started (FID: " . Acme::Parataxis->current_fid . ')';

t/006_parallel.t  view on Meta::CPAN

                    return $id;
                }
            );
        }

        # Wait for all
        diag "Main: Waiting for $num_tasks fibers to finish...";
        my @results;
        push @results, $_->await() for @futures;
        my $elapsed = time() - $start_time;
        diag "Total wallclock time: $elapsed seconds";
        diag 'Results: ' . join( ', ', @results );
        my $expected = [ 1 .. $num_tasks ];
        is \@results, $expected, 'Fibers returned correct individual results';

        # If we have at least 1 task, it should take ~1s.
        ok $elapsed < 1.8, "$num_tasks tasks ran in parallel (elapsed < 1.8s)";
    }
);
done_testing();


