Data-ReqRep-Shared

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

    client suffices. Memory: 64 bytes/slot (Int) or (32 + "resp_size"
    rounded up to 64) bytes/slot (Str).
    "resp_size" -- max response payload bytes (Str only). Fixed per slot.
    Responses exceeding this croak. Pick the 99th percentile of your
    expected response sizes.
    "arena" -- request data arena bytes (Str only, default "req_cap * 256").
    Increase for large requests. Monitor "arena_used" in stats().

  Benchmarks
    Linux x86_64. Run "perl -Mblib bench/vs.pl 50000" to reproduce.

        SINGLE-PROCESS ECHO (200K iterations)
        ReqRep::Int (lock-free)    1.8M req/s
        ReqRep::Str (12B, mutex)   1.2M req/s
        ReqRep::Str batch (100x)   1.4M req/s

        CROSS-PROCESS ECHO (50K iterations, 12B payload)
        Pipe pair (1:1)            240K req/s
        Unix socketpair (1:1)      222K req/s
        ReqRep::Int                202K req/s  *
        ReqRep::Str                177K req/s  *
        IPC::Msg (SysV)            165K req/s
        TCP loopback               115K req/s
        MCE::Channel                96K req/s
        Socketpair via broker       82K req/s
        Forks::Queue (Shmem)         5K req/s

bench/bench.pl  view on Meta::CPAN

    }

    my $t0 = time();
    for (1..$N) {
        my $id = $cli->send($msg);
        my ($r, $ri) = $srv->recv;
        $srv->reply($ri, $r);
        $cli->get($id);
    }
    my $el = time() - $t0;
    printf "Single-process echo:     %s req/s (%d iterations, %.1f ms)\n",
        fmt_rate($N / $el), $N, $el * 1000;
    $srv->unlink;
}

# --- Cross-process echo ---
{
    my $path = tmpnam();
    my $srv = Data::ReqRep::Shared->new($path, 1024, 64, 4096);

    my $pid = fork // die "fork: $!";

bench/bench.pl  view on Meta::CPAN

        }
        exit 0;
    }

    my $cli = Data::ReqRep::Shared::Client->new($path);
    for (1..1000) { $cli->req($msg) }  # warmup

    my $t0 = time();
    for (1..$N) { $cli->req($msg) }
    my $el = time() - $t0;
    printf "Cross-process echo:      %s req/s (%d iterations, %.1f ms)\n",
        fmt_rate($N / $el), $N, $el * 1000;
    waitpid $pid, 0;
    $srv->unlink;
}

# --- Batch recv echo ---
{
    my $batch = 100;
    my $iters = int($N / $batch);
    my $total = $iters * $batch;

bench/bench.pl  view on Meta::CPAN

        my @ids;
        push @ids, $cli->send($msg) for 1..$batch;
        my @items = $srv->recv_multi($batch);
        while (@items) {
            my (undef, $id) = splice @items, 0, 2;
            $srv->reply($id, $msg);
        }
        $cli->get($_) for @ids;
    }
    my $el = time() - $t0;
    printf "Batch echo (%d/batch):   %s req/s (%d iterations, %.1f ms)\n",
        $batch, fmt_rate($total / $el), $total, $el * 1000;
    $srv->unlink;
}

# --- req_wait with timeout ---
{
    my $path = tmpnam();
    my $srv = Data::ReqRep::Shared->new($path, 1024, 64, 4096);

    my $pid = fork // die "fork: $!";

bench/bench.pl  view on Meta::CPAN

        }
        exit 0;
    }

    my $cli = Data::ReqRep::Shared::Client->new($path);
    for (1..1000) { $cli->req_wait($msg, 5.0) }  # warmup

    my $t0 = time();
    for (1..$N) { $cli->req_wait($msg, 5.0) }
    my $el = time() - $t0;
    printf "Cross-process req_wait:  %s req/s (%d iterations, %.1f ms)\n",
        fmt_rate($N / $el), $N, $el * 1000;
    waitpid $pid, 0;
    $srv->unlink;
}

bench/bench_int.pl  view on Meta::CPAN


# Iteration count: first command-line argument, or 200_000 when it is missing
# (or otherwise false, e.g. 0 or an empty string).
my $N = $ARGV[0] || 200_000;

# Format a request rate as a short human-readable string.
#
# Takes one numeric rate and returns it with one decimal and an "M" (divided
# by 1e6) or "K" (divided by 1e3) suffix, or as a bare rounded integer when it
# is below 1000.
#
# The unit is chosen from the *rounded* text rather than from the raw value,
# so a rate just under a boundary is promoted to the next unit instead of
# being printed as "1000.0K" or "1000".
sub fmt_rate {
    my $r = shift;

    # "unless >=" (rather than "if <") keeps NaN on the plain-number path.
    my $plain = sprintf("%.0f", $r);
    return $plain unless $plain >= 1000;

    my $kilo = sprintf("%.1f", $r / 1e3);
    return $kilo . "K" unless $kilo >= 1000;

    return sprintf("%.1fM", $r / 1e6);
}

print "ReqRep Int vs Str, $N iterations\n\n";

# --- Single-process Int ---
{
    my $path = tmpnam();
    my $srv = Data::ReqRep::Shared::Int->new($path, 1024, 64);
    my $cli = Data::ReqRep::Shared::Int::Client->new($path);

    for (1..1000) {
        my $id = $cli->send(42);
        my ($v, $ri) = $srv->recv;

bench/vs.pl  view on Meta::CPAN

# Iteration count: first command-line argument, or 50_000 when it is missing
# (or otherwise false, e.g. 0 or an empty string).
my $N = $ARGV[0] || 50_000;
# Payload sizes; the sections below build an "x" x $size message for each.
my @sizes = (12, 1024);

# Format a request rate as a short human-readable string.
#
# Takes one numeric rate and returns it with one decimal and an "M" (divided
# by 1e6) or "K" (divided by 1e3) suffix, or as a bare rounded integer when it
# is below 1000.
#
# The unit is chosen from the *rounded* text rather than from the raw value,
# so a rate just under a boundary is promoted to the next unit instead of
# being printed as "1000.0K" or "1000".
sub fmt_rate {
    my $r = shift;

    # "unless >=" (rather than "if <") keeps NaN on the plain-number path.
    my $plain = sprintf("%.0f", $r);
    return $plain unless $plain >= 1000;

    my $kilo = sprintf("%.1f", $r / 1e3);
    return $kilo . "K" unless $kilo >= 1000;

    return sprintf("%.1fM", $r / 1e6);
}

print "Cross-process echo round-trip, $N iterations\n\n";

# --- Data::ReqRep::Shared (Str) ---
for my $size (@sizes) {
    my $msg = "x" x $size;
    my $path = tmpnam();
    my $resp_size = $size + 64;

    require Data::ReqRep::Shared;
    require Data::ReqRep::Shared::Client;

bench/vs.pl  view on Meta::CPAN

        printf "  %-32s %8s req/s  (%dB payload)\n",
            "MCE::Channel (Simple)", fmt_rate($N / $el), $size;
    }
}

# --- Forks::Queue (Shmem, two queues) ---
if (eval { require Forks::Queue; 1 }) {
    print "\n";
    for my $size (@sizes) {
        my $msg = "x" x $size;
        my $n = $N > 10_000 ? 10_000 : $N;  # Forks::Queue is slow, cap iterations

        my $req_q = Forks::Queue->new(impl => 'Shmem');
        my $rep_q = Forks::Queue->new(impl => 'Shmem');

        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
            for (1..($n + 100)) {
                my $buf = $req_q->dequeue;
                last unless defined $buf;
                $rep_q->enqueue($buf);

lib/Data/ReqRep/Shared.pm  view on Meta::CPAN

=item C<arena> -- request data arena bytes (Str only, default
C<req_cap * 256>). Increase for large requests. Monitor
C<arena_used> in C<stats()>.

=back

=head2 Benchmarks

Linux x86_64. Run C<perl -Mblib bench/vs.pl 50000> to reproduce.

    SINGLE-PROCESS ECHO (200K iterations)
    ReqRep::Int (lock-free)    1.8M req/s
    ReqRep::Str (12B, mutex)   1.2M req/s
    ReqRep::Str batch (100x)   1.4M req/s

    CROSS-PROCESS ECHO (50K iterations, 12B payload)
    Pipe pair (1:1)            240K req/s
    Unix socketpair (1:1)      222K req/s
    ReqRep::Int                202K req/s  *
    ReqRep::Str                177K req/s  *
    IPC::Msg (SysV)            165K req/s
    TCP loopback               115K req/s
    MCE::Channel                96K req/s
    Socketpair via broker       82K req/s
    Forks::Queue (Shmem)         5K req/s

xt/concurrent_cancel.t  view on Meta::CPAN

        is $? >> 8, 0, "clear race: child $pid unblocked and got undef";
    }
    my $dt = time - $t0;
    ok $dt < 2.0, sprintf("clear race: all children unblocked in %.3fs", $dt);

    $srv->unlink;
}

# ============================================================
# 4. Rapid cancel/send on same slot: verify generation prevents ABA
#    across many iterations with minimal slot count
# ============================================================
{
    my $path = tmpnam();
    my $srv = Data::ReqRep::Shared->new($path, 256, 1, 64);  # 1 slot!
    my $cli = Data::ReqRep::Shared::Client->new($path);

    my $aba_detected = 0;
    my $ok_count = 0;

    for my $i (1..500) {

xt/concurrent_cancel.t  view on Meta::CPAN

        my ($rq2, $ri2) = $srv->recv;
        my $r2 = $srv->reply($ri2, "good$i");
        if ($r2) {
            my $resp = $cli->get($id2);
            $ok_count++ if defined $resp && $resp eq "good$i";
        }
    }

    ok $aba_detected > 0, "ABA rapid: generation prevented $aba_detected stale replies";
    ok $ok_count > 0, "ABA rapid: $ok_count correct round-trips";
    diag "aba_detected=$aba_detected ok_count=$ok_count out of 500 iterations";

    $srv->unlink;
}

done_testing;



( run in 1.898 second using v1.01-cache-2.11-cpan-71847e10f99 )