Acme-Parataxis

 view release on metacpan or  search on metacpan

README.md  view on Meta::CPAN


The classic way (as I write this, Acme::Parataxis is 5 days old and already has a 'classic' API...)

```perl
use v5.40;
use Acme::Parataxis;
$|++;

Acme::Parataxis::run(
    sub {
        say 'Main task started';

        # Spawn background workers
        my $f1 = Acme::Parataxis->spawn(
            sub {
                say '  Task 1: Sleeping in a native thread pool...';
                Acme::Parataxis->await_sleep(1000);
                say '  Task 1: Ah! What a nice nap...';
                return 42;
            }
        );
        my $f2 = Acme::Parataxis->spawn(
            sub {
                say '  Task 2: Performing I/O...';

                # await_read/write for non-blocking socket handling
                return 'I/O Done';
            }
        );

        # Block current fiber until results are ready (without blocking the thread)
        say 'Result 1: ' . $f1->await( );
        say 'Result 2: ' . $f2->await( );
    }
);
```

Or do things the more modern way:

```perl
use v5.40;
use Acme::Parataxis qw[:all];
$|++;

async {
    say 'Main task started';

    # 'fiber' is a shorter alias for 'spawn'
    my $f1 = fiber {
        say '  Task 1: Sleeping...';
        await_sleep(1000);
        return 42;
    };

    my $f2 = fiber {
        say '  Task 2: Performing I/O...';
        # ...
        return 'I/O Done';
    };

    # 'await' works on fibers and futures
    say 'Result 1: ' . await($f1);
    say 'Result 2: ' . await($f2);
};
```

# DESCRIPTION

`Acme::Parataxis` implements a hybrid concurrency model for Perl, greatly inspired by the concurrency system for the
[Wren](https://wren.io/concurrency.html) programming language. It combines cooperative multitasking (fibers) with a
preemptive native thread pool.

Fibers are a mechanism for lightweight concurrency. They are similar to threads but are cooperatively scheduled. While

README.md  view on Meta::CPAN

While the classic object-oriented API is always available, `Acme::Parataxis` exports a set of functions (via the
`:all` tag) that provide a more modern, concise way to write concurrent code.

## `async { ... }`

A convenience wrapper around `run( )`. It starts the scheduler, executes the provided block as the main fiber, and
automatically calls `stop( )` when the block completes.

```
async {
    say "The scheduler is running!";
};
```

## `fiber { ... }`

An alias for `spawn( )`. It creates a new fiber and returns a [Future](#acme-parataxis-future-object-methods).

```perl
my $f = fiber {
    say "Hello from fiber!";
};
```

## `await( $thing )`

A generic await function. It accepts either an `Acme::Parataxis` fiber object or an `Acme::Parataxis::Future` and
suspends the current fiber until the target is ready.

```perl
my $result = await($f);
```

## `await_sleep( $ms )`

Suspends the current fiber for `$ms` milliseconds. This is a non-blocking operation that allows other fibers to run
while the current one is paused.

```
async {
    say "Taking a nap...";
    await_sleep(1000);
    say "I'm awake!";
};
```

## `await_read( $fh, $timeout = 5000 )`

Suspends the current fiber until the provided filehandle is ready for reading, or the timeout is reached.

```perl
async {
    await_read($socket);
    my $data = <$socket>;
    say "Received: $data";
};
```

## `await_write( $fh, $timeout = 5000 )`

Suspends the current fiber until the provided filehandle is ready for writing, or the timeout is reached.

```
async {
    await_write($socket);

README.md  view on Meta::CPAN

```

## `await_core_id( )`

Returns the ID of the CPU core currently executing the background task. This is a non-blocking operation that offloads
the request to the thread pool and suspends the fiber until the result is ready.

```perl
async {
    my $core = await_core_id( );
    say "Background task handled by CPU core: $core";
};
```

# CORE CONCEPTS

## Creating Fibers

All Perl code in this system runs within a fiber. When you start your script or call `Acme::Parataxis::run`, a "main"
fiber is active. You can create new fibers using `spawn` or by manually instantiating an `Acme::Parataxis` object:

```perl
my $fiber = Acme::Parataxis->new(code => sub {
    say "I'm in a fiber!";
});
```

Creating a fiber does not run it immediately. It simply prepares the context and waits to be invoked.

## Invoking Fibers

To run a fiber, you "call" it. This suspends the current fiber and executes the called one until it finishes or yields.

```

README.md  view on Meta::CPAN


The following functions are the primary interface for the integrated cooperative scheduler.

## `run( $code )`

Starts the event loop and executes `$code` as the initial fiber. The loop continues to run as long as there are active
fibers or pending background tasks.

```perl
Acme::Parataxis::run(sub {
    say "The scheduler is running!";
});
```

## `spawn( $code )`

Creates a new fiber and adds it to the scheduler's queue. Returns a [Future](#acme-parataxis-future-object-methods)
that will eventually contain the fiber's return value.

```perl
my $future = Acme::Parataxis->spawn(sub {

README.md  view on Meta::CPAN

    my $http = My::HTTP->new(verify_SSL => 0);
    my @urls = qw[http://example.com http://perl.org];

    # Spawn tasks for each URL
    my @futures = map {
        my $url = $_;
        Acme::Parataxis->spawn(sub { $http->get($url)->{status} })
    } @urls;

    # Collect results as they become ready
    say "Status for $urls[$_]: " . $futures[$_]->await( ) for 0..$#urls;
});
```

## Symmetric Producer/Consumer

A low-level example of Passing control sideways between fibers.

```perl
my ($p, $c);

$p = Acme::Parataxis->new(code => sub {
    for my $item (qw[Apple Banana Cherry]) {
        say "Producer: Sending $item";
        $c->transfer($item);
    }
    $c->transfer('DONE');
});

$c = Acme::Parataxis->new(code => sub {
    my $item = Acme::Parataxis->yield( ); # Initial wait
    while (1) {
        last if $item eq 'DONE';
        say "Consumer: Eating $item";
        $item = $p->transfer( );
    }
});

$c->call( ); # Prime consumer
$p->call( ); # Start producer
```

# BEST PRACTICES & GOTCHAS

builder/Acme/Parataxis/Builder.pm  view on Meta::CPAN

    method write_file( $filename, $content ) { path($filename)->spew_raw($content) or die "Could not open $filename: $!\n" }
    method read_file ($filename)             { path($filename)->slurp_utf8         or die "Could not open $filename: $!\n" }

    method step_build() {
        for my $pl_file ( find( qr/\.PL$/, 'lib' ) ) {
            ( my $pm = $pl_file ) =~ s/\.PL$//;
            system $^X, $pl_file->stringify, $pm and die "$pl_file returned $?\n";
        }

        # C Extension Compilation - removed conditional block
        say 'Building libparataxis...';
        require Affix::Build;    # This module is used for C compilation
        require Config;
        require File::Spec;
        my $arch_dir = catdir(qw[blib arch auto Acme Parataxis]);
        mkpath( $arch_dir, $verbose );
        my $build = Affix::Build->new(
            name  => 'parataxis',
            flags => {
                cflags => join( ' ',
                    ( $Config::Config{ccflags}, '-std=c11', map { '-I' . $_ } ( File::Spec->catdir( $Config::Config{archlibexp}, 'CORE' ), 'src' ) )

builder/Acme/Parataxis/Builder.pm  view on Meta::CPAN

                        File::Spec->catdir( $Config::Config{archlibexp}, 'CORE' ) . ' -l' .
                        ( $Config::Config{libperl} =~ s/^lib//r =~ s/\.a$//r =~ s/\.lib$//r ) .
                        ' -lws2_32' ) : ( $^O eq 'darwin' ? '-undefined dynamic_lookup' : '' )
            },
            build_dir => $arch_dir,
            clean     => 0
        );

        # Add the C source file to be compiled. It's expected to be in 'lib/Acme/'
        $build->add('lib/Acme/Parataxis.c');
        say "Compiling and linking...";
        $build->compile_and_link();
        say "Build complete.";
        my %modules = map { $_ => catfile( 'blib', $_ ) } find( qr/\.pm$/,  'lib' );
        my %docs    = map { $_ => catfile( 'blib', $_ ) } find( qr/\.pod$/, 'lib' );
        my %scripts = map { $_ => catfile( 'blib', $_ ) } find( qr/(?:)/,   'script' );
        my %sdocs   = map { $_ => delete $scripts{$_} } grep {/.pod$/} keys %scripts;
        pm_to_blib( { %modules, %docs, %sdocs }, catdir(qw[blib lib auto]) );
        #
        mkpath( catdir(qw[blib script]), $verbose );
        for my $src ( keys %scripts ) {
            my $dest    = $scripts{$src};
            my $content = path($src)->slurp_raw;

builder/Acme/Parataxis/Builder.pm  view on Meta::CPAN

        return;
    }

    method Build(@args) {
        my $method = $self->can( 'step_' . $action );
        $method // die "No such action '$action'\n";
        exit $method->($self);
    }

    method Build_PL() {
        say sprintf 'Creating new Build script for %s %s', $meta->name, $meta->version;
        $self->write_file( 'Build', sprintf <<'', $^X, __PACKAGE__, __PACKAGE__ );
#!%s
use lib 'builder';
use %s;
use Getopt::Long qw[GetOptionsFromArray];
my %%opts = ( @ARGV && $ARGV[0] =~ /\A\w+\z/ ? ( action => shift @ARGV ) : () );
GetOptionsFromArray \@ARGV, \%%opts, qw[install_base=s install_path=s%% installdirs=s destdir=s prefix=s config=s%% uninst:1 verbose:1 dry_run:1 jobs=i];
%s->new(%%opts)->Build();

        make_executable('Build');

eg/affinity.pl  view on Meta::CPAN

use v5.40;
use blib;
use Acme::Parataxis qw[:all];
$|++;

# Demonstrate Thread Affinity / Core ID using modern API
async {
    say 'Main thread (TID: ' . tid() . ')';
    my @futures;
    for ( 1 .. 20 ) {
        push @futures, fiber {

            # await_core_id() offloads to the pool and returns the core ID
            return await_core_id();
        };
    }
    say 'Main: Tasks spawned, waiting for results...';
    my %distribution;
    for my $f (@futures) {
        my $core = await $f;
        if ( defined $core ) {
            $distribution{$core}++;
        }
        else {
            warn "Failed to get core ID for a task.\n";
        }
    }
    say 'Background Task Distribution across CPU Cores:';
    for my $core ( sort { $a <=> $b } keys %distribution ) {
        say "  Core $core: " . $distribution{$core} . ' tasks';
    }

    # Explicitly clear futures to break any potentially clobbered references
    # during teardown in some Perl builds
    @futures = ();
};

eg/basic.pl  view on Meta::CPAN

use v5.40;
use blib;
use Acme::Parataxis;
#
say 'Main thread (TID: ' . Acme::Parataxis->tid . ')';

# Create a worker parataxis
my $worker = Acme::Parataxis->new(
    code => sub ($name) {
        say "---> Worker '$name' started (FID: " . Acme::Parataxis->current_fid . ")";
        for my $i ( 1 .. 3 ) {
            say "---> Worker '$name' processing step $i";
            my $input = Acme::Parataxis->yield( 'Result from step ' . $i );
            say "---> Worker '$name' received: $input";
        }
        return 'Final success';
    }
);

# Drive the worker
say 'Main: Starting worker...';
my $res = $worker->call('Alice');
say 'Main: Worker yielded: ' . $res;
say 'Main: Sending more data...';
$res = $worker->call('Command A');
say 'Main: Worker yielded: ' . $res;
say 'Main: Finishing up...';
$res = $worker->call('Final Command');
say 'Main: Worker returned: ' . $res;
say 'Main: Worker is officially finished.' if $worker->is_done;

eg/http_tiny.pl  view on Meta::CPAN

                    await_write( $self->{fh}, int( $wait * 1000 ) );
                }
            }
        }
        $self->SUPER::_do_timeout( $type, 0 );
    }
}

# Use the integrated scheduler to run concurrent fetches
async {
    say 'Starting concurrent fetches...';
    my $http = My::HTTP->new( timeout => 10, verify_SSL => 0 );
    my @urls = qw[
        http://www.google.com
        http://www.example.com
        https://www.perl.org
        https://metacpan.org
    ];
    my @futures;

    for my $url (@urls) {
        push @futures, fiber {
            my $start = time();
            say sprintf '  [Fiber %d] Fetching %s...', current_fid(), $url;
            my $res     = $http->get($url);
            my $elapsed = time() - $start;
            say sprintf '  [Fiber %d] Done %s (Status: %d) in %.2fs', current_fid(), $url, $res->{status}, $elapsed;
            return $res;
        };
    }

    # Wait for all fibers to complete and collect results
    for my $i ( 0 .. $#urls ) {
        my $res = await $futures[$i];
        say sprintf 'Final Result for %s: %d %s', $urls[$i], $res->{status}, $res->{reason};
    }
    say 'All tasks finished.';
};

eg/modern.pl  view on Meta::CPAN

use v5.40;
use blib;
use Acme::Parataxis qw[:all];
#
async {
    say 'Main fiber started';

    # 'fiber' is the modern alias for 'spawn'
    my $f1 = fiber {
        say '  Fiber 1: Sleeping...';
        await_sleep(1000);
        say '  Fiber 1: Woke up!';
        return 42;
    };

    # 'await' is the modern way to wait for results
    my $res1 = await $f1;
    say 'Main: Fiber 1 returned ' . $res1;
    say 'Creating a manual fiber for bidirectional communication...';

    # Even if you create it with Acme::Parataxis->new,
    # you can use 'await' if it's running in the scheduler
    my $f2 = Acme::Parataxis->new(
        code => sub {
            say '  Fiber 2: Yielding some data...';
            my $received = yield('Data from F2');
            say '  Fiber 2: Resumed with: ' . $received;
            return 'Final result from F2';
        }
    );

    # The manual call/yield pattern works alongside the scheduler
    my $yielded = $f2->call();
    say 'Main: Fiber 2 yielded: ' . $yielded;
    $f2->call('Data from Main');
    say 'Main: Fiber 2 is ' . ( $f2->is_done ? 'done' : 'still running' );
    say 'All done!';
};

eg/port_scanner.pl  view on Meta::CPAN


# This demonstrates how fibers can be used to perform concurrent
# network operations using standard Perl modules (IO::Socket) and the
# Parataxis non-blocking I/O scheduler.
my $target = '127.0.0.1';
my @ports  = ( 22, 80, 443, 3306, 3389, 5432, 8080, 9999 );    # Common ports + our demo server

# To make the test interesting, let's start a tiny listener on 9999 in a fiber
Acme::Parataxis::run(
    sub {
        say "Main: Starting parallel scan of $target...";
        my $start_time = time;

        # Start a dummy listener so we have at least one open port to find
        my $server = IO::Socket::INET->new( LocalAddr => $target, LocalPort => 9999, Listen => 5, Reuse => 1, Blocking => 0 );
        say 'Main: Local dummy server started on 9999' if $server;
        my @results;
        my @futures;

        # Spawn a fiber for each port we want to scan
        for my $port (@ports) {
            push @futures, Acme::Parataxis->spawn(
                sub {
                    my $s = IO::Socket::INET->new( PeerAddr => $target, PeerPort => $port, Proto => 'tcp', Blocking => 0 );

                    # If the socket failed immediately (e.g. invalid target)

eg/port_scanner.pl  view on Meta::CPAN

                    }
                }
            );
        }

        # Wait for all scanners to finish
        for my $f (@futures) {
            my $r = $f->await();
            push @results, $r;
            if ( $r->{status} eq 'OPEN' ) {
                say "  [!] Port $r->{port} is OPEN";
            }
        }
        my $elapsed = time() - $start_time;
        say 'Main: Scan complete.';
        say 'Summary:';
        for my $r ( sort { $a->{port} <=> $b->{port} } @results ) {
            say sprintf '  Port %-5d: %s', $r->{port}, $r->{status};
        }
        say sprintf 'Total scan time: %.2fs', $elapsed;
        $server->close() if $server;
        Acme::Parataxis::stop();
    }
);

eg/preempt.pl  view on Meta::CPAN

use v5.40;
use blib;
use Acme::Parataxis;
$|++;

# Set threshold to 100 opcodes (very low for demo)
Acme::Parataxis::set_preempt_threshold(100);
Acme::Parataxis::run(
    sub {
        say 'Starting cooperative preemption demo...';
        my @futures;
        push @futures, Acme::Parataxis->spawn(
            sub {
                my $count = 0;
                while ( $count < 5 ) {
                    say '  [A] iteration ' . ++$count;

                    # busy loop
                    for ( 1 .. 50 ) {
                        my $x = $_ * 2;
                        Acme::Parataxis->maybe_yield();
                    }
                }
                say '  [A] finished';
            }
        );
        push @futures, Acme::Parataxis->spawn(
            sub {
                my $count = 0;
                while ( $count < 5 ) {
                    say '  [B] iteration ' . ++$count;
                    for ( 1 .. 50 ) {
                        my $x = $_ * 2;
                        Acme::Parataxis->maybe_yield();
                    }
                }
                say '  [B] finished';
            }
        );
        say 'Main spawned both, waiting...';
        $_->await for @futures;
        say 'Main finished waiting.';
    }
);

eg/socket.pl  view on Meta::CPAN

use Acme::Parataxis;
use IO::Socket::INET;

# Create a simple echo server in a coroutine
Acme::Parataxis::run(
    sub {
        my $server = IO::Socket::INET->new( LocalAddr => '127.0.0.1', LocalPort => 9999, Proto => 'tcp', Listen => 5, Reuse => 1 ) or
            die 'Could not create server: ' . $!;
        $server->blocking(0);
        $server->autoflush(1);
        say 'Server listening on 127.0.0.1:9999';
        my $client_done = 0;
        Acme::Parataxis->spawn(
            sub {
                say 'Client starting...';
                my $client = IO::Socket::INET->new( PeerAddr => '127.0.0.1', PeerPort => 9999, Proto => 'tcp' ) or
                    die 'Could not create client: ' . $!;
                $client->blocking(0);
                $client->autoflush(1);
                say 'Client connected, waiting to write...';
                Acme::Parataxis->await_write($client);
                $client->print("Hello from Parataxis!\n");
                say 'Client waiting for response...';
                Acme::Parataxis->await_read($client);
                my $line = <$client>;
                say 'Client received: ' . ( $line // 'UNDEF' );
                $client->close();
                $client_done = 1;
                say 'Client finished, stopping runtime...';
                Acme::Parataxis::stop();
            }
        );
        while ( !$client_done ) {

            # Check if there is a connection waiting (non-blocking-ish)
            # We use a short sleep to not hog the CPU while polling client_done
            # but in a real app you'd just wait on the server socket.
            my $res = Acme::Parataxis->await_read($server);
            last if $client_done;    # Check immediately after waking up
            if ( $res > 0 ) {
                my $conn = $server->accept();
                if ($conn) {
                    say 'Server accepted connection!';
                    Acme::Parataxis->spawn(
                        sub {
                            $conn->blocking(0);
                            $conn->autoflush(1);
                            say '  Handler started for ' . fileno($conn);
                            Acme::Parataxis->await_read($conn);
                            my $line = <$conn>;
                            if ( defined $line ) {
                                chomp $line;
                                say '  Server echoing: ' . $line;
                                Acme::Parataxis->await_write($conn);
                                $conn->print("Echo: $line\n");
                            }
                            else {
                                say '  Server got EOF';
                            }
                            $conn->close();
                            say '  Handler finished';
                        }
                    );
                }
            }

            # If no client connection and client is done, we exit the loop
            last if $client_done;
        }
        say 'Main server loop exiting...';
        $server->close();
    }
);
say 'Script finished.';

eg/stress_fibers.pl  view on Meta::CPAN

use v5.40;
use blib;
use Acme::Parataxis qw[:all];
use Time::HiRes     qw[time];

# Stress Test 1: High Fiber Count & Rapid Lifecycle
# This tests the scheduler's ability to handle many short-lived fibers.
async {
    my $total_fibers = 1000;    # We increased MAX_FIBERS in the C code to 1024
    my $iterations   = 5;
    say "Starting Stress Test: $total_fibers fibers, $iterations iterations...";
    my $start_time = time();
    for my $iter ( 1 .. $iterations ) {
        my $iter_start = time();
        my @futures;

        # Spawn a batch of fibers
        for my $i ( 1 .. $total_fibers ) {
            push @futures, fiber {

                # Do a tiny bit of work

eg/stress_fibers.pl  view on Meta::CPAN

                yield() if $i % 2 == 0;    # Mix in some yields
                return $val;
            };
        }

        # Wait for all to finish
        for my $f (@futures) {
            await $f;
        }
        my $elapsed = time() - $iter_start;
        say sprintf '  Iteration %2d completed in %.4fs', $iter, $elapsed;
    }
    my $total_elapsed = time() - $start_time;
    say sprintf 'Total time for %d fibers across %d iterations: %.4fs', $total_fibers, $iterations, $total_elapsed;
    say 'Average time per fiber lifecycle: ' . ( $total_elapsed / ( $total_fibers * $iterations ) ) . 's';
};

eg/stress_io.pl  view on Meta::CPAN

use Acme::Parataxis qw[:all];
use Time::HiRes     qw[time];
$|++;

# Stress Test 2: Concurrent I/O (Background Thread Pool)
# This tests the ability to handle many simultaneous blocking background tasks.
async {
    my $concurrency = 64;    # Half of MAX_FIBERS
    my $iterations  = 5;
    my $sleep_ms    = 500;
    say "Starting I/O Stress Test: $concurrency concurrent sleeps of ${sleep_ms}ms...";
    my $start_time = time();
    for my $iter ( 1 .. $iterations ) {
        my $iter_start = time();
        my @futures;

        # Spawn concurrent sleep fibers
        for my $i ( 1 .. $concurrency ) {
            push @futures, fiber {
                no warnings 'recursion';

eg/stress_io.pl  view on Meta::CPAN

        }

        # Wait for all to finish
        for my $f (@futures) {
            await $f;
        }
        my $elapsed = time() - $iter_start;

        # Since they are concurrent and $concurrency > CPU count (usually),
        # it should take roughly (ceil(concurrency / num_cpus) * sleep_ms)
        say sprintf '  Iteration %2d completed in %.4fs', $iter, $elapsed;
    }
    my $total_elapsed = time() - $start_time;
    say sprintf 'Total time for %d concurrent sleeps across %d iterations: %.4fs', $concurrency, $iterations, $total_elapsed;
};

eg/stress_preempt.pl  view on Meta::CPAN


# Stress Test 3: Preemption and Deep Call Stacks
# This tests the integrity of the stack teleportation during deep recursion
# and the effectiveness of the preemption threshold.
# Configuration
my $recursion_depth = 50;     # Not too deep to overflow 4MB C-stack, but deep enough to test pads
my $preempt_at      = 100;    # Operations before forced yield
my $num_fibers      = 10;     # Number of competing CPU-bound fibers
Acme::Parataxis::set_preempt_threshold($preempt_at);
async {
    say 'Starting Preemption Stress Test...';
    say "Recursion depth: $recursion_depth, Preempt threshold: $preempt_at, Fibers: $num_fibers";
    my $start_time = time();
    my @futures;

    # Recursive function to stress pads and preemption
    sub recursive_math( $depth_in, $fiber_id ) {
        no warnings 'uninitialized';
        my $depth = $depth_in;    # Localize
        return 1 if $depth <= 0;

        # Do some CPU work and potential preemption

eg/stress_preempt.pl  view on Meta::CPAN

        my $result     = $sub_result + $acc;

        # Another preemption check after coming back up the stack
        maybe_yield();
        return $result;
    }

    # Spawn competing CPU-bound recursive fibers
    for my $i ( 1 .. $num_fibers ) {
        push @futures, fiber {
            say "  [Fiber $i] Starting deep recursion...";
            my $val = recursive_math( $recursion_depth, $i );
            say "  [Fiber $i] Done with result: $val";
            return $val;
        };
    }

    # Wait for all to finish
    for my $f (@futures) {
        await $f;
    }
    my $total_elapsed = time() - $start_time;
    say sprintf 'Total time for %d competing recursive fibers: %.4fs', $num_fibers, $total_elapsed;

    # Reset preemption threshold for the rest of the script/environment
    Acme::Parataxis::set_preempt_threshold(0);
};

eg/symmetric.pl  view on Meta::CPAN

use v5.40;
use blib;
use Acme::Parataxis;

# Symmetric coroutines (Producer/Consumer)
my ( $producer, $consumer );
$producer = Acme::Parataxis->new(
    code => sub {
        for my $item (qw[Apple Banana Cherry]) {
            say "Producer: Created $item. Transferring to Consumer...";
            $consumer->transfer($item);
            say 'Producer: Consumer gave control back. Moving to next item.';
        }
        say 'Producer: Out of items. Telling consumer to finish.';
        $consumer->transfer(undef);
    }
);
$consumer = Acme::Parataxis->new(
    code => sub {

        # Initial yield to wait for the first item from the Producer
        my $item = Acme::Parataxis->yield();
        while ( defined $item ) {
            say "  Consumer: Received '$item'. Processing...";
            say '  Consumer: Done processing. Transferring back to Producer...';

            # Transfer back to producer and wait for the NEXT item
            $item = $producer->transfer();
        }
        say '  Consumer: Shutting down.';
        Acme::Parataxis->root->transfer();    # Return to main
    }
);
say 'Main: Starting the dance...';

# We need to prime the consumer so it hits its yield()
$consumer->call();

# Now start the producer
$producer->call();
say 'Main: The dance is over.';

eg/synopsis.pl  view on Meta::CPAN

{
    use v5.40;
    use blib;
    use Acme::Parataxis;
    $|++;

    # Basic usage with the integrated scheduler
    Acme::Parataxis::run(
        sub {
            say 'Main fiber started (TID: ' . Acme::Parataxis->tid . ')';

            # Spawn background workers
            my $f1 = Acme::Parataxis->spawn(
                sub {
                    say '  Task 1: Sleeping (non-blocking for others)...';
                    Acme::Parataxis->await_sleep(1000);
                    return 'Coffee is ready!';
                }
            );
            my $f2 = Acme::Parataxis->spawn(
                sub {
                    say '  Task 2: Calculating... (simulated CPU work)';
                    my $sum = 0;
                    for ( 1 .. 100 ) {
                        $sum += $_;
                        Acme::Parataxis->maybe_yield();    # Be a good neighbor
                    }
                    return $sum;
                }
            );

            # Await results without blocking the main OS thread
            say 'Result 1: ' . $f1->await();
            say 'Result 2: ' . $f2->await();
        }
    );
}

eg/thread_pool.pl  view on Meta::CPAN

use v5.40;
use blib;
use Acme::Parataxis qw[:all];
use Time::HiRes     qw[time];
$|++;

# Demonstrate dynamic thread pool growth and modern API
async {
    say 'Main: Initial thread pool size is ' . Acme::Parataxis::get_thread_pool_size();
    my @futures;
    for my $id ( 1 .. 5 ) {
        push @futures, fiber {
            my $fid        = current_fid();
            my $sleep_time = int( rand(1000) ) + 500;
            say "  [Worker $id] Fiber #$fid will sleep for ${sleep_time}ms in background pool";
            my $start = time();

            # This yields to main, background thread sleeps, scheduler resumes us
            await_sleep($sleep_time);
            my $elapsed = int( ( time() - $start ) * 1000 );
            say "  [Worker $id] Fiber #$fid woke up after ${elapsed}ms!";
            return "Done $id";
        };
    }
    say 'Main: All workers spawned. Waiting for completion...';
    say 'Main: Thread pool size during load: ' . Acme::Parataxis::get_thread_pool_size();
    my $start_time = time();

    # Wait for all workers to finish via futures using modern 'await'
    await $_ for @futures;
    say 'Main: All workers finished!';
    printf "Total wallclock time: %.2fs\n", time() - $start_time;
    say 'Main: Final thread pool size: ' . Acme::Parataxis::get_thread_pool_size();
};

eg/worker_pool.pl  view on Meta::CPAN

Acme::Parataxis::run(
    sub {
        my @jobs = (
            { id => 1, task => 'Fetch User Data',  delay => 800 },
            { id => 2, task => 'Process Payment',  delay => 1200 },
            { id => 3, task => 'Send Email',       delay => 500 },
            { id => 4, task => 'Update Inventory', delay => 1500 },
            { id => 5, task => 'Generate Report',  delay => 900 },
            { id => 6, task => 'Sync Analytics',   delay => 1100 }
        );
        say 'Main: Starting worker pool with 3 fibers...';
        my $start_time = time;

        # Simple shared queue
        my @queue = @jobs;
        my @results;

        # Spawn 3 worker fibers
        my @workers;
        for my $w_id ( 1 .. 3 ) {
            push @workers, Acme::Parataxis->spawn(
                sub {
                    say "  [Worker $w_id] Fiber started.";
                    while ( my $job = shift @queue ) {
                        say "  [Worker $w_id] Processing Job $job->{id}: $job->{task}...";

                        # Simulate a blocking I/O call (DB query, API request, idk...)
                        # This yields to the scheduler while the native thread pool sleeps.
                        Acme::Parataxis->await_sleep( $job->{delay} );
                        say "  [Worker $w_id] Finished Job $job->{id}.";
                        push @results, "Job $job->{id} complete";
                    }
                    return "Worker $w_id finished.";
                }
            );
        }

        # Wait for all workers to complete
        say 'Main: Waiting for pool to drain...';
        $_->await for @workers;
        my $elapsed    = time() - $start_time;
        my $sum_delays = 0;
        $sum_delays += $_->{delay} for @jobs;
        say 'Main: All tasks finished!';
        say 'Total Results: ' . scalar(@results);
        say sprintf 'Total Wallclock Time: %.2fs',     $elapsed;
        say sprintf 'Sum of individual delays: %.2fs', $sum_delays / 1000;
        say sprintf 'Speedup: ~%.2fx', ( $sum_delays / 1000 ) / $elapsed;
    }
);

lib/Acme/Parataxis.pod  view on Meta::CPAN

=head1 SYNOPSIS

The classic way (as I write this, Acme::Parataxis is 5 days old and already has a 'classic' API...)

    use v5.40;
    use Acme::Parataxis;
    $|++;

    Acme::Parataxis::run(
        sub {
            say 'Main task started';

            # Spawn background workers
            my $f1 = Acme::Parataxis->spawn(
                sub {
                    say '  Task 1: Sleeping in a native thread pool...';
                    Acme::Parataxis->await_sleep(1000);
                    say '  Task 1: Ah! What a nice nap...';
                    return 42;
                }
            );
            my $f2 = Acme::Parataxis->spawn(
                sub {
                    say '  Task 2: Performing I/O...';

                    # await_read/write for non-blocking socket handling
                    return 'I/O Done';
                }
            );

            # Block current fiber until results are ready (without blocking the thread)
            say 'Result 1: ' . $f1->await( );
            say 'Result 2: ' . $f2->await( );
        }
    );

Or do things the more modern way:

    use v5.40;
    use Acme::Parataxis qw[:all];
    $|++;

    async {
        say 'Main task started';

        # 'fiber' is a shorter alias for 'spawn'
        my $f1 = fiber {
            say '  Task 1: Sleeping...';
            await_sleep(1000);
            return 42;
        };

        my $f2 = fiber {
            say '  Task 2: Performing I/O...';
            # ...
            return 'I/O Done';
        };

        # 'await' works on fibers and futures
        say 'Result 1: ' . await($f1);
        say 'Result 2: ' . await($f2);
    };

=head1 DESCRIPTION

C<Acme::Parataxis> implements a hybrid concurrency model for Perl, greatly inspired by the concurrency system for the
L<Wren|https://wren.io/concurrency.html> programming language. It combines cooperative multitasking (fibers) with a
preemptive native thread pool.

Fibers are a mechanism for lightweight concurrency. They are similar to threads but are cooperatively scheduled. While
the OS may switch between threads at any time, a fiber only passes control when explicitly told to do so. This makes

lib/Acme/Parataxis.pod  view on Meta::CPAN


While the classic object-oriented API is always available, C<Acme::Parataxis> exports a set of functions (via the
C<:all> tag) that provide a more modern, concise way to write concurrent code.

=head2 C<async { ... }>

A convenience wrapper around C<run( )>. It starts the scheduler, executes the provided block as the main fiber, and
automatically calls C<stop( )> when the block completes.

    async {
        say "The scheduler is running!";
    };

=head2 C<fiber { ... }>

An alias for C<spawn( )>. It creates a new fiber and returns a L<Future|/"Acme::Parataxis::Future OBJECT METHODS">.

    my $f = fiber {
        say "Hello from fiber!";
    };

=head2 C<await( $thing )>

A generic await function. It accepts either an C<Acme::Parataxis> fiber object or an C<Acme::Parataxis::Future> and
suspends the current fiber until the target is ready.

    my $result = await($f);

=head2 C<await_sleep( $ms )>

Suspends the current fiber for C<$ms> milliseconds. This is a non-blocking operation that allows other fibers to run
while the current one is paused.

    async {
        say "Taking a nap...";
        await_sleep(1000);
        say "I'm awake!";
    };

=head2 C<await_read( $fh, $timeout = 5000 )>

Suspends the current fiber until the provided filehandle is ready for reading, or the timeout is reached.

    async {
        await_read($socket);
        my $data = <$socket>;
        say "Received: $data";
    };

=head2 C<await_write( $fh, $timeout = 5000 )>

Suspends the current fiber until the provided filehandle is ready for writing, or the timeout is reached.

    async {
        await_write($socket);
        syswrite($socket, $message);
    };

=head2 C<await_core_id( )>

Returns the ID of the CPU core currently executing the background task. This is a non-blocking operation that offloads
the request to the thread pool and suspends the fiber until the result is ready.

    async {
        my $core = await_core_id( );
        say "Background task handled by CPU core: $core";
    };

=head1 CORE CONCEPTS

=head2 Creating Fibers

All Perl code in this system runs within a fiber. When you start your script or call C<Acme::Parataxis::run>, a "main"
fiber is active. You can create new fibers using C<spawn> or by manually instantiating an C<Acme::Parataxis> object:

    my $fiber = Acme::Parataxis->new(code => sub {
        say "I'm in a fiber!";
    });

Creating a fiber does not run it immediately. It simply prepares the context and waits to be invoked.

=head2 Invoking Fibers

To run a fiber, you "call" it. This suspends the current fiber and executes the called one until it finishes or yields.

    $fiber->call( );

lib/Acme/Parataxis.pod  view on Meta::CPAN

=head1 SCHEDULER FUNCTIONS

The following functions are the primary interface for the integrated cooperative scheduler.

=head2 C<run( $code )>

Starts the event loop and executes C<$code> as the initial fiber. The loop continues to run as long as there are active
fibers or pending background tasks.

    Acme::Parataxis::run(sub {
        say "The scheduler is running!";
    });

=head2 C<spawn( $code )>

Creates a new fiber and adds it to the scheduler's queue. Returns a L<Future|/"Acme::Parataxis::Future OBJECT METHODS">
that will eventually contain the fiber's return value.

    my $future = Acme::Parataxis->spawn(sub {
        return "Hello from fiber #" . Acme::Parataxis->current_fid;
    });

lib/Acme/Parataxis.pod  view on Meta::CPAN

        my $http = My::HTTP->new(verify_SSL => 0);
        my @urls = qw[http://example.com http://perl.org];

        # Spawn tasks for each URL
        my @futures = map {
            my $url = $_;
            Acme::Parataxis->spawn(sub { $http->get($url)->{status} })
        } @urls;

        # Collect results as they become ready
        say "Status for $urls[$_]: " . $futures[$_]->await( ) for 0..$#urls;
    });

=head2 Symmetric Producer/Consumer

A low-level example of Passing control sideways between fibers.

    my ($p, $c);

    $p = Acme::Parataxis->new(code => sub {
        for my $item (qw[Apple Banana Cherry]) {
            say "Producer: Sending $item";
            $c->transfer($item);
        }
        $c->transfer('DONE');
    });

    $c = Acme::Parataxis->new(code => sub {
        my $item = Acme::Parataxis->yield( ); # Initial wait
        while (1) {
            last if $item eq 'DONE';
            say "Consumer: Eating $item";
            $item = $p->transfer( );
        }
    });

    $c->call( ); # Prime consumer
    $p->call( ); # Start producer

=head1 BEST PRACTICES & GOTCHAS

=over



( run in 2.375 seconds using v1.01-cache-2.11-cpan-d7a12ab2c7f )