Data-Queue-Shared

 view release on metacpan or  search on metacpan

bench/contention.pl  view on Meta::CPAN

            while ($got < $per_consumer) {
                $got++ if defined $q->pop;
            }
            POSIX::_exit(0);
        }
        push @cons_pids, $pid;
    }

    my $parent_share = $total - $per_consumer * ($consumers - 1);
    my $got = 0;
    while ($got < $parent_share) {
        $got++ if defined $q->pop;
    }

    waitpid($_, 0) for @prod_pids, @cons_pids;
    print rate_fmt("Shared::Str (mutex, 50B)", time - $t0, $total), "\n";
}

# ----------------------------------------------------------------
# Data::Queue::Shared::Str with push_wait/pop_wait
#
# $producers forked children each push $per_producer copies of a
# 50-character message; $consumers - 1 forked children plus the parent
# pop them back out.  Every blocking call is given a timeout argument
# of 10 (presumably seconds -- confirm against the module docs).
#
# A timed-out call means the item was NOT transferred, so each worker
# stops at its first timeout and reports it (children through a
# non-zero exit status) rather than silently counting the failed call
# toward the reported throughput.
# ----------------------------------------------------------------
{
    my $msg = "x" x 50;
    my $q = Data::Queue::Shared::Str->new(undef, $qcap);
    my $t0 = time;

    my @prod_pids;
    for (1..$producers) {
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
            for (1..$per_producer) {
                # NOTE(review): assumes push_wait returns false on timeout.
                $q->push_wait($msg, 10) or POSIX::_exit(1);
            }
            POSIX::_exit(0);
        }
        push @prod_pids, $pid;
    }

    my @cons_pids;
    my $per_consumer = int($total / $consumers);
    for (1..$consumers - 1) {
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
            for (1..$per_consumer) {
                # NOTE(review): assumes pop_wait returns undef on timeout.
                defined $q->pop_wait(10) or POSIX::_exit(1);
            }
            POSIX::_exit(0);
        }
        push @cons_pids, $pid;
    }

    # The parent acts as the last consumer and also absorbs the remainder
    # left over by the integer division above.
    my $parent_share = $total - $per_consumer * ($consumers - 1);
    my $failures = 0;
    for (1..$parent_share) {
        next if defined $q->pop_wait(10);
        $failures++;
        last;    # further pops would only wait out the timeout again
    }

    # Reap every child and note any that exited abnormally or non-zero.
    for my $pid (@prod_pids, @cons_pids) {
        my $reaped = waitpid($pid, 0);
        $failures++ if $reaped == $pid && $? != 0;
    }
    warn "Shared::Str (futex blocking, 50B): $failures worker(s) timed out or failed; the rate below is not valid\n"
        if $failures;
    print rate_fmt("Shared::Str (futex blocking, 50B)", time - $t0, $total), "\n";
}

# ----------------------------------------------------------------
# MCE::Queue (no concurrent MPMC measurement is taken in this file)
# ----------------------------------------------------------------
if ($has_mce) {
    require MCE;

    # Why only a notice is printed here instead of a benchmark:
    #   * MCE::Queue uses socket IPC routed through the manager process.
    #   * MCE::Step runs task groups concurrently with transparent queue
    #     plumbing; task_end + $q->end() is the documented pattern for
    #     producer/consumer.
    #   * True concurrent MPMC (N producers + N consumers) is not a
    #     supported MCE pattern -- it is designed for workers->manager
    #     or pipeline flows.
    # bench/vs.pl holds the MCE single-process and produce->drain
    # comparisons.
    my $notice = "  (MCE::Queue: no concurrent MPMC — see bench/vs.pl)\n";
    print $notice;
}

# ----------------------------------------------------------------
# Scaling: vary producer/consumer counts
#
# For each [producers, consumers] pair a fresh Int queue of capacity
# $qcap is created, $np children push and $nc - 1 children plus the
# parent pop, and one rate line is printed.  Every blocking call is
# given a timeout argument of 10 (presumably seconds -- confirm
# against the module docs).
#
# A timed-out call means the item was NOT transferred, so each worker
# stops at its first timeout and reports it (children through a
# non-zero exit status) rather than silently counting the failed call
# toward the reported throughput.
# ----------------------------------------------------------------
print "\n";
print "--- Scaling: Int lock-free, push_wait/pop_wait, cap=$qcap ---\n\n";

for my $config ([1,1], [2,1], [1,2], [2,2], [4,1], [1,4], [4,4]) {
    my ($np, $nc) = @$config;
    my $pp = int($n / $np);      # items pushed by each producer
    my $tot = $pp * $np;         # items actually pushed in this configuration
    my $pc = int($tot / $nc);    # items popped by each forked consumer

    my $q = Data::Queue::Shared::Int->new(undef, $qcap);
    my $t0 = time;

    my @pids;
    for (1..$np) {
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
            for my $value (1..$pp) {
                # NOTE(review): assumes push_wait returns false on timeout.
                $q->push_wait($value, 10) or POSIX::_exit(1);
            }
            POSIX::_exit(0);
        }
        push @pids, $pid;
    }

    for (1..$nc - 1) {
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
            for (1..$pc) {
                # NOTE(review): assumes pop_wait returns undef on timeout.
                defined $q->pop_wait(10) or POSIX::_exit(1);
            }
            POSIX::_exit(0);
        }
        push @pids, $pid;
    }

    # The parent acts as the last consumer and also absorbs the remainder
    # left over by the integer division above.
    my $parent_share = $tot - $pc * ($nc - 1);
    my $failures = 0;
    for (1..$parent_share) {
        next if defined $q->pop_wait(10);
        $failures++;
        last;    # further pops would only wait out the timeout again
    }

    # Reap every child and note any that exited abnormally or non-zero.
    for my $pid (@pids) {
        my $reaped = waitpid($pid, 0);
        $failures++ if $reaped == $pid && $? != 0;
    }
    warn sprintf("  %dP x %dC: %d worker(s) timed out or failed; the rate below is not valid\n",
                 $np, $nc, $failures)
        if $failures;
    print rate_fmt(sprintf("  %dP x %dC", $np, $nc), time - $t0, $tot), "\n";
}



( run in 0.776 second using v1.01-cache-2.11-cpan-39bf76dae61 )