EV-Websockets

 view release on metacpan or  search on metacpan

eg/backpressure.pl  view on Meta::CPAN

#!/usr/bin/env perl
use strict;
use warnings;
use if -d 'blib', lib => 'blib/lib', 'blib/arch';

use EV;
use EV::Websockets;
use Time::HiRes qw(time);

# Turn on autoflush for the selected output handle so progress lines are
# written as soon as they are printed.
$| = 1;

# Demo tunables.
my $CHUNK_SIZE  = 64 * 1024;     # bytes in each message payload
my $QUEUE_LIMIT = 256 * 1024;    # try_send stops queueing at this queue size
my $TARGET_MSGS = 500;           # number of messages streamed per client

# A single context is used for both the listener and the client connection.
my $ctx = EV::Websockets::Context->new;

# Shared bookkeeping, updated from the callbacks below.
my $total_sent = 0;    # bytes handed to send() by the server
my $total_recv = 0;    # bytes counted by the client's on_message
my $t0         = 0;    # start timestamp, stamped in the server's on_connect

# Queue payload chunks on connection $c until either $TARGET_MSGS chunks
# have been queued for it or its send queue has reached $QUEUE_LIMIT bytes.
#
# The per-connection progress counter lives in $c->stash->{sent}; the
# file-level $total_sent is bumped by $CHUNK_SIZE for every chunk queued.
# Called from the server's on_connect and on_drain handlers.
sub try_send {
    my ($c) = @_;

    # The stash hashref is fetched once and reused for the whole call.
    my $stash = $c->stash;

    # Cheap exit for calls that arrive after the stream is already complete,
    # so no payload is built when there is nothing left to queue.
    return if $stash->{sent} >= $TARGET_MSGS;

    # Loop-invariant: build the chunk once per call instead of once per send.
    my $payload = "x" x $CHUNK_SIZE;

    while ($stash->{sent} < $TARGET_MSGS && $c->send_queue_size < $QUEUE_LIMIT) {
        $c->send($payload);
        $stash->{sent}++;
        $total_sent += $CHUNK_SIZE;
    }

    return;
}

# --- Server side ---------------------------------------------------------
# Each handler is bound to a name first so that the listen() call below
# reads as a plain event => handler table.

# A client connected: announce it, zero its per-connection chunk counter,
# stamp the start time used for the throughput figure, and queue the first
# batch of chunks.
my $srv_on_connect = sub {
    my $conn = shift;
    print "Server: client connected, streaming $TARGET_MSGS chunks of ${\ ($CHUNK_SIZE/1024)}KB\n";
    $conn->stash->{sent} = 0;
    $t0 = time;
    try_send($conn);
};

# on_drain: try to queue more chunks on the same connection.
# NOTE(review): presumably fired once the send queue has emptied -- confirm
# against the EV::Websockets documentation.
my $srv_on_drain = sub { try_send(shift) };

my $srv_on_close = sub { print "Server: client disconnected\n" };

# Only reports the error text; this handler does not stop the event loop.
my $srv_on_error = sub {
    my (undef, $err) = @_;
    warn "Server error: $err\n";
};

# port => 0 is passed in; whatever listen() returns is the port the client
# connects to further down.
my $port = $ctx->listen(
    port       => 0,
    on_connect => $srv_on_connect,
    on_drain   => $srv_on_drain,
    on_close   => $srv_on_close,
    on_error   => $srv_on_error,
    on_message => sub {},    # incoming messages are ignored by the server
);

print "Backpressure demo: server on port $port, connecting client...\n";

# --- Client side ---------------------------------------------------------

# Print the final throughput summary for $count received messages. The
# figures are derived from $total_sent (bytes queued by the server) and the
# time elapsed since $t0 was stamped in the server's on_connect.
my $cli_report = sub {
    my ($count) = @_;
    my $elapsed   = time() - $t0;
    my $megabytes = $total_sent / 1e6;
    my $rate      = $elapsed > 0 ? $megabytes / $elapsed : 0;    # avoid dividing by zero
    printf "Done: %d msgs, %.1f MB sent in %.2fs (%.1f MB/s)\n",
        $count, $megabytes, $elapsed, $rate;
};

# Per-message handler: tally received bytes, keep a per-connection message
# count in the stash, print progress every 100 messages, and close the
# connection once $TARGET_MSGS messages have arrived.
my $cli_on_message = sub {
    my ($conn, $data) = @_;
    $total_recv += length $data;

    my $count = ++$conn->stash->{count};
    printf "Client: %d/%d msgs, queue=%d bytes\n",
        $count, $TARGET_MSGS, $conn->send_queue_size
        if $count % 100 == 0;

    return if $count < $TARGET_MSGS;

    $cli_report->($count);
    $conn->close(1000, "complete");
};

# Both close and error break out of the event loop so the script can exit.
my $cli_on_close = sub {
    print "Client: closed\n";
    EV::break;
};

my $cli_on_error = sub {
    my (undef, $err) = @_;
    warn "Client error: $err\n";
    EV::break;
};

# The returned connection object is kept in $client for the rest of the script.
my $client = $ctx->connect(
    url        => "ws://127.0.0.1:$port",
    on_connect => sub { print "Client: connected\n" },
    on_message => $cli_on_message,
    on_close   => $cli_on_close,
    on_error   => $cli_on_error,
);

# Run the event loop until one of the client handlers calls EV::break.
EV::run;



( run in 0.532 second using v1.01-cache-2.11-cpan-140bd7fdf52 )