Acme-Parataxis

 view release on metacpan or  search on metacpan

README.md  view on Meta::CPAN


The classic way (as I write this, Acme::Parataxis is 5 days old and already has a 'classic' API...)

```perl
use v5.40;
use Acme::Parataxis;
$|++;

Acme::Parataxis::run(
    sub {
        say 'Main task started';

        # Spawn background workers
        my $f1 = Acme::Parataxis->spawn(
            sub {
                say '  Task 1: Sleeping in a native thread pool...';
                Acme::Parataxis->await_sleep(1000);
                say '  Task 1: Ah! What a nice nap...';
                return 42;
            }
        );

README.md  view on Meta::CPAN

```

Or do things the more modern way:

```perl
use v5.40;
use Acme::Parataxis qw[:all];
$|++;

async {
    say 'Main task started';

    # 'fiber' is a shorter alias for 'spawn'
    my $f1 = fiber {
        say '  Task 1: Sleeping...';
        await_sleep(1000);
        return 42;
    };

    my $f2 = fiber {
        say '  Task 2: Performing I/O...';

README.md  view on Meta::CPAN


Close the browser and clear your history before this does further harm!

# MODERN API

While the classic object-oriented API is always available, `Acme::Parataxis` exports a set of functions (via the
`:all` tag) that provide a more modern, concise way to write concurrent code.

## `async { ... }`

A convenience wrapper around `run( )`. It starts the scheduler, executes the provided block as the main fiber, and
automatically calls `stop( )` when the block completes.

```
async {
    say "The scheduler is running!";
};
```

## `fiber { ... }`

README.md  view on Meta::CPAN

async {
    my $core = await_core_id( );
    say "Background task handled by CPU core: $core";
};
```

# CORE CONCEPTS

## Creating Fibers

All Perl code in this system runs within a fiber. When you start your script or call `Acme::Parataxis::run`, a "main"
fiber is active. You can create new fibers using `spawn` or by manually instantiating an `Acme::Parataxis` object:

```perl
my $fiber = Acme::Parataxis->new(code => sub {
    say "I'm in a fiber!";
});
```

Creating a fiber does not run it immediately. It simply prepares the context and waits to be invoked.

README.md  view on Meta::CPAN

```

## `yield( @args )`

Pauses the current fiber and returns control to the scheduler. If `@args` are provided, they are passed to the context
that next resumes this fiber. Arguments can be of any Perl data type.

## `stop( )`

Tells the scheduler to exit the loop after the current iteration. Note that this does not immediately terminate other
fibers; it simply prevents the scheduler from starting new ones.

# THREAD POOL CONFIGURATION

`Acme::Parataxis` uses a native thread pool to handle blocking tasks. While it manages itself automatically, you can
tune its behavior using these functions.

## `set_max_threads( $count )`

Sets the maximum number of worker threads the pool is allowed to spawn. By default, this is set to the number of
logical CPU cores detected on your system (up to a hard limit of 64).

README.md  view on Meta::CPAN

        return $res;
    }
}
{
    package My::HTTP::Handle;
    use parent -norequire, 'HTTP::Tiny::Handle';
    use Time::HiRes qw[time];
    sub _do_timeout {
        my ($self, $type, $timeout) = @_;
        $timeout //= $self->{timeout} // 60;
        my $start = time;
        while (1) {
            # Check for readiness NOW (0 timeout)
            return 1 if $self->SUPER::_do_timeout($type, 0);
            # Check for overall timeout
            my $elapsed = time - $start;
            return 0 if $elapsed > $timeout;
            # Suspend fiber and wait for background I/O check
            my $wait = ($timeout - $elapsed) > 0.5 ? 0.5 : ($timeout - $elapsed);
            if ($type eq 'read') {
                Acme::Parataxis->await_read($self->{fh}, int($wait * 1000));
            } else {
                Acme::Parataxis->await_write($self->{fh}, int($wait * 1000));
            }
        }
    }

README.md  view on Meta::CPAN

# BEST PRACTICES & GOTCHAS

- **Avoid blocking syscalls:** Never call blocking `sleep( )` or `sysread( )` on the main interpretation thread.
Always use the `await_*` equivalents to offload work to the pool.
- **Thread Safety:** While Perl code remains single-threaded, background tasks run on separate OS threads. Shared
C-level data (if accessed via FFI) must be mutex-protected.
- **Stack Limits:** Each fiber is allocated a 512KB stack by default. This is more than sufficient for most
Perl code and allows for high concurrency with a small memory footprint. Extremely deep recursion or massive regex
backtracking might still hit limits.
- **Efficiency:** The native thread pool is initialized dynamically upon the first asynchronous request. It
starts with a small "seed" pool and grows on demand up to the configured limit. Worker threads use condition
variables to sleep efficiently when idle, ensuring near-zero CPU usage when no background tasks are pending.
- **Reference Cycles:** Be careful when passing fiber objects into their own closures, as this can create
memory leaks.

# GORY TECHNICAL DETAILS

## Architectural Inspiration

The concurrency model in Parataxis is heavily inspired by the **Wren** programming language, specifically its treatment
of fibers as the primary unit of execution and its deterministic cooperative scheduling.

eg/basic.pl  view on Meta::CPAN

use v5.40;
use blib;
use Acme::Parataxis;
#
say 'Main thread (TID: ' . Acme::Parataxis->tid . ')';

# Create a worker parataxis
my $worker = Acme::Parataxis->new(
    code => sub ($name) {
        say "---> Worker '$name' started (FID: " . Acme::Parataxis->current_fid . ")";
        for my $i ( 1 .. 3 ) {
            say "---> Worker '$name' processing step $i";
            my $input = Acme::Parataxis->yield( 'Result from step ' . $i );
            say "---> Worker '$name' received: $input";
        }
        return 'Final success';
    }
);

# Drive the worker

eg/http_tiny.pl  view on Meta::CPAN

    }
}
#
package My::HTTP::Handle {
    use parent -norequire, 'HTTP::Tiny::Handle';
    use Time::HiRes     qw[time];
    use Acme::Parataxis qw[await_read await_write];

    sub _do_timeout ( $self, $type, $timeout //= $self->{timeout} // 60 ) {
        if ( $self->{fh} ) {
            my $start = time;
            while (1) {

                # Check for readiness NOW (0 timeout)
                return 1 if $self->SUPER::_do_timeout( $type, 0 );

                # Check for overall timeout
                my $elapsed = time() - $start;
                return 0 if $elapsed > $timeout;

                # Suspend fiber and wait for background I/O check.
                # This is where the magic happens: the fiber yields control back
                # to the scheduler while waiting for the socket to be ready.
                my $wait = ( $timeout - $elapsed ) > 0.5 ? 0.5 : ( $timeout - $elapsed );
                if ( $type eq 'read' ) {
                    await_read( $self->{fh}, int( $wait * 1000 ) );
                }
                else {

eg/http_tiny.pl  view on Meta::CPAN

    my @urls = qw[
        http://www.google.com
        http://www.example.com
        https://www.perl.org
        https://metacpan.org
    ];
    my @futures;

    for my $url (@urls) {
        push @futures, fiber {
            my $start = time();
            say sprintf '  [Fiber %d] Fetching %s...', current_fid(), $url;
            my $res     = $http->get($url);
            my $elapsed = time() - $start;
            say sprintf '  [Fiber %d] Done %s (Status: %d) in %.2fs', current_fid(), $url, $res->{status}, $elapsed;
            return $res;
        };
    }

    # Wait for all fibers to complete and collect results
    for my $i ( 0 .. $#urls ) {
        my $res = await $futures[$i];
        say sprintf 'Final Result for %s: %d %s', $urls[$i], $res->{status}, $res->{reason};
    }

eg/modern.pl  view on Meta::CPAN

use v5.40;
use blib;
use Acme::Parataxis qw[:all];
#
async {
    say 'Main fiber started';

    # 'fiber' is the modern alias for 'spawn'
    my $f1 = fiber {
        say '  Fiber 1: Sleeping...';
        await_sleep(1000);
        say '  Fiber 1: Woke up!';
        return 42;
    };

    # 'await' is the modern way to wait for results

eg/port_scanner.pl  view on Meta::CPAN

use Errno       qw[EINPROGRESS EWOULDBLOCK];
use Time::HiRes qw[time];
$|++;

# This demonstrates how fibers can be used to perform concurrent
# network operations using standard Perl modules (IO::Socket) and the
# Parataxis non-blocking I/O scheduler.
my $target = '127.0.0.1';
my @ports  = ( 22, 80, 443, 3306, 3389, 5432, 8080, 9999 );    # Common ports + our demo server

# To make the test interesting, let's start a tiny listener on 9999 in a fiber
Acme::Parataxis::run(
    sub {
        say "Main: Starting parallel scan of $target...";
        my $start_time = time;

        # Start a dummy listener so we have at least one open port to find
        my $server = IO::Socket::INET->new( LocalAddr => $target, LocalPort => 9999, Listen => 5, Reuse => 1, Blocking => 0 );
        say 'Main: Local dummy server started on 9999' if $server;
        my @results;
        my @futures;

        # Spawn a fiber for each port we want to scan
        for my $port (@ports) {
            push @futures, Acme::Parataxis->spawn(
                sub {
                    my $s = IO::Socket::INET->new( PeerAddr => $target, PeerPort => $port, Proto => 'tcp', Blocking => 0 );

                    # If the socket failed immediately (e.g. invalid target)

eg/port_scanner.pl  view on Meta::CPAN

        }

        # Wait for all scanners to finish
        for my $f (@futures) {
            my $r = $f->await();
            push @results, $r;
            if ( $r->{status} eq 'OPEN' ) {
                say "  [!] Port $r->{port} is OPEN";
            }
        }
        my $elapsed = time() - $start_time;
        say 'Main: Scan complete.';
        say 'Summary:';
        for my $r ( sort { $a->{port} <=> $b->{port} } @results ) {
            say sprintf '  Port %-5d: %s', $r->{port}, $r->{status};
        }
        say sprintf 'Total scan time: %.2fs', $elapsed;
        $server->close() if $server;
        Acme::Parataxis::stop();
    }
);

eg/socket.pl  view on Meta::CPAN

Acme::Parataxis::run(
    sub {
        my $server = IO::Socket::INET->new( LocalAddr => '127.0.0.1', LocalPort => 9999, Proto => 'tcp', Listen => 5, Reuse => 1 ) or
            die 'Could not create server: ' . $!;
        $server->blocking(0);
        $server->autoflush(1);
        say 'Server listening on 127.0.0.1:9999';
        my $client_done = 0;
        Acme::Parataxis->spawn(
            sub {
                say 'Client starting...';
                my $client = IO::Socket::INET->new( PeerAddr => '127.0.0.1', PeerPort => 9999, Proto => 'tcp' ) or
                    die 'Could not create client: ' . $!;
                $client->blocking(0);
                $client->autoflush(1);
                say 'Client connected, waiting to write...';
                Acme::Parataxis->await_write($client);
                $client->print("Hello from Parataxis!\n");
                say 'Client waiting for response...';
                Acme::Parataxis->await_read($client);
                my $line = <$client>;

eg/socket.pl  view on Meta::CPAN

            my $res = Acme::Parataxis->await_read($server);
            last if $client_done;    # Check immediately after waking up
            if ( $res > 0 ) {
                my $conn = $server->accept();
                if ($conn) {
                    say 'Server accepted connection!';
                    Acme::Parataxis->spawn(
                        sub {
                            $conn->blocking(0);
                            $conn->autoflush(1);
                            say '  Handler started for ' . fileno($conn);
                            Acme::Parataxis->await_read($conn);
                            my $line = <$conn>;
                            if ( defined $line ) {
                                chomp $line;
                                say '  Server echoing: ' . $line;
                                Acme::Parataxis->await_write($conn);
                                $conn->print("Echo: $line\n");
                            }
                            else {
                                say '  Server got EOF';

eg/stress_fibers.pl  view on Meta::CPAN

use blib;
use Acme::Parataxis qw[:all];
use Time::HiRes     qw[time];

# Stress Test 1: High Fiber Count & Rapid Lifecycle
# This tests the scheduler's ability to handle many short-lived fibers.
async {
    my $total_fibers = 1000;    # We increased MAX_FIBERS in the C code to 1024
    my $iterations   = 5;
    say "Starting Stress Test: $total_fibers fibers, $iterations iterations...";
    my $start_time = time();
    for my $iter ( 1 .. $iterations ) {
        my $iter_start = time();
        my @futures;

        # Spawn a batch of fibers
        for my $i ( 1 .. $total_fibers ) {
            push @futures, fiber {

                # Do a tiny bit of work
                my $val = 0;
                $val += $_ for 1 .. 100;
                no warnings 'recursion';
                yield() if $i % 2 == 0;    # Mix in some yields
                return $val;
            };
        }

        # Wait for all to finish
        for my $f (@futures) {
            await $f;
        }
        my $elapsed = time() - $iter_start;
        say sprintf '  Iteration %2d completed in %.4fs', $iter, $elapsed;
    }
    my $total_elapsed = time() - $start_time;
    say sprintf 'Total time for %d fibers across %d iterations: %.4fs', $total_fibers, $iterations, $total_elapsed;
    say 'Average time per fiber lifecycle: ' . ( $total_elapsed / ( $total_fibers * $iterations ) ) . 's';
};

eg/stress_io.pl  view on Meta::CPAN

use Time::HiRes     qw[time];
$|++;

# Stress Test 2: Concurrent I/O (Background Thread Pool)
# This tests the ability to handle many simultaneous blocking background tasks.
async {
    my $concurrency = 64;    # Half of MAX_FIBERS
    my $iterations  = 5;
    my $sleep_ms    = 500;
    say "Starting I/O Stress Test: $concurrency concurrent sleeps of ${sleep_ms}ms...";
    my $start_time = time();
    for my $iter ( 1 .. $iterations ) {
        my $iter_start = time();
        my @futures;

        # Spawn concurrent sleep fibers
        for my $i ( 1 .. $concurrency ) {
            push @futures, fiber {
                no warnings 'recursion';

                # This offloads to the C-level background thread pool
                await_sleep($sleep_ms);
                return "Worker $i done";
            };
        }

        # Wait for all to finish
        for my $f (@futures) {
            await $f;
        }
        my $elapsed = time() - $iter_start;

        # Since they are concurrent and $concurrency > CPU count (usually),
        # it should take roughly (ceil(concurrency / num_cpus) * sleep_ms)
        say sprintf '  Iteration %2d completed in %.4fs', $iter, $elapsed;
    }
    my $total_elapsed = time() - $start_time;
    say sprintf 'Total time for %d concurrent sleeps across %d iterations: %.4fs', $concurrency, $iterations, $total_elapsed;
};

eg/stress_preempt.pl  view on Meta::CPAN

# This tests the integrity of the stack teleportation during deep recursion
# and the effectiveness of the preemption threshold.
# Configuration
my $recursion_depth = 50;     # Not too deep to overflow 4MB C-stack, but deep enough to test pads
my $preempt_at      = 100;    # Operations before forced yield
my $num_fibers      = 10;     # Number of competing CPU-bound fibers
Acme::Parataxis::set_preempt_threshold($preempt_at);
async {
    say 'Starting Preemption Stress Test...';
    say "Recursion depth: $recursion_depth, Preempt threshold: $preempt_at, Fibers: $num_fibers";
    my $start_time = time();
    my @futures;

    # Recursive function to stress pads and preemption
    sub recursive_math( $depth_in, $fiber_id ) {
        no warnings 'uninitialized';
        my $depth = $depth_in;    # Localize
        return 1 if $depth <= 0;

        # Do some CPU work and potential preemption
        my $acc = 0;

eg/stress_preempt.pl  view on Meta::CPAN

            my $val = recursive_math( $recursion_depth, $i );
            say "  [Fiber $i] Done with result: $val";
            return $val;
        };
    }

    # Wait for all to finish
    for my $f (@futures) {
        await $f;
    }
    my $total_elapsed = time() - $start_time;
    say sprintf 'Total time for %d competing recursive fibers: %.4fs', $num_fibers, $total_elapsed;

    # Reset preemption threshold for the rest of the script/environment
    Acme::Parataxis::set_preempt_threshold(0);
};

eg/symmetric.pl  view on Meta::CPAN

        }
        say '  Consumer: Shutting down.';
        Acme::Parataxis->root->transfer();    # Return to main
    }
);
say 'Main: Starting the dance...';

# We need to prime the consumer so it hits its yield()
$consumer->call();

# Now start the producer
$producer->call();
say 'Main: The dance is over.';

eg/synopsis.pl  view on Meta::CPAN

{
    use v5.40;
    use blib;
    use Acme::Parataxis;
    $|++;

    # Basic usage with the integrated scheduler
    Acme::Parataxis::run(
        sub {
            say 'Main fiber started (TID: ' . Acme::Parataxis->tid . ')';

            # Spawn background workers
            my $f1 = Acme::Parataxis->spawn(
                sub {
                    say '  Task 1: Sleeping (non-blocking for others)...';
                    Acme::Parataxis->await_sleep(1000);
                    return 'Coffee is ready!';
                }
            );
            my $f2 = Acme::Parataxis->spawn(

eg/thread_pool.pl  view on Meta::CPAN


# Demonstrate dynamic thread pool growth and modern API
async {
    say 'Main: Initial thread pool size is ' . Acme::Parataxis::get_thread_pool_size();
    my @futures;
    for my $id ( 1 .. 5 ) {
        push @futures, fiber {
            my $fid        = current_fid();
            my $sleep_time = int( rand(1000) ) + 500;
            say "  [Worker $id] Fiber #$fid will sleep for ${sleep_time}ms in background pool";
            my $start = time();

            # This yields to main, background thread sleeps, scheduler resumes us
            await_sleep($sleep_time);
            my $elapsed = int( ( time() - $start ) * 1000 );
            say "  [Worker $id] Fiber #$fid woke up after ${elapsed}ms!";
            return "Done $id";
        };
    }
    say 'Main: All workers spawned. Waiting for completion...';
    say 'Main: Thread pool size during load: ' . Acme::Parataxis::get_thread_pool_size();
    my $start_time = time();

    # Wait for all workers to finish via futures using modern 'await'
    await $_ for @futures;
    say 'Main: All workers finished!';
    printf "Total wallclock time: %.2fs\n", time() - $start_time;
    say 'Main: Final thread pool size: ' . Acme::Parataxis::get_thread_pool_size();
};

eg/worker_pool.pl  view on Meta::CPAN

    sub {
        my @jobs = (
            { id => 1, task => 'Fetch User Data',  delay => 800 },
            { id => 2, task => 'Process Payment',  delay => 1200 },
            { id => 3, task => 'Send Email',       delay => 500 },
            { id => 4, task => 'Update Inventory', delay => 1500 },
            { id => 5, task => 'Generate Report',  delay => 900 },
            { id => 6, task => 'Sync Analytics',   delay => 1100 }
        );
        say 'Main: Starting worker pool with 3 fibers...';
        my $start_time = time;

        # Simple shared queue
        my @queue = @jobs;
        my @results;

        # Spawn 3 worker fibers
        my @workers;
        for my $w_id ( 1 .. 3 ) {
            push @workers, Acme::Parataxis->spawn(
                sub {
                    say "  [Worker $w_id] Fiber started.";
                    while ( my $job = shift @queue ) {
                        say "  [Worker $w_id] Processing Job $job->{id}: $job->{task}...";

                        # Simulate a blocking I/O call (DB query, API request, idk...)
                        # This yields to the scheduler while the native thread pool sleeps.
                        Acme::Parataxis->await_sleep( $job->{delay} );
                        say "  [Worker $w_id] Finished Job $job->{id}.";
                        push @results, "Job $job->{id} complete";
                    }
                    return "Worker $w_id finished.";
                }
            );
        }

        # Wait for all workers to complete
        say 'Main: Waiting for pool to drain...';
        $_->await for @workers;
        my $elapsed    = time() - $start_time;
        my $sum_delays = 0;
        $sum_delays += $_->{delay} for @jobs;
        say 'Main: All tasks finished!';
        say 'Total Results: ' . scalar(@results);
        say sprintf 'Total Wallclock Time: %.2fs',     $elapsed;
        say sprintf 'Sum of individual delays: %.2fs', $sum_delays / 1000;
        say sprintf 'Speedup: ~%.2fx', ( $sum_delays / 1000 ) / $elapsed;
    }
);

lib/Acme/Parataxis.c  view on Meta::CPAN

#endif

    /*
     * Perl Interpreter State Pointers.
     * These must be saved and restored during every context switch.
     */
    PERL_SI * si;            /**< Current Stack Info (tracks recursion and eval frames) */
    AV * curstack;           /**< The active Argument Stack (AV*) */
    SSize_t stack_sp_offset; /**< Stack Pointer offset from stack base */

    I32 * markstack;     /**< Base of the Mark Stack (tracks list start points) */
    I32 * markstack_ptr; /**< Current pointer into the Mark Stack */
    I32 * markstack_max; /**< Limit of the Mark Stack */

    I32 * scopestack;   /**< Base of the Scope Stack (tracks block nesting) */
    I32 scopestack_ix;  /**< Current index in the Scope Stack */
    I32 scopestack_max; /**< Limit of the Scope Stack */

    ANY * savestack;   /**< Base of the Save Stack (tracks local/my variables for cleanup) */
    I32 savestack_ix;  /**< Current index in the Save Stack */
    I32 savestack_max; /**< Limit of the Save Stack */

lib/Acme/Parataxis.c  view on Meta::CPAN

    if (idx < 0 || idx >= MAX_JOBS)
        return;
    LOCK(queue_lock);
    job_slots[idx].status = JOB_FREE;
    UNLOCK(queue_lock);
}

/**
 * @brief Resets the call depth of a Perl CV to zero.
 *
 * Used to ensure that a newly created fiber starts its coderef with a
 * clean execution state.
 *
 * @param cv_ref SV reference to the coderef.
 */
DLLEXPORT void force_depth_zero(SV * cv_ref) {
    dTHX;
    CV * cv = NULL;
    if (SvROK(cv_ref))
        cv = (CV *)SvRV(cv_ref);
    else if (SvTYPE(cv_ref) == SVt_PVCV)

lib/Acme/Parataxis.c  view on Meta::CPAN

    return res;
}

/**
 * @brief Entry point function for all new fibers.
 *
 * Sets up the Perl environment (ENTER/SAVETMPS), unpacks arguments,
 * calls the user coderef, handles results/errors, and manages the
 * fiber's completion lifecycle.
 *
 * @param c Pointer to the fiber context being started.
 */
static void entry_point(para_fiber_t * c) {
    dTHX;
    ENTER;
    SAVETMPS;
    dSP;
    PUSHMARK(SP);

    /* Unpack arguments passed during coro_call */
    if (c->transfer_data && SvROK(c->transfer_data) && SvTYPE(SvRV(c->transfer_data)) == SVt_PVAV) {

lib/Acme/Parataxis.pod  view on Meta::CPAN

=head1 SYNOPSIS

The classic way (as I write this, Acme::Parataxis is 5 days old and already has a 'classic' API...)

    use v5.40;
    use Acme::Parataxis;
    $|++;

    Acme::Parataxis::run(
        sub {
            say 'Main task started';

            # Spawn background workers
            my $f1 = Acme::Parataxis->spawn(
                sub {
                    say '  Task 1: Sleeping in a native thread pool...';
                    Acme::Parataxis->await_sleep(1000);
                    say '  Task 1: Ah! What a nice nap...';
                    return 42;
                }
            );

lib/Acme/Parataxis.pod  view on Meta::CPAN

        }
    );

Or do things the more modern way:

    use v5.40;
    use Acme::Parataxis qw[:all];
    $|++;

    async {
        say 'Main task started';

        # 'fiber' is a shorter alias for 'spawn'
        my $f1 = fiber {
            say '  Task 1: Sleeping...';
            await_sleep(1000);
            return 42;
        };

        my $f2 = fiber {
            say '  Task 2: Performing I/O...';

lib/Acme/Parataxis.pod  view on Meta::CPAN


Close the browser and clear your history before this does further harm!

=head1 MODERN API

While the classic object-oriented API is always available, C<Acme::Parataxis> exports a set of functions (via the
C<:all> tag) that provide a more modern, concise way to write concurrent code.

=head2 C<async { ... }>

A convenience wrapper around C<run( )>. It starts the scheduler, executes the provided block as the main fiber, and
automatically calls C<stop( )> when the block completes.

    async {
        say "The scheduler is running!";
    };

=head2 C<fiber { ... }>

An alias for C<spawn( )>. It creates a new fiber and returns a L<Future|/"Acme::Parataxis::Future OBJECT METHODS">.

lib/Acme/Parataxis.pod  view on Meta::CPAN


    async {
        my $core = await_core_id( );
        say "Background task handled by CPU core: $core";
    };

=head1 CORE CONCEPTS

=head2 Creating Fibers

All Perl code in this system runs within a fiber. When you start your script or call C<Acme::Parataxis::run>, a "main"
fiber is active. You can create new fibers using C<spawn> or by manually instantiating an C<Acme::Parataxis> object:

    my $fiber = Acme::Parataxis->new(code => sub {
        say "I'm in a fiber!";
    });

Creating a fiber does not run it immediately. It simply prepares the context and waits to be invoked.

=head2 Invoking Fibers

lib/Acme/Parataxis.pod  view on Meta::CPAN

    });

=head2 C<yield( @args )>

Pauses the current fiber and returns control to the scheduler. If C<@args> are provided, they are passed to the context
that next resumes this fiber. Arguments can be of any Perl data type.

=head2 C<stop( )>

Tells the scheduler to exit the loop after the current iteration. Note that this does not immediately terminate other
fibers; it simply prevents the scheduler from starting new ones.

=head1 THREAD POOL CONFIGURATION

C<Acme::Parataxis> uses a native thread pool to handle blocking tasks. While it manages itself automatically, you can
tune its behavior using these functions.

=head2 C<set_max_threads( $count )>

Sets the maximum number of worker threads the pool is allowed to spawn. By default, this is set to the number of
logical CPU cores detected on your system (up to a hard limit of 64).

lib/Acme/Parataxis.pod  view on Meta::CPAN

            return $res;
        }
    }
    {
        package My::HTTP::Handle;
        use parent -norequire, 'HTTP::Tiny::Handle';
        use Time::HiRes qw[time];
        sub _do_timeout {
            my ($self, $type, $timeout) = @_;
            $timeout //= $self->{timeout} // 60;
            my $start = time;
            while (1) {
                # Check for readiness NOW (0 timeout)
                return 1 if $self->SUPER::_do_timeout($type, 0);
                # Check for overall timeout
                my $elapsed = time - $start;
                return 0 if $elapsed > $timeout;
                # Suspend fiber and wait for background I/O check
                my $wait = ($timeout - $elapsed) > 0.5 ? 0.5 : ($timeout - $elapsed);
                if ($type eq 'read') {
                    Acme::Parataxis->await_read($self->{fh}, int($wait * 1000));
                } else {
                    Acme::Parataxis->await_write($self->{fh}, int($wait * 1000));
                }
            }
        }

lib/Acme/Parataxis.pod  view on Meta::CPAN

Always use the C<await_*> equivalents to offload work to the pool.

=item * B<Thread Safety:> While Perl code remains single-threaded, background tasks run on separate OS threads. Shared
C-level data (if accessed via FFI) must be mutex-protected.

=item * B<Stack Limits:> Each fiber is allocated a 512KB stack by default. This is more than sufficient for most
Perl code and allows for high concurrency with a small memory footprint. Extremely deep recursion or massive regex
backtracking might still hit limits.

=item * B<Efficiency:> The native thread pool is initialized dynamically upon the first asynchronous request. It
starts with a small "seed" pool and grows on demand up to the configured limit. Worker threads use condition
variables to sleep efficiently when idle, ensuring near-zero CPU usage when no background tasks are pending.

=item * B<Reference Cycles:> Be careful when passing fiber objects into their own closures, as this can create
memory leaks.

=back

=head1 GORY TECHNICAL DETAILS

=head2 Architectural Inspiration

t/004_synopsis.t  view on Meta::CPAN

use v5.40;
use Test2::V1 -ipP;
use blib;
use Acme::Parataxis;
$|++;

# This is the synopsis for Acme::Parataxis but verbose
Acme::Parataxis::run(
    sub {
        diag 'Main fiber started (FID: ' . Acme::Parataxis->current_fid . ')';

        # Spawn background workers
        diag 'Spawning Task 1 (Sleep)...';
        my $f1 = Acme::Parataxis->spawn(
            sub {
                diag 'Task 1 started (FID: ' . Acme::Parataxis->current_fid . ')';
                diag 'Task 1: Sleeping in a native thread pool (1000ms)...';
                Acme::Parataxis->await_sleep(1000);
                diag 'Task 1: Ah! What a nice nap...';
                return 42;
            }
        );
        diag 'Spawning Task 2 (I/O dummy)...';
        my $f2 = Acme::Parataxis->spawn(
            sub {
                diag 'Task 2 started (FID: ' . Acme::Parataxis->current_fid . ')';
                diag 'Task 2: Performing I/O simulation...';

                # await_read/write for non-blocking socket handling
                return 'I/O Done';
            }
        );

        # Block current fiber until results are ready (without blocking the thread)
        diag 'Main: Waiting for Task 1 result...';
        my $res1 = $f1->await();

t/006_parallel.t  view on Meta::CPAN

use v5.40;
use Test2::V1 -ipP;
use blib;
use Acme::Parataxis;
use Time::HiRes qw[time];
$|++;
Acme::Parataxis::run(
    sub {
        my $start_time = time();
        diag "Starting parallel sleeps at $start_time...";

        # Check thread pool size
        my $pool_size = Acme::Parataxis::get_thread_pool_size();
        diag "Detected thread pool size: $pool_size";

        # Spawn only as many tasks as we have threads (up to 3) to ensure they run in parallel.
        # If pool size is 2, running 3 tasks takes 2 seconds, failing the 1.8s test.
        my $num_tasks = $pool_size;
        $num_tasks = 3 if $num_tasks > 3;
        $num_tasks = 1 if $num_tasks < 1;
        diag "Spawning $num_tasks parallel tasks...";
        my @futures;
        for my $i ( 1 .. $num_tasks ) {
            push @futures, Acme::Parataxis->spawn(
                sub {
                    my $id = $i;    # Closure capture
                    diag "Fiber $id started (FID: " . Acme::Parataxis->current_fid . ')';
                    diag "Fiber $id: Sleeping for 1000ms...";
                    Acme::Parataxis->await_sleep(1000);
                    diag "Fiber $id: Woke up!";
                    return $id;
                }
            );
        }

        # Wait for all
        diag "Main: Waiting for $num_tasks fibers to finish...";
        my @results;
        push @results, $_->await() for @futures;
        my $elapsed = time() - $start_time;
        diag "Total wallclock time: $elapsed seconds";
        diag 'Results: ' . join( ', ', @results );
        my $expected = [ 1 .. $num_tasks ];
        is \@results, $expected, 'Fibers returned correct individual results';

        # If we have at least 1 task, it should take ~1s.
        ok $elapsed < 1.8, "$num_tasks tasks ran in parallel (elapsed < 1.8s)";
    }
);
done_testing();

t/009_http_tiny.t  view on Meta::CPAN

}
{

    package Acme::Parataxis::Test::HTTP::Handle;
    use parent -norequire, 'HTTP::Tiny::Handle';

    sub _do_timeout {
        my ( $self, $type, $timeout ) = @_;
        $timeout //= $self->{timeout};
        if ( $self->{fh} ) {
            my $start = time();
            while (1) {

                # Immediate check using original select (0 timeout)
                return 1 if $self->SUPER::_do_timeout( $type, 0 );

                # Check for overall timeout
                return 0 if ( time() - $start ) > $timeout;

                # Suspend fiber and wait for background I/O check.
                # await_* submits a job and yields 'WAITING'.
                if ( $type eq 'read' ) {
                    Acme::Parataxis->await_read( $self->{fh}, 500 );
                }
                else {
                    Acme::Parataxis->await_write( $self->{fh}, 500 );
                }
            }

t/013_real_http.t  view on Meta::CPAN

}
{

    package Acme::Parataxis::Test::RealHTTP::Handle;
    use parent -norequire, 'HTTP::Tiny::Handle';

    sub _do_timeout {
        my ( $self, $type, $timeout ) = @_;
        $timeout //= $self->{timeout} // 60;
        if ( $self->{fh} ) {
            my $start = time();
            while (1) {
                return 1 if $self->SUPER::_do_timeout( $type, 0 );
                my $elapsed = time() - $start;
                return 0 if $elapsed > $timeout;
                my $wait = ( $timeout - $elapsed ) > 0.5 ? 0.5 : ( $timeout - $elapsed );
                if ( $type eq 'read' ) {
                    Acme::Parataxis->await_read( $self->{fh}, int( $wait * 1000 ) );
                }
                else {
                    Acme::Parataxis->await_write( $self->{fh}, int( $wait * 1000 ) );
                }
            }
        }

t/014_http_pool.t  view on Meta::CPAN

}
{

    package Acme::Parataxis::Test::PoolHTTP::Handle;
    use parent -norequire, 'HTTP::Tiny::Handle';

    sub _do_timeout {
        my ( $self, $type, $timeout ) = @_;
        $timeout //= $self->{timeout} // 60;
        if ( $self->{fh} ) {
            my $start = time();
            while (1) {
                return 1 if $self->SUPER::_do_timeout( $type, 0 );
                my $elapsed = time() - $start;
                return 0 if $elapsed > $timeout;
                my $wait = ( $timeout - $elapsed ) > 0.5 ? 0.5 : ( $timeout - $elapsed );
                if ( $type eq 'read' ) {
                    Acme::Parataxis->await_read( $self->{fh}, int( $wait * 1000 ) );
                }
                else {
                    Acme::Parataxis->await_write( $self->{fh}, int( $wait * 1000 ) );
                }
            }
        }

t/015_http_mock_pool.t  view on Meta::CPAN

    }
}

package Acme::Parataxis::Test::MockPoolHTTP::Handle {
    use parent -norequire, 'HTTP::Tiny::Handle';

    sub _do_timeout {
        my ( $self, $type, $timeout ) = @_;
        $timeout //= $self->{timeout} // 60;
        if ( $self->{fh} ) {
            my $start = time();
            while (1) {
                return 1 if $self->SUPER::_do_timeout( $type, 0 );
                my $elapsed = time() - $start;
                return 0 if $elapsed > $timeout;
                my $wait = ( $timeout - $elapsed ) > 0.5 ? 0.5 : ( $timeout - $elapsed );
                if ( $type eq 'read' ) {
                    Acme::Parataxis->await_read( $self->{fh}, int( $wait * 1000 ) );
                }
                else {
                    Acme::Parataxis->await_write( $self->{fh}, int( $wait * 1000 ) );
                }
            }
        }



( run in 2.854 seconds using v1.01-cache-2.11-cpan-cdf2f3d4e48 )