DR-TarantoolQueue

 view release on metacpan or  search on metacpan

t/010-tarantool-1.5/010-dr-tqueue.t  view on Meta::CPAN

use utf8;
use open qw(:std :utf8);
use lib qw(lib ../lib);

use Test::More;
use constant PLAN => 82;

BEGIN {
    system 'which tarantool_box >/dev/null 2>&1';
    if ($? == 0) {
        if (eval "require DR::Tarantool; 1") {
            plan tests    => PLAN;
        } else {
            plan skip_all => 'DR::Tarantool is not installed';
        }
    } else {
        plan skip_all => 'tarantool_box not found';
    }
}

use Encode qw(decode encode);
use Cwd 'cwd';
use File::Spec::Functions 'catfile';
# use feature 'state';



BEGIN {
    # Подготовка объекта тестирования для работы с utf8
    my $builder = Test::More->builder;
    binmode $builder->output,         ":utf8";
    binmode $builder->failure_output, ":utf8";
    binmode $builder->todo_output,    ":utf8";

    use_ok 'DR::TarantoolQueue';
    use_ok 'Coro';
    use_ok 'DR::Tarantool', ':all';
    use_ok 'DR::Tarantool::StartTest';
    use_ok 'Time::HiRes', 'time';
    use_ok 'Coro::AnyEvent';
}
my $t = DR::Tarantool::StartTest->run(
    cfg         => catfile(cwd, 'queue-lts/tarantool.cfg'),
    script_dir  => catfile(cwd, 'queue-lts/queue')
);


my $q = DR::TarantoolQueue->new(
    host            => '127.0.0.1',
    port            => $t->primary_port,
    space           => 0,
    tube            => 'test_queue',
    fake_in_test    => 0,
);

my $qs = DR::TarantoolQueue->new(
    host            => '127.0.0.1',
    port            => $t->primary_port,
    space           => 0,
    tube            => 'test_queue',
    coro            => 0,
    fake_in_test    => 0,
);

isa_ok $q => 'DR::TarantoolQueue';

my (@f, @fh);
push @f => async {
    ok $q->tnt->ping, 'tnt ping';
    push @fh => fileno($q->tnt->_llc->{fh})
} for 1 .. 5;
$_->join for @f;
@f = ();

if (DR::Tarantool->can('rsync_tarantool')) {
    isa_ok $qs->tnt => 'DR::Tarantool::RealSyncClient';
} else {
    isa_ok $qs->tnt => 'DR::Tarantool::SyncClient';
}

ok 5 == grep({ $_ == $fh[0] } @fh), 'connection established once';
ok $qs->tnt->ping, 'ping by sync client';

for ('put', 'urgent') {
    my $task1 = $q->$_;
    is_deeply $task1->data, undef, "$_()";
    like $task1->id, qr[^[0-9a-fA-F]{32}$], 'task1.id';
    my $task2 = $q->$_(data => { 1 => 2 });
    like $task2->id, qr[^[0-9a-fA-F]{32}$], 'task2.id';
    is_deeply $task2->data, { 1 => 2 }, "$_(data => hashref)";
    my $task3 = $q->$_(data => [ 3, 4, 'привет' ]);
    like $task3->id, qr[^[0-9a-fA-F]{32}$], 'task3.id';
    is_deeply $task3->data, [ 3, 4, 'привет' ], "$_(data => arrayref)";
    
}


my $task1_t = $q->take;
isa_ok $task1_t => 'DR::TarantoolQueue::Task';
my $task2_t = $q->take;
isa_ok $task2_t => 'DR::TarantoolQueue::Task';
my $task3_t = $q->take;
isa_ok $task3_t => 'DR::TarantoolQueue::Task';


isnt $task1_t->id, $task2_t->id, "task1 and task2 aren't the same";
isnt $task1_t->id, $task3_t->id, "task1 and task3 aren't the same";


is $task1_t->status, 'taken', 'task is taken';
isa_ok $task1_t->ack => 'DR::TarantoolQueue::Task', 'task1.ack';
is $task1_t->status, 'ack(removed)', 'task is ack';
isa_ok $q->ack(id => $task2_t->id), 'DR::TarantoolQueue::Task', 'task2.ack';

my $meta = $task3_t->get_meta;
isa_ok $meta => 'HASH', 'task3.meta.meta';
is $meta->{status}, 'taken', 'task3.meta.status';
is $meta->{ctaken}, 1, 'task3.meta.ctaken';
is $meta->{cbury}, 0, 'task3.meta.cbury';
is $meta->{tube}, 'test_queue', 'task3.meta.tube';
is $meta->{status}, $task3_t->status, 'task3.status';

t/010-tarantool-1.5/010-dr-tqueue.t  view on Meta::CPAN

is $task2->status, 'delayed', 'task2 released as delayed';
is $task3_t->status, 'ready', 'task3 released as ready';
is $task3->status, 'ready', 'task3 released as ready';

cmp_ok $task2->get_meta->{ttl}, '<', $meta->{ttl}, 'release updated ttl';
cmp_ok $task2->get_meta->{ttl}, '>=', (30+20) * 1_000_000,
    'ttl is more than 50s';
cmp_ok $task2->get_meta->{ttl}, '<', (30+30) * 1_000_000,
    'ttl is less than 60s';


$task1 = $q->take;
isa_ok $task1 => 'DR::TarantoolQueue::Task';
is $task1->status, 'taken', 'task1 is taken';
$task1_t = $task1->done(data => {'превед', 'медвед'});
is $task1->status, 'done', 'task1 is done';
is_deeply $task1->data, { 'превед', 'медвед' }, 'task1 is done';
isa_ok $task1_t => 'DR::TarantoolQueue::Task';
is $task1_t->status, 'done', 'task is done';
is_deeply $task1_t->data, { 'превед', 'медвед' }, 'task.data';


my $task4 = $q->put(tube    => 'utftube', data    => [ 3, 4, 'привет' ]);
like $task4->id, qr[^[0-9a-fA-F]{32}$], 'task3.id';
is_deeply $task4->data, [ 3, 4, 'привет' ],
    "put(data => arrayref)";

my $task5 =
    $q->urgent(tube => 'utftube', data => [ 3, 4, encode utf8 => 'медвед' ]);
like $task5->id, qr[^[0-9a-fA-F]{32}$], 'task3.id';
is_deeply $task5->data, [ 3, 4, encode utf8 => 'медвед' ],
    "urgent(data => arrayref)";

my $task5_t = $q->take(tube => 'utftube');
my $task4_t = $q->take(tube => 'utftube');

is_deeply $task4->data, $task4_t->data, 'Task and decoded utf data';
is_deeply $task5->data, $task5_t->data, 'Task and encoded utf data';

SKIP: {
    my $task_unique1 = eval {
        $q->put_unique(tube => 'utftube_unique',
            data => [ 3, 4, 'привет' ]);
    };
    skip 'tarantool is not configured for put_unique', 1
        if !$task_unique1 and $@ =~ /put_unique/;
    my $task_unique2 = $q->put_unique(tube => 'utftube_unique',
        data    => [ 3, 4, 'привет' ]);

    is_deeply $task_unique1->id, $task_unique2->id,
        'Unique tasks putting has equal ids';

}
{
    use Scalar::Util 'refaddr';
    $q = DR::TarantoolQueue->new(
        host    => '127.0.0.1',
        port    => $t->primary_port,
        space   => 0,
        tube    => 'test_queue',
        coro    => 1
    );
    ok $q, 'queue instance is created';
    is $q->{tnt}, undef, 'connection is not created yet';

    my (@clist, @f);
    for (1 .. 100) {
        push @f => async { $q->put(data => $_) };
        push @clist => $q->{tnt};
    }
    $_->join for @f;
    is_deeply [ map { refaddr $_ } @clist ],
        [ map { refaddr $clist[0] } 1 .. 100 ],
        'Only one connection established';


}
END {
    note $t->log if $ENV{DEBUG};
}



( run in 2.080 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )