Acme-Parataxis

 view release on metacpan or  search on metacpan

eg/worker_pool.pl  view on Meta::CPAN

#!/usr/bin/env perl
use v5.40;
use lib 'lib';
use blib;
use Acme::Parataxis;
use Time::HiRes qw[time];
$|++;

# This simulates a pool of workers processing a queue of jobs.
# Each job 'blocks' by sleeping in the native thread pool (simulating I/O),
# allowing other fibers to continue running concurrently on the main thread.
# I haven't really made good use for any of this because it doesn't work everywhere yet...
Acme::Parataxis::run(
    sub {
        my @jobs = (
            { id => 1, task => 'Fetch User Data',  delay => 800 },
            { id => 2, task => 'Process Payment',  delay => 1200 },
            { id => 3, task => 'Send Email',       delay => 500 },
            { id => 4, task => 'Update Inventory', delay => 1500 },
            { id => 5, task => 'Generate Report',  delay => 900 },
            { id => 6, task => 'Sync Analytics',   delay => 1100 }
        );
        say 'Main: Starting worker pool with 3 fibers...';
        my $start_time = time;

        # Simple shared queue
        my @queue = @jobs;
        my @results;

        # Spawn 3 worker fibers
        my @workers;
        for my $w_id ( 1 .. 3 ) {
            push @workers, Acme::Parataxis->spawn(
                sub {
                    say "  [Worker $w_id] Fiber started.";
                    while ( my $job = shift @queue ) {
                        say "  [Worker $w_id] Processing Job $job->{id}: $job->{task}...";

                        # Simulate a blocking I/O call (DB query, API request, idk...)
                        # This yields to the scheduler while the native thread pool sleeps.
                        Acme::Parataxis->await_sleep( $job->{delay} );
                        say "  [Worker $w_id] Finished Job $job->{id}.";
                        push @results, "Job $job->{id} complete";
                    }
                    return "Worker $w_id finished.";
                }
            );
        }

        # Wait for all workers to complete
        say 'Main: Waiting for pool to drain...';
        $_->await for @workers;
        my $elapsed    = time() - $start_time;
        my $sum_delays = 0;
        $sum_delays += $_->{delay} for @jobs;
        say 'Main: All tasks finished!';
        say 'Total Results: ' . scalar(@results);
        say sprintf 'Total Wallclock Time: %.2fs',     $elapsed;
        say sprintf 'Sum of individual delays: %.2fs', $sum_delays / 1000;
        say sprintf 'Speedup: ~%.2fx', ( $sum_delays / 1000 ) / $elapsed;
    }
);



( run in 0.469 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )