Data-ReqRep-Shared

 view release on metacpan or  search on metacpan

bench/vs.pl  view on Meta::CPAN

#!/usr/bin/env perl
# Benchmark: Data::ReqRep::Shared vs other IPC mechanisms
# Cross-process echo round-trip throughput
use strict;
use warnings;
use Time::HiRes qw(time);
use File::Temp 'tmpnam';
use Socket;
use IO::Socket::INET;
use POSIX ':sys_wait_h';

# Number of timed round trips per benchmark.  The first command-line argument
# overrides the default; a missing, empty or zero argument keeps the default.
my $N = $ARGV[0] || 50_000;

# $N is used below as a range bound and in loop-count arithmetic, both in the
# parent and in the forked children.  Refuse anything that is not a finite
# positive whole number rather than run with loop counts that no longer match
# between the two sides (e.g. a negative value shrinks the child's loop below
# the parent's fixed warmup count).
require Scalar::Util;
die "Usage: $0 [iterations]  (iterations must be a positive integer)\n"
    unless Scalar::Util::looks_like_number($N)
        && $N >= 1
        && $N == int($N)
        && $N < 9**9**9;
$N += 0;    # normalise spellings such as "1e5" or "007" to a plain number

# Payload sizes, in bytes, that the string-based benchmarks are run with.
my @sizes = (12, 1024);

# Format a rate as a short human-readable string: one decimal place with an
# "M" suffix for millions, one decimal place with a "K" suffix for thousands,
# and a bare rounded integer below that.
#
# The unit cut-offs sit where the *rounded* value rolls over into the next
# unit, so 999_999 renders as "1.0M" rather than "1000.0K", and 999.9 renders
# as "1.0K" rather than "1000".
#
#   $rate   - non-negative number
#   returns - the formatted string (no unit label; callers append their own)
sub fmt_rate {
    my ($rate) = @_;
    return sprintf("%.1fM", $rate / 1e6) if $rate >= 999_950;
    return sprintf("%.1fK", $rate / 1e3) if $rate >= 999.5;
    return sprintf("%.0f", $rate);
}

# Report header: states how many round trips each benchmark below will time.
printf "Cross-process echo round-trip, %s iterations\n\n", $N;

# --- Data::ReqRep::Shared (Str) ---
# For each payload size: create a server object on a fresh temp path, fork a
# child that echoes every request back, and time $N synchronous requests
# issued from the parent after 500 untimed warmup requests.
for my $size (@sizes) {
    require Data::ReqRep::Shared;
    require Data::ReqRep::Shared::Client;

    my $payload     = "x" x $size;
    my $shm_path    = tmpnam();
    my $reply_bytes = $size + 64;    # passed as the constructor's response size

    my $srv = Data::ReqRep::Shared->new($shm_path, 1024, 64, $reply_bytes);

    my $child = fork;
    die "fork: $!" unless defined $child;

    unless ($child) {
        # Child: handle warmup + timed requests, replying with the request
        # itself.  An undefined request from recv_wait(10.0) -- presumably a
        # timeout; confirm against the module docs -- ends the loop early.
        for (1 .. $N + 500) {
            my ($request, $req_id) = $srv->recv_wait(10.0);
            defined $request or last;
            $srv->reply($req_id, $request);
        }
        exit 0;
    }

    # Parent: the client attaches to the same path the server was created on.
    my $cli = Data::ReqRep::Shared::Client->new($shm_path);
    $cli->req($payload) for 1 .. 500;    # warmup, not timed

    my $started = time();
    $cli->req($payload) for 1 .. $N;
    my $elapsed = time() - $started;

    # Reap the echo child before reporting, then remove the backing path.
    waitpid $child, 0;
    my $rate = fmt_rate($N / $elapsed);
    printf "  %-32s %8s req/s  (%dB payload)\n",
        "Data::ReqRep::Shared", $rate, $size;
    $srv->unlink;
}

# --- Data::ReqRep::Shared::Int (lock-free) ---
{
    require Data::ReqRep::Shared::Int;
    require Data::ReqRep::Shared::Int::Client;

    my $path = tmpnam();
    my $srv = Data::ReqRep::Shared::Int->new($path, 1024, 64);

    my $pid = fork // die "fork: $!";
    if ($pid == 0) {
        for (1..($N + 1000)) {
            my ($v, $ri) = $srv->recv_wait(10.0);
            last unless defined $v;
            $srv->reply($ri, $v);
        }
        exit 0;
    }

    my $cli = Data::ReqRep::Shared::Int::Client->new($path);
    $cli->req(42) for 1..1000;

    my $t0 = time();
    $cli->req(42) for 1..$N;
    my $el = time() - $t0;

bench/vs.pl  view on Meta::CPAN

        }

        for (1..500) {
            $req_q->snd(1, $msg);
            my $buf;
            $rep_q->rcv($buf, $len + 64, 0);
        }

        my $t0 = time();
        for (1..$N) {
            $req_q->snd(1, $msg);
            my $buf;
            $rep_q->rcv($buf, $len + 64, 0);
        }
        my $el = time() - $t0;

        waitpid $pid, 0;
        $req_q->remove;
        $rep_q->remove;
        printf "  %-32s %8s req/s  (%dB payload)\n",
            "IPC::Msg (SysV)", fmt_rate($N / $el), $size;
    }
}

# --- MCE::Channel (req/rep via send/recv pair) ---
# Optional comparison: runs only when MCE::Channel can be loaded.  Two
# channels are used, one per direction; a forked child echoes every message
# from the request channel onto the reply channel.
if (eval { require MCE::Channel; 1 }) {
    print "\n";
    for my $size (@sizes) {
        my $payload = "x" x $size;

        my $to_child  = MCE::Channel->new(impl => 'Simple');
        my $to_parent = MCE::Channel->new(impl => 'Simple');

        defined(my $child = fork) or die "fork: $!";
        if (!$child) {
            # Child: echo warmup + timed messages; stop early on undef.
            for (1 .. $N + 500) {
                my $echo = $to_child->recv;
                defined $echo or last;
                $to_parent->send($echo);
            }
            exit 0;
        }

        # 500 untimed warmup round trips.
        for (1 .. 500) {
            $to_child->send($payload);
            $to_parent->recv;
        }

        my $started = time();
        for (1 .. $N) {
            $to_child->send($payload);
            $to_parent->recv;
        }
        my $elapsed = time() - $started;

        waitpid $child, 0;
        my $rate = fmt_rate($N / $elapsed);
        printf "  %-32s %8s req/s  (%dB payload)\n",
            "MCE::Channel (Simple)", $rate, $size;
    }
}

# --- Forks::Queue (Shmem, two queues) ---
# Optional comparison: runs only when Forks::Queue can be loaded.  One queue
# carries requests to a forked echo child, the other carries replies back.
if (eval { require Forks::Queue; 1 }) {
    print "\n";
    for my $size (@sizes) {
        my $payload = "x" x $size;

        # Forks::Queue is slow, so never time more than 10_000 round trips.
        my $n = $N;
        $n = 10_000 if $N > 10_000;

        my $to_child  = Forks::Queue->new(impl => 'Shmem');
        my $to_parent = Forks::Queue->new(impl => 'Shmem');

        defined(my $child = fork) or die "fork: $!";
        if (!$child) {
            # Child: echo warmup + timed messages; stop early on undef.
            for (1 .. $n + 100) {
                my $echo = $to_child->dequeue;
                defined $echo or last;
                $to_parent->enqueue($echo);
            }
            exit 0;
        }

        # 100 untimed warmup round trips.
        for (1 .. 100) {
            $to_child->enqueue($payload);
            $to_parent->dequeue;
        }

        my $started = time();
        for (1 .. $n) {
            $to_child->enqueue($payload);
            $to_parent->dequeue;
        }
        my $elapsed = time() - $started;

        waitpid $child, 0;
        my $rate = fmt_rate($n / $elapsed);
        printf "  %-32s %8s req/s  (%dB payload, %d iters)\n",
            "Forks::Queue (Shmem)", $rate, $size, $n;
    }
}

print "\nDone.\n";



( run in 1.128 second using v1.01-cache-2.11-cpan-71847e10f99 )