DR-TarantoolQueue

 view release on metacpan or  search on metacpan

t/020-tnt-msgpack/030-stress.t  view on Meta::CPAN

#!/usr/bin/perl

use warnings;
use strict;
use utf8;
use open qw(:std :utf8);
use lib qw(lib ../lib);

use constant PLAN   => 37;
use Test::More;
use Encode qw(decode encode);
use feature 'state';

sub tube_name() {
    state $no = 1;
    sprintf 'test_tube_%02X', $no++;
}


BEGIN {
    unless (eval 'require DR::Tnt') {
        plan skip_all => 'DR::Tnt is not installed';
    }
    plan tests    => PLAN;
    use_ok 'DR::TarantoolQueue';
    use_ok 'DR::Tnt::Test';
    use_ok 'DR::TarantoolQueue::Worker';
    use_ok 'Coro';
    use_ok 'Coro::Channel';
    use_ok 'Coro::AnyEvent';
    tarantool_version_check(1.6);
}


my $t = start_tarantool
    -port   => free_port,
    -lua    => 't/020-tnt-msgpack/lua/queue.lua',
;

diag $t->log unless ok $t->is_started, 'Queue was started';


my @workers;
my %res;
my $ch = Coro::Channel->new;
my $cht = Coro::Channel->new;

for my $i (0 .. 99) {
    my $q = DR::TarantoolQueue->new(
        host        => '127.0.0.1',
        port        => $t->port,
        user        => 'test',
        password    => 'test',
        msgpack     => 1,
        coro        => 1,

        tube        => 'test_tube',

        ttl         => 60,

        defaults    => {
            test_tube   => {
                ttl         => 80
            }
        },

        fake_in_test    => 0,
    );

    $q->tnt->ping;


    
    push @workers => DR::TarantoolQueue::Worker->new(
        queue   => $q,
        timeout => rand 15,
    );


    async {
        $workers[$i]->run(sub {
            my ($task) = @_;
            my $no = $task->data->[0];
            $res{ $no }++;
            return $task->release(delay => rand 0.02) if 20 > rand 100;
            $task->ack;
            $cht->put($no);
        });
    };
}

use constant BLOCK => 1000;
for (1 .. 10){
    my $started = AnyEvent::now();
    for (1 .. BLOCK) {
        async {
            my ($no) = @_;
            my $q = $workers[ int rand @workers ];
            $q->queue->[0]->put(data => [ $no ]);
        } $_;
    }

    my $done = 0;
    while ($cht->get) {
        next unless ++$done >= BLOCK;
        last;
    }

    my $time = AnyEvent::now() - $started;

    is scalar(keys %res), BLOCK, 'count of tasks';
    
    my $rs = 0;
    $rs += $_ for values %res;



( run in 1.084 second using v1.01-cache-2.11-cpan-39bf76dae61 )