DR-TarantoolQueue
view release on metacpan or search on metacpan
t/020-tnt-msgpack/500-worker.t view on Meta::CPAN
use lib qw(lib ../lib);
use constant PLAN => 27;
use Test::More;
use Encode qw(decode encode);
use feature 'state';
# Returns a new tube name on every call: 'test_tube_' followed by a
# two-digit (zero-padded) lowercase hex counter.  The counter starts at 1
# and is bumped before formatting, so the first name is 'test_tube_02'.
sub tube_name() {
    state $counter = 1;
    return sprintf 'test_tube_%02x', ++$counter;
}
BEGIN {
    # Without the DR::Tnt client nothing below can run, so the whole file
    # is skipped before any plan is declared.  A block eval is used rather
    # than a string eval: the module name is checked at compile time and
    # no code is built from a string at run time.  The trailing 1 gives a
    # true value on success, as the string form did.
    unless (eval { require DR::Tnt; 1 }) {
        plan skip_all => 'DR::Tnt is not installed';
    }

    plan tests => PLAN;

    # Each use_ok is itself a test and counts towards PLAN.
    use_ok 'DR::TarantoolQueue';
    use_ok 'DR::Tnt::Test';

    # tarantool_version_check is not defined in this file; presumably it is
    # exported by DR::Tnt::Test and skips the suite when the installed
    # Tarantool is older than 1.6 -- TODO confirm.
    tarantool_version_check(1.6);
}
use Encode qw(decode encode);
use Cwd 'cwd';
use File::Spec::Functions 'catfile';
#use feature 'state';
BEGIN {
    # Prepare the test builder for UTF-8: every stream Test::More writes
    # to (normal, failure and TODO output) gets the :utf8 layer.
    my $tb = Test::More->builder;
    for my $stream ($tb->output, $tb->failure_output, $tb->todo_output) {
        binmode $stream, ":utf8";
    }

    # Load checks; each use_ok is a counted test.
    use_ok 'DR::TarantoolQueue';
    use_ok 'Coro';
    use_ok 'Time::HiRes', 'time';
    use_ok 'Coro::AnyEvent';
    use_ok 'DR::TarantoolQueue::Worker';
}
# Launch a Tarantool instance for this test using the queue Lua script.
# start_tarantool and free_port are not defined in this file; presumably
# they are exported by DR::Tnt::Test -- TODO confirm.
my $t = start_tarantool(
    -port => free_port(),
    -lua  => 't/020-tnt-msgpack/lua/queue.lua',
);

# On a failed start, dump the server log to help diagnose the failure.
ok($t->is_started, 'Queue was started') or diag $t->log;
# Queue client connected to the instance started above, using the
# msgpack protocol and Coro mode, with 'test_queue' as the tube.
my $q = DR::TarantoolQueue->new(
    host         => '127.0.0.1',
    port         => $t->port,
    user         => 'test',
    password     => 'test',
    msgpack      => 1,
    tube         => 'test_queue',
    coro         => 1,
    fake_in_test => 0,
);
isa_ok($q, 'DR::TarantoolQueue');
# Worker bound to the queue above.
# NOTE(review): 'count' presumably sets the number of concurrent workers
# and 'timeout' the take timeout in seconds -- confirm against
# DR::TarantoolQueue::Worker.
my $wrk = DR::TarantoolQueue::Worker->new(
    queue   => $q,
    timeout => 0.5,
    count   => 10,
);
note 'autoack tests';

# Run the worker in its own coroutine with a handler that does nothing.
async { $wrk->run(sub { }) };

# Enqueue 31 tasks whose payload is simply their sequence number.
my @tasks;
push @tasks, $q->put(data => $_) for 1 .. 31;

# Give the worker coroutines time to process the queue.
Coro::AnyEvent::sleep(0.2);

# Replace every task with the result of peeking it by id.  The eval turns
# a failing peek into undef; the check below expects undef for all of
# them (presumably because an acked task no longer exists -- confirm).
$_ = eval { $q->peek(id => $_->id) } for @tasks;

is(
    scalar(grep { !defined($_) } @tasks),
    scalar(@tasks),
    'All tasks were ack'
);
is($wrk->stop, 0, 'workers were stopped');
note 'release tests';

@tasks = ();

# Every task handed to the handler is released back with a 200 delay;
# each successful release is itself a counted test.
async {
    $wrk->run(
        sub {
            my $task = shift;
            ok($task->release(delay => 200), 'task was released ' . $task->id);
        }
    );
};

push @tasks, $q->put(data => { task => $_ }) for 1 .. 5;

# Give the worker coroutines time to take and release the tasks.
Coro::AnyEvent::sleep(0.2);

# After the release each task must be reported as 'delayed'.
for my $task (@tasks) {
    is($task->peek->status, 'delayed', 'task was released ' . $task->id);
}

is($wrk->stop, 0, 'workers were stopped');
note 'autobury';

# Run the worker with a handler that always dies.
# NOTE(review): going by the section name, the worker presumably buries
# tasks whose handler throws -- the assertions are outside this view.
async { $wrk->run(sub { die 123 }) };
( run in 0.611 second using v1.01-cache-2.11-cpan-39bf76dae61 )