Data-ReqRep-Shared
view release on metacpan or search on metacpan
client suffices. Memory: 64 bytes/slot (Int) or (32 + "resp_size"
rounded up to 64) bytes/slot (Str).
"resp_size" -- max response payload bytes (Str only). Fixed per slot.
Responses exceeding this croak. Pick the 99th percentile.
"arena" -- request data arena bytes (Str only, default "req_cap * 256").
Increase for large requests. Monitor "arena_used" in stats().
Benchmarks
Linux x86_64. Run "perl -Mblib bench/vs.pl 50000" to reproduce.
SINGLE-PROCESS ECHO (200K iterations)
ReqRep::Int (lock-free) 1.8M req/s
ReqRep::Str (12B, mutex) 1.2M req/s
ReqRep::Str batch (100x) 1.4M req/s
CROSS-PROCESS ECHO (50K iterations, 12B payload)
Pipe pair (1:1) 240K req/s
Unix socketpair (1:1) 222K req/s
ReqRep::Int 202K req/s *
ReqRep::Str 177K req/s *
IPC::Msg (SysV) 165K req/s
TCP loopback 115K req/s
MCE::Channel 96K req/s
Socketpair via broker 82K req/s
Forks::Queue (Shmem) 5K req/s
bench/bench.pl view on Meta::CPAN
}
my $t0 = time();
for (1..$N) {
my $id = $cli->send($msg);
my ($r, $ri) = $srv->recv;
$srv->reply($ri, $r);
$cli->get($id);
}
my $el = time() - $t0;
printf "Single-process echo: %s req/s (%d iterations, %.1f ms)\n",
fmt_rate($N / $el), $N, $el * 1000;
$srv->unlink;
}
# --- Cross-process echo ---
{
my $path = tmpnam();
my $srv = Data::ReqRep::Shared->new($path, 1024, 64, 4096);
my $pid = fork // die "fork: $!";
bench/bench.pl view on Meta::CPAN
}
exit 0;
}
my $cli = Data::ReqRep::Shared::Client->new($path);
for (1..1000) { $cli->req($msg) } # warmup
my $t0 = time();
for (1..$N) { $cli->req($msg) }
my $el = time() - $t0;
printf "Cross-process echo: %s req/s (%d iterations, %.1f ms)\n",
fmt_rate($N / $el), $N, $el * 1000;
waitpid $pid, 0;
$srv->unlink;
}
# --- Batch recv echo ---
{
my $batch = 100;
my $iters = int($N / $batch);
my $total = $iters * $batch;
bench/bench.pl view on Meta::CPAN
my @ids;
push @ids, $cli->send($msg) for 1..$batch;
my @items = $srv->recv_multi($batch);
while (@items) {
my (undef, $id) = splice @items, 0, 2;
$srv->reply($id, $msg);
}
$cli->get($_) for @ids;
}
my $el = time() - $t0;
printf "Batch echo (%d/batch): %s req/s (%d iterations, %.1f ms)\n",
$batch, fmt_rate($total / $el), $total, $el * 1000;
$srv->unlink;
}
# --- req_wait with timeout ---
{
my $path = tmpnam();
my $srv = Data::ReqRep::Shared->new($path, 1024, 64, 4096);
my $pid = fork // die "fork: $!";
bench/bench.pl view on Meta::CPAN
}
exit 0;
}
my $cli = Data::ReqRep::Shared::Client->new($path);
for (1..1000) { $cli->req_wait($msg, 5.0) } # warmup
my $t0 = time();
for (1..$N) { $cli->req_wait($msg, 5.0) }
my $el = time() - $t0;
printf "Cross-process req_wait: %s req/s (%d iterations, %.1f ms)\n",
fmt_rate($N / $el), $N, $el * 1000;
waitpid $pid, 0;
$srv->unlink;
}
bench/bench_int.pl view on Meta::CPAN
my $N = $ARGV[0] || 200_000;
sub fmt_rate {
my $r = shift;
return sprintf("%.1fM", $r / 1e6) if $r >= 1e6;
return sprintf("%.1fK", $r / 1e3) if $r >= 1e3;
return sprintf("%.0f", $r);
}
print "ReqRep Int vs Str, $N iterations\n\n";
# --- Single-process Int ---
{
my $path = tmpnam();
my $srv = Data::ReqRep::Shared::Int->new($path, 1024, 64);
my $cli = Data::ReqRep::Shared::Int::Client->new($path);
for (1..1000) {
my $id = $cli->send(42);
my ($v, $ri) = $srv->recv;
bench/vs.pl view on Meta::CPAN
my $N = $ARGV[0] || 50_000;
my @sizes = (12, 1024);
sub fmt_rate {
my $r = shift;
return sprintf("%.1fM", $r / 1e6) if $r >= 1e6;
return sprintf("%.1fK", $r / 1e3) if $r >= 1e3;
return sprintf("%.0f", $r);
}
print "Cross-process echo round-trip, $N iterations\n\n";
# --- Data::ReqRep::Shared (Str) ---
for my $size (@sizes) {
my $msg = "x" x $size;
my $path = tmpnam();
my $resp_size = $size + 64;
require Data::ReqRep::Shared;
require Data::ReqRep::Shared::Client;
bench/vs.pl view on Meta::CPAN
printf " %-32s %8s req/s (%dB payload)\n",
"MCE::Channel (Simple)", fmt_rate($N / $el), $size;
}
}
# --- Forks::Queue (Shmem, two queues) ---
if (eval { require Forks::Queue; 1 }) {
print "\n";
for my $size (@sizes) {
my $msg = "x" x $size;
my $n = $N > 10_000 ? 10_000 : $N; # Forks::Queue is slow, cap iterations
my $req_q = Forks::Queue->new(impl => 'Shmem');
my $rep_q = Forks::Queue->new(impl => 'Shmem');
my $pid = fork // die "fork: $!";
if ($pid == 0) {
for (1..($n + 100)) {
my $buf = $req_q->dequeue;
last unless defined $buf;
$rep_q->enqueue($buf);
lib/Data/ReqRep/Shared.pm view on Meta::CPAN
=item C<arena> -- request data arena bytes (Str only, default
C<req_cap * 256>). Increase for large requests. Monitor
C<arena_used> in C<stats()>.
=back
=head2 Benchmarks
Linux x86_64. Run C<perl -Mblib bench/vs.pl 50000> to reproduce.
SINGLE-PROCESS ECHO (200K iterations)
ReqRep::Int (lock-free) 1.8M req/s
ReqRep::Str (12B, mutex) 1.2M req/s
ReqRep::Str batch (100x) 1.4M req/s
CROSS-PROCESS ECHO (50K iterations, 12B payload)
Pipe pair (1:1) 240K req/s
Unix socketpair (1:1) 222K req/s
ReqRep::Int 202K req/s *
ReqRep::Str 177K req/s *
IPC::Msg (SysV) 165K req/s
TCP loopback 115K req/s
MCE::Channel 96K req/s
Socketpair via broker 82K req/s
Forks::Queue (Shmem) 5K req/s
xt/concurrent_cancel.t view on Meta::CPAN
is $? >> 8, 0, "clear race: child $pid unblocked and got undef";
}
my $dt = time - $t0;
ok $dt < 2.0, sprintf("clear race: all children unblocked in %.3fs", $dt);
$srv->unlink;
}
# ============================================================
# 4. Rapid cancel/send on same slot: verify generation prevents ABA
# across many iterations with minimal slot count
# ============================================================
{
my $path = tmpnam();
my $srv = Data::ReqRep::Shared->new($path, 256, 1, 64); # 1 slot!
my $cli = Data::ReqRep::Shared::Client->new($path);
my $aba_detected = 0;
my $ok_count = 0;
for my $i (1..500) {
xt/concurrent_cancel.t view on Meta::CPAN
my ($rq2, $ri2) = $srv->recv;
my $r2 = $srv->reply($ri2, "good$i");
if ($r2) {
my $resp = $cli->get($id2);
$ok_count++ if defined $resp && $resp eq "good$i";
}
}
ok $aba_detected > 0, "ABA rapid: generation prevented $aba_detected stale replies";
ok $ok_count > 0, "ABA rapid: $ok_count correct round-trips";
diag "aba_detected=$aba_detected ok_count=$ok_count out of 500 iterations";
$srv->unlink;
}
done_testing;
( run in 1.898 second using v1.01-cache-2.11-cpan-71847e10f99 )