The classic way (as I write this, Acme::Parataxis is 5 days old and already has a 'classic' API...):
```perl
use v5.40;
use Acme::Parataxis;
$|++;
Acme::Parataxis::run(
    sub {
        say 'Main task started';

        # Spawn background workers
        my $f1 = Acme::Parataxis->spawn(
            sub {
                say ' Task 1: Sleeping in a native thread pool...';
                Acme::Parataxis->await_sleep(1000);
                say ' Task 1: Ah! What a nice nap...';
                return 42;
            }
        );
        my $f2 = Acme::Parataxis->spawn(
            sub {
                say ' Task 2: Performing I/O...';

                # await_read/write for non-blocking socket handling
                return 'I/O Done';
            }
        );

        # Block current fiber until results are ready (without blocking the thread)
        say 'Result 1: ' . $f1->await( );
        say 'Result 2: ' . $f2->await( );
    }
);
```
Or do things the more modern way:
```perl
use v5.40;
use Acme::Parataxis qw[:all];
$|++;
async {
    say 'Main task started';

    # 'fiber' is a shorter alias for 'spawn'
    my $f1 = fiber {
        say ' Task 1: Sleeping...';
        await_sleep(1000);
        return 42;
    };
    my $f2 = fiber {
        say ' Task 2: Performing I/O...';

        # ...
        return 'I/O Done';
    };

    # 'await' works on fibers and futures
    say 'Result 1: ' . await($f1);
    say 'Result 2: ' . await($f2);
};
```
# DESCRIPTION
`Acme::Parataxis` implements a hybrid concurrency model for Perl, greatly inspired by the concurrency system for the
[Wren](https://wren.io/concurrency.html) programming language. It combines cooperative multitasking (fibers) with a
preemptive native thread pool.
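Fibers are a mechanism for lightweight concurrency. They are similar to threads but are cooperatively scheduled. While
the OS may switch between threads at any time, a fiber only passes control when explicitly told to do so.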
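The idea of cooperative scheduling can be illustrated without the module at all. The following is a minimal sketch in plain Perl, with closures standing in for fibers; the `make_task` helper and the round-robin loop are hypothetical names for illustration and say nothing about how `Acme::Parataxis` is implemented internally. Each task runs until it returns, and returning is the only point at which another task can take over.

```perl
use strict;
use warnings;
use feature 'say';

# Hypothetical helper: each "task" is a closure that performs one step per
# call and returns true while it still has work left. Returning from the
# closure is the explicit hand-off point.
sub make_task {
    my ( $name, $steps, $log ) = @_;
    my $done = 0;
    return sub {
        $done++;
        push @$log, "$name step $done";
        return $done < $steps;
    };
}

my @log;
my @queue = ( make_task( 'A', 2, \@log ), make_task( 'B', 3, \@log ) );

# Round-robin scheduler: run one step, requeue the task if it is unfinished
while ( my $task = shift @queue ) {
    push @queue, $task if $task->();
}
say for @log;
```

Because nothing interrupts a step halfway through, shared data such as `@log` needs no locking in this model; the trade-off is that a task which never hands control back starves every other task.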
While the classic object-oriented API is always available, `Acme::Parataxis` exports a set of functions (via the
`:all` tag) that provide a more modern, concise way to write concurrent code.
## `async { ... }`
A convenience wrapper around `run( )`. It starts the scheduler, executes the provided block as the main fiber, and
automatically calls `stop( )` when the block completes.
```perl
async {
    say "The scheduler is running!";
};
```
## `fiber { ... }`
An alias for `spawn( )`. It creates a new fiber and returns a [Future](#acme-parataxis-future-object-methods).
```perl
my $f = fiber {
    say "Hello from fiber!";
};
```
## `await( $thing )`
A generic await function. It accepts either an `Acme::Parataxis` fiber object or an `Acme::Parataxis::Future` and
suspends the current fiber until the target is ready.
```perl
my $result = await($f);
```
## `await_sleep( $ms )`
Suspends the current fiber for `$ms` milliseconds. This is a non-blocking operation that allows other fibers to run
while the current one is paused.
```perl
async {
    say "Taking a nap...";
    await_sleep(1000);
    say "I'm awake!";
};
```
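To see why the non-blocking behaviour matters, the sketch below starts two fibers that each sleep for 500ms and measures the wall-clock time for both to finish. It only uses calls that appear elsewhere in this distribution (`async`, `fiber`, `await`, `await_sleep`) plus the core `Time::HiRes` module; the `$elapsed` variable is introduced here for illustration. If the sleeps really overlap, the total should be noticeably closer to half a second than to a full second, though the exact figure depends on the machine and the thread pool.

```perl
use v5.40;
use Acme::Parataxis qw[:all];
use Time::HiRes qw[time];

my $elapsed;
async {
    my $start = time();
    my @naps;

    # Both fibers are started before either one is awaited
    for my $id ( 1 .. 2 ) {
        push @naps, fiber {
            await_sleep(500);
            return $id;
        };
    }
    await($_) for @naps;
    $elapsed = time() - $start;
    say sprintf 'Two 500ms naps took %.2fs', $elapsed;
};
```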
## `await_read( $fh, $timeout = 5000 )`
Suspends the current fiber until the provided filehandle is ready for reading, or the timeout is reached.
```perl
async {
    await_read($socket);
    my $data = <$socket>;
    say "Received: $data";
};
```
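As a rough analogy for what "ready for reading, or the timeout is reached" means, the sketch below uses the core `IO::Select` module instead of `Acme::Parataxis`. It is an illustration only and not a description of the module's internals: `can_read` blocks the whole thread, whereas `await_read` suspends just the current fiber, and `IO::Select` takes its timeout in seconds rather than milliseconds. The pipe and the variable names are assumptions made for the example, and `select` on pipes is assumed to work (it does on Unix-like systems).

```perl
use strict;
use warnings;
use feature 'say';
use IO::Select;

pipe( my $reader, my $writer ) or die "pipe failed: $!";
my $select = IO::Select->new($reader);

# Nothing has been written yet, so the wait times out with an empty list
my @ready_before = $select->can_read(0.1);

syswrite( $writer, "ping\n" );

# Data is now pending, so the handle is reported as ready right away
my @ready_after = $select->can_read(0.1);

say 'Ready handles before write: ' . scalar @ready_before;
say 'Ready handles after write: ' . scalar @ready_after;
```

Readiness only means that a read will not block; the caller still has to perform the read itself, as the `await_read` example above does with `<$socket>`.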
## `await_write( $fh, $timeout = 5000 )`
Suspends the current fiber until the provided filehandle is ready for writing, or the timeout is reached.
```perl
async {
    await_write($socket);
    syswrite($socket, $message);
};
```
## `await_core_id( )`
Returns the ID of the CPU core currently executing the background task. This is a non-blocking operation that offloads
the request to the thread pool and suspends the fiber until the result is ready.
```perl
async {
    my $core = await_core_id( );
    say "Background task handled by CPU core: $core";
};
```
# CORE CONCEPTS
## Creating Fibers
All Perl code in this system runs within a fiber. When you start your script or call `Acme::Parataxis::run`, a "main"
fiber is active. You can create new fibers using `spawn` or by manually instantiating an `Acme::Parataxis` object:
```perl
my $fiber = Acme::Parataxis->new(code => sub {
    say "I'm in a fiber!";
});
```
Creating a fiber does not run it immediately. It simply prepares the context and waits to be invoked.
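The deferred start can be made visible by recording the order of events. This is a small sketch that relies only on `new`, `call` and `is_done` as they are used in the distribution's own examples; the `@log` array is an assumption added for illustration.

```perl
use v5.40;
use Acme::Parataxis;

my @log;
my $fiber = Acme::Parataxis->new(
    code => sub {
        push @log, 'fiber body ran';
        return 'done';
    }
);

# The fiber exists at this point, but its body has not been entered yet
push @log, 'fiber created';

# Only an explicit call() transfers control into the fiber
my $result = $fiber->call();
push @log, 'back in main';

say for @log;
say 'Fiber finished' if $fiber->is_done;
```

If the documented behaviour holds, `fiber created` is logged before `fiber body ran`, since the body only runs once `call()` is reached.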
## Invoking Fibers
To run a fiber, you "call" it. This suspends the current fiber and executes the called one until it finishes or yields.
```perl
$fiber->call( );
```
# SCHEDULER FUNCTIONS
The following functions are the primary interface for the integrated cooperative scheduler.
## `run( $code )`
Starts the event loop and executes `$code` as the initial fiber. The loop continues to run as long as there are active
fibers or pending background tasks.
```perl
Acme::Parataxis::run(sub {
    say "The scheduler is running!";
});
```
## `spawn( $code )`
Creates a new fiber and adds it to the scheduler's queue. Returns a [Future](#acme-parataxis-future-object-methods)
that will eventually contain the fiber's return value.
```perl
my $future = Acme::Parataxis->spawn(sub {
my $http = My::HTTP->new(verify_SSL => 0);
my @urls = qw[http://example.com http://perl.org];
# Spawn tasks for each URL
my @futures = map {
my $url = $_;
Acme::Parataxis->spawn(sub { $http->get($url)->{status} })
} @urls;
# Collect results as they become ready
say "Status for $urls[$_]: " . $futures[$_]->await( ) for 0..$#urls;
});
```
## Symmetric Producer/Consumer
A low-level example of passing control sideways between fibers.
```perl
my ($p, $c);
$p = Acme::Parataxis->new(code => sub {
    for my $item (qw[Apple Banana Cherry]) {
        say "Producer: Sending $item";
        $c->transfer($item);
    }
    $c->transfer('DONE');
});
$c = Acme::Parataxis->new(code => sub {
    my $item = Acme::Parataxis->yield( ); # Initial wait
    while (1) {
        last if $item eq 'DONE';
        say "Consumer: Eating $item";
        $item = $p->transfer( );
    }
});
$c->call( ); # Prime consumer
$p->call( ); # Start producer
```
# BEST PRACTICES & GOTCHAS
builder/Acme/Parataxis/Builder.pm view on Meta::CPAN
method write_file( $filename, $content ) { path($filename)->spew_raw($content) or die "Could not open $filename: $!\n" }
method read_file ($filename) { path($filename)->slurp_utf8 or die "Could not open $filename: $!\n" }
method step_build() {
for my $pl_file ( find( qr/\.PL$/, 'lib' ) ) {
( my $pm = $pl_file ) =~ s/\.PL$//;
system $^X, $pl_file->stringify, $pm and die "$pl_file returned $?\n";
}
# C Extension Compilation - removed conditional block
say 'Building libparataxis...';
require Affix::Build; # This module is used for C compilation
require Config;
require File::Spec;
my $arch_dir = catdir(qw[blib arch auto Acme Parataxis]);
mkpath( $arch_dir, $verbose );
my $build = Affix::Build->new(
name => 'parataxis',
flags => {
cflags => join( ' ',
( $Config::Config{ccflags}, '-std=c11', map { '-I' . $_ } ( File::Spec->catdir( $Config::Config{archlibexp}, 'CORE' ), 'src' ) )
builder/Acme/Parataxis/Builder.pm view on Meta::CPAN
File::Spec->catdir( $Config::Config{archlibexp}, 'CORE' ) . ' -l' .
( $Config::Config{libperl} =~ s/^lib//r =~ s/\.a$//r =~ s/\.lib$//r ) .
' -lws2_32' ) : ( $^O eq 'darwin' ? '-undefined dynamic_lookup' : '' )
},
build_dir => $arch_dir,
clean => 0
);
# Add the C source file to be compiled. It's expected to be in 'lib/Acme/'
$build->add('lib/Acme/Parataxis.c');
say "Compiling and linking...";
$build->compile_and_link();
say "Build complete.";
my %modules = map { $_ => catfile( 'blib', $_ ) } find( qr/\.pm$/, 'lib' );
my %docs = map { $_ => catfile( 'blib', $_ ) } find( qr/\.pod$/, 'lib' );
my %scripts = map { $_ => catfile( 'blib', $_ ) } find( qr/(?:)/, 'script' );
my %sdocs = map { $_ => delete $scripts{$_} } grep {/\.pod$/} keys %scripts;
pm_to_blib( { %modules, %docs, %sdocs }, catdir(qw[blib lib auto]) );
#
mkpath( catdir(qw[blib script]), $verbose );
for my $src ( keys %scripts ) {
my $dest = $scripts{$src};
my $content = path($src)->slurp_raw;
builder/Acme/Parataxis/Builder.pm view on Meta::CPAN
return;
}
method Build(@args) {
my $method = $self->can( 'step_' . $action );
$method // die "No such action '$action'\n";
exit $method->($self);
}
method Build_PL() {
say sprintf 'Creating new Build script for %s %s', $meta->name, $meta->version;
$self->write_file( 'Build', sprintf <<'', $^X, __PACKAGE__, __PACKAGE__ );
#!%s
use lib 'builder';
use %s;
use Getopt::Long qw[GetOptionsFromArray];
my %%opts = ( @ARGV && $ARGV[0] =~ /\A\w+\z/ ? ( action => shift @ARGV ) : () );
GetOptionsFromArray \@ARGV, \%%opts, qw[install_base=s install_path=s%% installdirs=s destdir=s prefix=s config=s%% uninst:1 verbose:1 dry_run:1 jobs=i];
%s->new(%%opts)->Build();
make_executable('Build');
eg/affinity.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis qw[:all];
$|++;
# Demonstrate Thread Affinity / Core ID using modern API
async {
say 'Main thread (TID: ' . tid() . ')';
my @futures;
for ( 1 .. 20 ) {
push @futures, fiber {
# await_core_id() offloads to the pool and returns the core ID
return await_core_id();
};
}
say 'Main: Tasks spawned, waiting for results...';
my %distribution;
for my $f (@futures) {
my $core = await $f;
if ( defined $core ) {
$distribution{$core}++;
}
else {
warn "Failed to get core ID for a task.\n";
}
}
say 'Background Task Distribution across CPU Cores:';
for my $core ( sort { $a <=> $b } keys %distribution ) {
say " Core $core: " . $distribution{$core} . ' tasks';
}
# Explicitly clear futures to break any potentially clobbered references
# during teardown in some Perl builds
@futures = ();
};
eg/basic.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis;
#
say 'Main thread (TID: ' . Acme::Parataxis->tid . ')';
# Create a worker parataxis
my $worker = Acme::Parataxis->new(
code => sub ($name) {
say "---> Worker '$name' started (FID: " . Acme::Parataxis->current_fid . ")";
for my $i ( 1 .. 3 ) {
say "---> Worker '$name' processing step $i";
my $input = Acme::Parataxis->yield( 'Result from step ' . $i );
say "---> Worker '$name' received: $input";
}
return 'Final success';
}
);
# Drive the worker
say 'Main: Starting worker...';
my $res = $worker->call('Alice');
say 'Main: Worker yielded: ' . $res;
say 'Main: Sending more data...';
$res = $worker->call('Command A');
say 'Main: Worker yielded: ' . $res;
say 'Main: Finishing up...';
$res = $worker->call('Final Command');
say 'Main: Worker returned: ' . $res;
say 'Main: Worker is officially finished.' if $worker->is_done;
eg/http_tiny.pl view on Meta::CPAN
await_write( $self->{fh}, int( $wait * 1000 ) );
}
}
}
$self->SUPER::_do_timeout( $type, 0 );
}
}
# Use the integrated scheduler to run concurrent fetches
async {
say 'Starting concurrent fetches...';
my $http = My::HTTP->new( timeout => 10, verify_SSL => 0 );
my @urls = qw[
http://www.google.com
http://www.example.com
https://www.perl.org
https://metacpan.org
];
my @futures;
for my $url (@urls) {
push @futures, fiber {
my $start = time();
say sprintf ' [Fiber %d] Fetching %s...', current_fid(), $url;
my $res = $http->get($url);
my $elapsed = time() - $start;
say sprintf ' [Fiber %d] Done %s (Status: %d) in %.2fs', current_fid(), $url, $res->{status}, $elapsed;
return $res;
};
}
# Wait for all fibers to complete and collect results
for my $i ( 0 .. $#urls ) {
my $res = await $futures[$i];
say sprintf 'Final Result for %s: %d %s', $urls[$i], $res->{status}, $res->{reason};
}
say 'All tasks finished.';
};
eg/modern.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis qw[:all];
#
async {
say 'Main fiber started';
# 'fiber' is the modern alias for 'spawn'
my $f1 = fiber {
say ' Fiber 1: Sleeping...';
await_sleep(1000);
say ' Fiber 1: Woke up!';
return 42;
};
# 'await' is the modern way to wait for results
my $res1 = await $f1;
say 'Main: Fiber 1 returned ' . $res1;
say 'Creating a manual fiber for bidirectional communication...';
# Even if you create it with Acme::Parataxis->new,
# you can use 'await' if it's running in the scheduler
my $f2 = Acme::Parataxis->new(
code => sub {
say ' Fiber 2: Yielding some data...';
my $received = yield('Data from F2');
say ' Fiber 2: Resumed with: ' . $received;
return 'Final result from F2';
}
);
# The manual call/yield pattern works alongside the scheduler
my $yielded = $f2->call();
say 'Main: Fiber 2 yielded: ' . $yielded;
$f2->call('Data from Main');
say 'Main: Fiber 2 is ' . ( $f2->is_done ? 'done' : 'still running' );
say 'All done!';
};
eg/port_scanner.pl view on Meta::CPAN
# This demonstrates how fibers can be used to perform concurrent
# network operations using standard Perl modules (IO::Socket) and the
# Parataxis non-blocking I/O scheduler.
my $target = '127.0.0.1';
my @ports = ( 22, 80, 443, 3306, 3389, 5432, 8080, 9999 ); # Common ports + our demo server
# To make the test interesting, let's start a tiny listener on 9999 in a fiber
Acme::Parataxis::run(
sub {
say "Main: Starting parallel scan of $target...";
my $start_time = time;
# Start a dummy listener so we have at least one open port to find
my $server = IO::Socket::INET->new( LocalAddr => $target, LocalPort => 9999, Listen => 5, Reuse => 1, Blocking => 0 );
say 'Main: Local dummy server started on 9999' if $server;
my @results;
my @futures;
# Spawn a fiber for each port we want to scan
for my $port (@ports) {
push @futures, Acme::Parataxis->spawn(
sub {
my $s = IO::Socket::INET->new( PeerAddr => $target, PeerPort => $port, Proto => 'tcp', Blocking => 0 );
# If the socket failed immediately (e.g. invalid target)
eg/port_scanner.pl view on Meta::CPAN
}
}
);
}
# Wait for all scanners to finish
for my $f (@futures) {
my $r = $f->await();
push @results, $r;
if ( $r->{status} eq 'OPEN' ) {
say " [!] Port $r->{port} is OPEN";
}
}
my $elapsed = time() - $start_time;
say 'Main: Scan complete.';
say 'Summary:';
for my $r ( sort { $a->{port} <=> $b->{port} } @results ) {
say sprintf ' Port %-5d: %s', $r->{port}, $r->{status};
}
say sprintf 'Total scan time: %.2fs', $elapsed;
$server->close() if $server;
Acme::Parataxis::stop();
}
);
eg/preempt.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis;
$|++;
# Set threshold to 100 opcodes (very low for demo)
Acme::Parataxis::set_preempt_threshold(100);
Acme::Parataxis::run(
sub {
say 'Starting cooperative preemption demo...';
my @futures;
push @futures, Acme::Parataxis->spawn(
sub {
my $count = 0;
while ( $count < 5 ) {
say ' [A] iteration ' . ++$count;
# busy loop
for ( 1 .. 50 ) {
my $x = $_ * 2;
Acme::Parataxis->maybe_yield();
}
}
say ' [A] finished';
}
);
push @futures, Acme::Parataxis->spawn(
sub {
my $count = 0;
while ( $count < 5 ) {
say ' [B] iteration ' . ++$count;
for ( 1 .. 50 ) {
my $x = $_ * 2;
Acme::Parataxis->maybe_yield();
}
}
say ' [B] finished';
}
);
say 'Main spawned both, waiting...';
$_->await for @futures;
say 'Main finished waiting.';
}
);
eg/socket.pl view on Meta::CPAN
use Acme::Parataxis;
use IO::Socket::INET;
# Create a simple echo server in a coroutine
Acme::Parataxis::run(
sub {
my $server = IO::Socket::INET->new( LocalAddr => '127.0.0.1', LocalPort => 9999, Proto => 'tcp', Listen => 5, Reuse => 1 ) or
die 'Could not create server: ' . $!;
$server->blocking(0);
$server->autoflush(1);
say 'Server listening on 127.0.0.1:9999';
my $client_done = 0;
Acme::Parataxis->spawn(
sub {
say 'Client starting...';
my $client = IO::Socket::INET->new( PeerAddr => '127.0.0.1', PeerPort => 9999, Proto => 'tcp' ) or
die 'Could not create client: ' . $!;
$client->blocking(0);
$client->autoflush(1);
say 'Client connected, waiting to write...';
Acme::Parataxis->await_write($client);
$client->print("Hello from Parataxis!\n");
say 'Client waiting for response...';
Acme::Parataxis->await_read($client);
my $line = <$client>;
say 'Client received: ' . ( $line // 'UNDEF' );
$client->close();
$client_done = 1;
say 'Client finished, stopping runtime...';
Acme::Parataxis::stop();
}
);
while ( !$client_done ) {
# Wait until the server socket is readable (a connection is pending) or
# await_read times out; the timeout gives us a chance to re-check
# $client_done. In a real app you'd simply wait on the server socket.
my $res = Acme::Parataxis->await_read($server);
last if $client_done; # Check immediately after waking up
if ( $res > 0 ) {
my $conn = $server->accept();
if ($conn) {
say 'Server accepted connection!';
Acme::Parataxis->spawn(
sub {
$conn->blocking(0);
$conn->autoflush(1);
say ' Handler started for ' . fileno($conn);
Acme::Parataxis->await_read($conn);
my $line = <$conn>;
if ( defined $line ) {
chomp $line;
say ' Server echoing: ' . $line;
Acme::Parataxis->await_write($conn);
$conn->print("Echo: $line\n");
}
else {
say ' Server got EOF';
}
$conn->close();
say ' Handler finished';
}
);
}
}
# If no client connection and client is done, we exit the loop
last if $client_done;
}
say 'Main server loop exiting...';
$server->close();
}
);
say 'Script finished.';
eg/stress_fibers.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis qw[:all];
use Time::HiRes qw[time];
# Stress Test 1: High Fiber Count & Rapid Lifecycle
# This tests the scheduler's ability to handle many short-lived fibers.
async {
my $total_fibers = 1000; # We increased MAX_FIBERS in the C code to 1024
my $iterations = 5;
say "Starting Stress Test: $total_fibers fibers, $iterations iterations...";
my $start_time = time();
for my $iter ( 1 .. $iterations ) {
my $iter_start = time();
my @futures;
# Spawn a batch of fibers
for my $i ( 1 .. $total_fibers ) {
push @futures, fiber {
# Do a tiny bit of work
eg/stress_fibers.pl view on Meta::CPAN
yield() if $i % 2 == 0; # Mix in some yields
return $val;
};
}
# Wait for all to finish
for my $f (@futures) {
await $f;
}
my $elapsed = time() - $iter_start;
say sprintf ' Iteration %2d completed in %.4fs', $iter, $elapsed;
}
my $total_elapsed = time() - $start_time;
say sprintf 'Total time for %d fibers across %d iterations: %.4fs', $total_fibers, $iterations, $total_elapsed;
say 'Average time per fiber lifecycle: ' . ( $total_elapsed / ( $total_fibers * $iterations ) ) . 's';
};
eg/stress_io.pl view on Meta::CPAN
use Acme::Parataxis qw[:all];
use Time::HiRes qw[time];
$|++;
# Stress Test 2: Concurrent I/O (Background Thread Pool)
# This tests the ability to handle many simultaneous blocking background tasks.
async {
my $concurrency = 64; # Half of MAX_FIBERS
my $iterations = 5;
my $sleep_ms = 500;
say "Starting I/O Stress Test: $concurrency concurrent sleeps of ${sleep_ms}ms...";
my $start_time = time();
for my $iter ( 1 .. $iterations ) {
my $iter_start = time();
my @futures;
# Spawn concurrent sleep fibers
for my $i ( 1 .. $concurrency ) {
push @futures, fiber {
no warnings 'recursion';
eg/stress_io.pl view on Meta::CPAN
}
# Wait for all to finish
for my $f (@futures) {
await $f;
}
my $elapsed = time() - $iter_start;
# Since they are concurrent and $concurrency > CPU count (usually),
# it should take roughly (ceil(concurrency / num_cpus) * sleep_ms)
say sprintf ' Iteration %2d completed in %.4fs', $iter, $elapsed;
}
my $total_elapsed = time() - $start_time;
say sprintf 'Total time for %d concurrent sleeps across %d iterations: %.4fs', $concurrency, $iterations, $total_elapsed;
};
eg/stress_preempt.pl view on Meta::CPAN
# Stress Test 3: Preemption and Deep Call Stacks
# This tests the integrity of the stack teleportation during deep recursion
# and the effectiveness of the preemption threshold.
# Configuration
my $recursion_depth = 50; # Not too deep to overflow 4MB C-stack, but deep enough to test pads
my $preempt_at = 100; # Operations before forced yield
my $num_fibers = 10; # Number of competing CPU-bound fibers
Acme::Parataxis::set_preempt_threshold($preempt_at);
async {
say 'Starting Preemption Stress Test...';
say "Recursion depth: $recursion_depth, Preempt threshold: $preempt_at, Fibers: $num_fibers";
my $start_time = time();
my @futures;
# Recursive function to stress pads and preemption
sub recursive_math( $depth_in, $fiber_id ) {
no warnings 'uninitialized';
my $depth = $depth_in; # Localize
return 1 if $depth <= 0;
# Do some CPU work and potential preemption
eg/stress_preempt.pl view on Meta::CPAN
my $result = $sub_result + $acc;
# Another preemption check after coming back up the stack
maybe_yield();
return $result;
}
# Spawn competing CPU-bound recursive fibers
for my $i ( 1 .. $num_fibers ) {
push @futures, fiber {
say " [Fiber $i] Starting deep recursion...";
my $val = recursive_math( $recursion_depth, $i );
say " [Fiber $i] Done with result: $val";
return $val;
};
}
# Wait for all to finish
for my $f (@futures) {
await $f;
}
my $total_elapsed = time() - $start_time;
say sprintf 'Total time for %d competing recursive fibers: %.4fs', $num_fibers, $total_elapsed;
# Reset preemption threshold for the rest of the script/environment
Acme::Parataxis::set_preempt_threshold(0);
};
eg/symmetric.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis;
# Symmetric coroutines (Producer/Consumer)
my ( $producer, $consumer );
$producer = Acme::Parataxis->new(
code => sub {
for my $item (qw[Apple Banana Cherry]) {
say "Producer: Created $item. Transferring to Consumer...";
$consumer->transfer($item);
say 'Producer: Consumer gave control back. Moving to next item.';
}
say 'Producer: Out of items. Telling consumer to finish.';
$consumer->transfer(undef);
}
);
$consumer = Acme::Parataxis->new(
code => sub {
# Initial yield to wait for the first item from the Producer
my $item = Acme::Parataxis->yield();
while ( defined $item ) {
say " Consumer: Received '$item'. Processing...";
say ' Consumer: Done processing. Transferring back to Producer...';
# Transfer back to producer and wait for the NEXT item
$item = $producer->transfer();
}
say ' Consumer: Shutting down.';
Acme::Parataxis->root->transfer(); # Return to main
}
);
say 'Main: Starting the dance...';
# We need to prime the consumer so it hits its yield()
$consumer->call();
# Now start the producer
$producer->call();
say 'Main: The dance is over.';
eg/synopsis.pl view on Meta::CPAN
{
use v5.40;
use blib;
use Acme::Parataxis;
$|++;
# Basic usage with the integrated scheduler
Acme::Parataxis::run(
sub {
say 'Main fiber started (TID: ' . Acme::Parataxis->tid . ')';
# Spawn background workers
my $f1 = Acme::Parataxis->spawn(
sub {
say ' Task 1: Sleeping (non-blocking for others)...';
Acme::Parataxis->await_sleep(1000);
return 'Coffee is ready!';
}
);
my $f2 = Acme::Parataxis->spawn(
sub {
say ' Task 2: Calculating... (simulated CPU work)';
my $sum = 0;
for ( 1 .. 100 ) {
$sum += $_;
Acme::Parataxis->maybe_yield(); # Be a good neighbor
}
return $sum;
}
);
# Await results without blocking the main OS thread
say 'Result 1: ' . $f1->await();
say 'Result 2: ' . $f2->await();
}
);
}
eg/thread_pool.pl view on Meta::CPAN
use v5.40;
use blib;
use Acme::Parataxis qw[:all];
use Time::HiRes qw[time];
$|++;
# Demonstrate dynamic thread pool growth and modern API
async {
say 'Main: Initial thread pool size is ' . Acme::Parataxis::get_thread_pool_size();
my @futures;
for my $id ( 1 .. 5 ) {
push @futures, fiber {
my $fid = current_fid();
my $sleep_time = int( rand(1000) ) + 500;
say " [Worker $id] Fiber #$fid will sleep for ${sleep_time}ms in background pool";
my $start = time();
# This yields to main, background thread sleeps, scheduler resumes us
await_sleep($sleep_time);
my $elapsed = int( ( time() - $start ) * 1000 );
say " [Worker $id] Fiber #$fid woke up after ${elapsed}ms!";
return "Done $id";
};
}
say 'Main: All workers spawned. Waiting for completion...';
say 'Main: Thread pool size during load: ' . Acme::Parataxis::get_thread_pool_size();
my $start_time = time();
# Wait for all workers to finish via futures using modern 'await'
await $_ for @futures;
say 'Main: All workers finished!';
printf "Total wallclock time: %.2fs\n", time() - $start_time;
say 'Main: Final thread pool size: ' . Acme::Parataxis::get_thread_pool_size();
};
eg/worker_pool.pl view on Meta::CPAN
Acme::Parataxis::run(
sub {
my @jobs = (
{ id => 1, task => 'Fetch User Data', delay => 800 },
{ id => 2, task => 'Process Payment', delay => 1200 },
{ id => 3, task => 'Send Email', delay => 500 },
{ id => 4, task => 'Update Inventory', delay => 1500 },
{ id => 5, task => 'Generate Report', delay => 900 },
{ id => 6, task => 'Sync Analytics', delay => 1100 }
);
say 'Main: Starting worker pool with 3 fibers...';
my $start_time = time;
# Simple shared queue
my @queue = @jobs;
my @results;
# Spawn 3 worker fibers
my @workers;
for my $w_id ( 1 .. 3 ) {
push @workers, Acme::Parataxis->spawn(
sub {
say " [Worker $w_id] Fiber started.";
while ( my $job = shift @queue ) {
say " [Worker $w_id] Processing Job $job->{id}: $job->{task}...";
# Simulate a blocking I/O call (DB query, API request, idk...)
# This yields to the scheduler while the native thread pool sleeps.
Acme::Parataxis->await_sleep( $job->{delay} );
say " [Worker $w_id] Finished Job $job->{id}.";
push @results, "Job $job->{id} complete";
}
return "Worker $w_id finished.";
}
);
}
# Wait for all workers to complete
say 'Main: Waiting for pool to drain...';
$_->await for @workers;
my $elapsed = time() - $start_time;
my $sum_delays = 0;
$sum_delays += $_->{delay} for @jobs;
say 'Main: All tasks finished!';
say 'Total Results: ' . scalar(@results);
say sprintf 'Total Wallclock Time: %.2fs', $elapsed;
say sprintf 'Sum of individual delays: %.2fs', $sum_delays / 1000;
say sprintf 'Speedup: ~%.2fx', ( $sum_delays / 1000 ) / $elapsed;
}
);
lib/Acme/Parataxis.pod view on Meta::CPAN
=head1 SYNOPSIS
The classic way (as I write this, Acme::Parataxis is 5 days old and already has a 'classic' API...):
use v5.40;
use Acme::Parataxis;
$|++;
Acme::Parataxis::run(
sub {
say 'Main task started';
# Spawn background workers
my $f1 = Acme::Parataxis->spawn(
sub {
say ' Task 1: Sleeping in a native thread pool...';
Acme::Parataxis->await_sleep(1000);
say ' Task 1: Ah! What a nice nap...';
return 42;
}
);
my $f2 = Acme::Parataxis->spawn(
sub {
say ' Task 2: Performing I/O...';
# await_read/write for non-blocking socket handling
return 'I/O Done';
}
);
# Block current fiber until results are ready (without blocking the thread)
say 'Result 1: ' . $f1->await( );
say 'Result 2: ' . $f2->await( );
}
);
Or do things the more modern way:
use v5.40;
use Acme::Parataxis qw[:all];
$|++;
async {
say 'Main task started';
# 'fiber' is a shorter alias for 'spawn'
my $f1 = fiber {
say ' Task 1: Sleeping...';
await_sleep(1000);
return 42;
};
my $f2 = fiber {
say ' Task 2: Performing I/O...';
# ...
return 'I/O Done';
};
# 'await' works on fibers and futures
say 'Result 1: ' . await($f1);
say 'Result 2: ' . await($f2);
};
=head1 DESCRIPTION
C<Acme::Parataxis> implements a hybrid concurrency model for Perl, greatly inspired by the concurrency system for the
L<Wren|https://wren.io/concurrency.html> programming language. It combines cooperative multitasking (fibers) with a
preemptive native thread pool.
Fibers are a mechanism for lightweight concurrency. They are similar to threads but are cooperatively scheduled. While
the OS may switch between threads at any time, a fiber only passes control when explicitly told to do so. This makes
lib/Acme/Parataxis.pod view on Meta::CPAN
While the classic object-oriented API is always available, C<Acme::Parataxis> exports a set of functions (via the
C<:all> tag) that provide a more modern, concise way to write concurrent code.
=head2 C<async { ... }>
A convenience wrapper around C<run( )>. It starts the scheduler, executes the provided block as the main fiber, and
automatically calls C<stop( )> when the block completes.
async {
say "The scheduler is running!";
};
=head2 C<fiber { ... }>
An alias for C<spawn( )>. It creates a new fiber and returns a L<Future|/"Acme::Parataxis::Future OBJECT METHODS">.
my $f = fiber {
say "Hello from fiber!";
};
=head2 C<await( $thing )>
A generic await function. It accepts either an C<Acme::Parataxis> fiber object or an C<Acme::Parataxis::Future> and
suspends the current fiber until the target is ready.
my $result = await($f);
=head2 C<await_sleep( $ms )>
Suspends the current fiber for C<$ms> milliseconds. This is a non-blocking operation that allows other fibers to run
while the current one is paused.
async {
say "Taking a nap...";
await_sleep(1000);
say "I'm awake!";
};
=head2 C<await_read( $fh, $timeout = 5000 )>
Suspends the current fiber until the provided filehandle is ready for reading, or the timeout is reached.
async {
await_read($socket);
my $data = <$socket>;
say "Received: $data";
};
=head2 C<await_write( $fh, $timeout = 5000 )>
Suspends the current fiber until the provided filehandle is ready for writing, or the timeout is reached.
async {
await_write($socket);
syswrite($socket, $message);
};
=head2 C<await_core_id( )>
Returns the ID of the CPU core currently executing the background task. This is a non-blocking operation that offloads
the request to the thread pool and suspends the fiber until the result is ready.
async {
my $core = await_core_id( );
say "Background task handled by CPU core: $core";
};
=head1 CORE CONCEPTS
=head2 Creating Fibers
All Perl code in this system runs within a fiber. When you start your script or call C<Acme::Parataxis::run>, a "main"
fiber is active. You can create new fibers using C<spawn> or by manually instantiating an C<Acme::Parataxis> object:
my $fiber = Acme::Parataxis->new(code => sub {
say "I'm in a fiber!";
});
Creating a fiber does not run it immediately. It simply prepares the context and waits to be invoked.
=head2 Invoking Fibers
To run a fiber, you "call" it. This suspends the current fiber and executes the called one until it finishes or yields.
$fiber->call( );
lib/Acme/Parataxis.pod view on Meta::CPAN
=head1 SCHEDULER FUNCTIONS
The following functions are the primary interface for the integrated cooperative scheduler.
=head2 C<run( $code )>
Starts the event loop and executes C<$code> as the initial fiber. The loop continues to run as long as there are active
fibers or pending background tasks.
Acme::Parataxis::run(sub {
say "The scheduler is running!";
});
=head2 C<spawn( $code )>
Creates a new fiber and adds it to the scheduler's queue. Returns a L<Future|/"Acme::Parataxis::Future OBJECT METHODS">
that will eventually contain the fiber's return value.
my $future = Acme::Parataxis->spawn(sub {
return "Hello from fiber #" . Acme::Parataxis->current_fid;
});
lib/Acme/Parataxis.pod view on Meta::CPAN
my $http = My::HTTP->new(verify_SSL => 0);
my @urls = qw[http://example.com http://perl.org];
# Spawn tasks for each URL
my @futures = map {
my $url = $_;
Acme::Parataxis->spawn(sub { $http->get($url)->{status} })
} @urls;
# Collect results as they become ready
say "Status for $urls[$_]: " . $futures[$_]->await( ) for 0..$#urls;
});
=head2 Symmetric Producer/Consumer
A low-level example of passing control sideways between fibers.
my ($p, $c);
$p = Acme::Parataxis->new(code => sub {
for my $item (qw[Apple Banana Cherry]) {
say "Producer: Sending $item";
$c->transfer($item);
}
$c->transfer('DONE');
});
$c = Acme::Parataxis->new(code => sub {
my $item = Acme::Parataxis->yield( ); # Initial wait
while (1) {
last if $item eq 'DONE';
say "Consumer: Eating $item";
$item = $p->transfer( );
}
});
$c->call( ); # Prime consumer
$p->call( ); # Start producer
=head1 BEST PRACTICES & GOTCHAS
=over