Data-ReqRep-Shared
view release on metacpan or search on metacpan
bench/vs.pl view on Meta::CPAN
#!/usr/bin/env perl
# Benchmark: Data::ReqRep::Shared vs other IPC mechanisms
# Cross-process echo round-trip throughput
use strict;
use warnings;
use Time::HiRes qw(time);
use File::Temp 'tmpnam';
use Socket;
use IO::Socket::INET;
use POSIX ':sys_wait_h';
# Number of timed round-trips per benchmark: the first command-line argument,
# or 50_000 when that argument is absent, empty or 0.
my $N = $ARGV[0] || 50_000;
{
    # Numify once, quietly, so a malformed argument is reported by the single
    # usage error below instead of by "isn't numeric" warnings at each use.
    no warnings 'numeric';
    $N = int($N);
}
# The forked echo servers loop for "warmup + $N" iterations while the parent
# always sends the full warmup count, so a count below 1 would leave the two
# sides disagreeing about how many requests are exchanged. Reject it up front.
die "usage: $0 [iterations] -- iterations must be a positive integer\n"
    unless $N >= 1;

# Payload sizes, in bytes, that each per-size benchmark section is run with.
my @sizes = (12, 1024);
# fmt_rate($rate)
#
# Render a requests-per-second figure as a short human-readable string:
#   >= 1e6 -> one decimal with an "M" suffix   (2_500_000 -> "2.5M")
#   >= 1e3 -> one decimal with a  "K" suffix   (1_500     -> "1.5K")
#   else   -> rounded to a whole number        (42.4      -> "42")
#
# The unit is chosen on the *rounded* value: a rate that rounds up to 1000 of
# the smaller unit is promoted to the next one, so 999_999 yields "1.0M"
# rather than "1000.0K" and 999.7 yields "1.0K" rather than "1000".
#
# Takes one numeric argument and returns the formatted string.
sub fmt_rate {
    my ($rate) = @_;

    return sprintf("%.1fM", $rate / 1e6) if $rate >= 1e6;

    if ($rate >= 1e3) {
        my $kilo = sprintf("%.1f", $rate / 1e3);
        # Rounding may have carried the mantissa up to "1000.0".
        return $kilo >= 1000 ? sprintf("%.1fM", $rate / 1e6) : "${kilo}K";
    }

    my $whole = sprintf("%.0f", $rate);
    # Same carry check at the plain -> K boundary.
    return $whole >= 1000 ? sprintf("%.1fK", $rate / 1e3) : $whole;
}
print "Cross-process echo round-trip, $N iterations\n\n";

# --- Data::ReqRep::Shared (Str) ---
# One pass per payload size: create the server end on a fresh temp path, fork
# a child that echoes every request straight back, then time $N client
# round-trips from the parent against that same path.
foreach my $size (@sizes) {
    my $payload   = "x" x $size;
    my $path      = tmpnam();
    my $resp_size = $size + 64;    # reply capacity: payload plus 64 bytes of slack

    require Data::ReqRep::Shared;
    require Data::ReqRep::Shared::Client;
    my $server = Data::ReqRep::Shared->new($path, 1024, 64, $resp_size);

    my $child = fork // die "fork: $!";
    if (!$child) {
        # Child: serve as many requests as the parent will send (500 warmup
        # plus $N timed), bailing out early if recv_wait hands back undef
        # (presumably its 10.0 argument is a timeout -- confirm against the
        # module docs).
        for (1 .. $N + 500) {
            my ($req, $req_id) = $server->recv_wait(10.0);
            defined $req or last;
            $server->reply($req_id, $req);
        }
        exit 0;
    }

    my $client = Data::ReqRep::Shared::Client->new($path);

    # Warmup round-trips, excluded from the measurement.
    $client->req($payload) for 1 .. 500;

    my $started = time();
    $client->req($payload) for 1 .. $N;
    my $elapsed = time() - $started;

    # Reap the echo child before reporting, then drop the backing file.
    waitpid $child, 0;
    printf " %-32s %8s req/s (%dB payload)\n",
        "Data::ReqRep::Shared", fmt_rate($N / $elapsed), $size;
    $server->unlink;
}
# --- Data::ReqRep::Shared::Int (lock-free) ---
{
require Data::ReqRep::Shared::Int;
require Data::ReqRep::Shared::Int::Client;
my $path = tmpnam();
my $srv = Data::ReqRep::Shared::Int->new($path, 1024, 64);
my $pid = fork // die "fork: $!";
if ($pid == 0) {
for (1..($N + 1000)) {
my ($v, $ri) = $srv->recv_wait(10.0);
last unless defined $v;
$srv->reply($ri, $v);
}
exit 0;
}
my $cli = Data::ReqRep::Shared::Int::Client->new($path);
$cli->req(42) for 1..1000;
my $t0 = time();
$cli->req(42) for 1..$N;
my $el = time() - $t0;
bench/vs.pl view on Meta::CPAN
}
for (1..500) {
$req_q->snd(1, $msg);
my $buf;
$rep_q->rcv($buf, $len + 64, 0);
}
my $t0 = time();
for (1..$N) {
$req_q->snd(1, $msg);
my $buf;
$rep_q->rcv($buf, $len + 64, 0);
}
my $el = time() - $t0;
waitpid $pid, 0;
$req_q->remove;
$rep_q->remove;
printf " %-32s %8s req/s (%dB payload)\n",
"IPC::Msg (SysV)", fmt_rate($N / $el), $size;
}
}
# --- MCE::Channel (req/rep via send/recv pair) ---
# Runs only when MCE::Channel can be loaded. Request/reply is modelled with
# two 'Simple' channels: the parent sends on one and receives on the other,
# while a forked child copies every message across.
if (eval { require MCE::Channel; 1 }) {
    print "\n";
    foreach my $size (@sizes) {
        my $payload = "x" x $size;
        my $req_ch  = MCE::Channel->new(impl => 'Simple');
        my $rep_ch  = MCE::Channel->new(impl => 'Simple');

        my $child = fork // die "fork: $!";
        if (!$child) {
            # Child: echo 500 warmup + $N timed messages, stopping early if
            # recv returns undef.
            for (1 .. $N + 500) {
                my $got = $req_ch->recv;
                defined $got or last;
                $rep_ch->send($got);
            }
            exit 0;
        }

        # Warmup round-trips, excluded from the measurement.
        for (1 .. 500) {
            $req_ch->send($payload);
            $rep_ch->recv;
        }

        my $started = time();
        for (1 .. $N) {
            $req_ch->send($payload);
            $rep_ch->recv;
        }
        my $elapsed = time() - $started;

        waitpid $child, 0;
        printf " %-32s %8s req/s (%dB payload)\n",
            "MCE::Channel (Simple)", fmt_rate($N / $elapsed), $size;
    }
}
# --- Forks::Queue (Shmem, two queues) ---
# Runs only when Forks::Queue can be loaded. Same echo pattern as the other
# sections, built from a request queue and a reply queue, with the iteration
# count capped at 10_000 and reported alongside the rate.
if (eval { require Forks::Queue; 1 }) {
    print "\n";
    foreach my $size (@sizes) {
        my $payload = "x" x $size;

        # Forks::Queue is slow, cap iterations
        my $iters = $N;
        $iters = 10_000 if $iters > 10_000;

        my $req_q = Forks::Queue->new(impl => 'Shmem');
        my $rep_q = Forks::Queue->new(impl => 'Shmem');

        my $child = fork // die "fork: $!";
        if (!$child) {
            # Child: echo 100 warmup + $iters timed items, stopping early if
            # dequeue returns undef.
            for (1 .. $iters + 100) {
                my $item = $req_q->dequeue;
                defined $item or last;
                $rep_q->enqueue($item);
            }
            exit 0;
        }

        # Warmup round-trips, excluded from the measurement.
        for (1 .. 100) {
            $req_q->enqueue($payload);
            $rep_q->dequeue;
        }

        my $started = time();
        for (1 .. $iters) {
            $req_q->enqueue($payload);
            $rep_q->dequeue;
        }
        my $elapsed = time() - $started;

        waitpid $child, 0;
        printf " %-32s %8s req/s (%dB payload, %d iters)\n",
            "Forks::Queue (Shmem)", fmt_rate($iters / $elapsed), $size, $iters;
    }
}
print "\nDone.\n";
( run in 1.128 second using v1.01-cache-2.11-cpan-71847e10f99 )