DR-TarantoolQueue

 view release on metacpan or  search on metacpan

lib/DR/TarantoolQueue.pm  view on Meta::CPAN

package DR::TarantoolQueue;
use utf8;
use strict;
use warnings;
use Mouse;
use Carp;
use JSON::XS;
require DR::TarantoolQueue::Task;
$Carp::Internal{ (__PACKAGE__) }++;

our $VERSION = '0.44';
use feature 'state';

=head1 NAME

DR::TarantoolQueue - client for tarantool's queue


=head1 SYNOPSIS

    my $queue = DR::TarantoolQueue->new(
        host    => 'tarantool.host',
        port    => 33014,
        tube    => 'request_queue',
        space   => 11,

        connect_opts => {   # see perldoc DR::Tarantool
            reconnect_period    => 1,
            reconnect_always    => 1
        }
    );


    # put empty task into queue with name 'request_queue'
    my $task = $queue->put;

    my $task = $queue->put(data => [ 1, 2, 3 ]);

    printf "task.id = %s\n", $task->id;

=head2 DESCRIPTION

The module contains sync and async (coro) drivers for the tarantool queue.

=head1 ATTRIBUTES

=head2 host (ro) & port (ro)

Tarantool's parameters.

=head2 connect_opts (ro)

Additional options for L<DR::Tarantool>. HashRef.

=head2 fake_in_test (ro, default=true)

Start a fake tarantool (only for msgpack) if C<($0 =~ /\.t$/)>.

In that case the driver uses the following lua code:
    
    log.info('Fake Queue starting')
    
    box.cfg{ listen  = os.getenv('PRIMARY_PORT') }
    
    box.schema.user.create('test', { password = 'test' })
    box.schema.user.grant('test', 'read,write,execute', 'universe')
    
    _G.queue = require('megaqueue')
    queue:init()
    
    log.info('Fake Queue started')

=head2 msgpack (ro)

If true, the driver will use the L<DR::Tnt> driver (C<1.6>). It will also use the
L<tarantool-megaqueue|https://github.com/dr-co/tarantool-megaqueue> lua
module with namespace C<queue>.

=head2 coro (ro)

If B<true> (default), the driver will use the L<Coro>-based tarantool driver;
otherwise it will use the sync driver.

=head2 ttl (rw)

Default B<ttl> for tasks.

=head2 ttr (rw)

Default B<ttr> for tasks.

=head2 pri (rw)

Default B<pri> for tasks.

=head2 delay (rw)

Default B<delay> for tasks.

=head2 space (rw)

Default B<space> for tasks.

=head2 tube (rw)

Default B<tube> for tasks.


=head2 defaults

Defaults for queues. B<HashRef>. Key is tube name. Value is a hash with
the following fields:

=over

=item ttl

=item ttr

=item delay

=item pri

=back

Methods L</put> (L</urgent>) use these parameters when the corresponding
option is not passed explicitly; if the tube has no default for an option,
the global attribute of the same name is used instead.

=cut

# Compose the DR::TarantoolQueue::JSE role into this class.
with 'DR::TarantoolQueue::JSE';

# Read-only, optional string attributes: host, port, user, password.
has [qw(host port user password)] => (is => 'ro', isa => 'Maybe[Str]');

# Read-only boolean, true by default (see "coro" in the POD above).
has coro => (is => 'ro', isa => 'Bool', default => 1);

# Read-write defaults applied to tasks (see the POD above).
has [qw(ttl ttr pri delay)] => (is => 'rw', isa => 'Maybe[Num]');
has [qw(space tube)]        => (is => 'rw', isa => 'Maybe[Str]');

# Extra options hash; each instance gets its own empty HashRef by default.
has connect_opts => (is => 'ro', isa => 'HashRef', default => sub { +{} });

# Per-tube defaults, keyed by tube name; empty HashRef by default.
has defaults => (is => 'ro', isa => 'HashRef', default => sub { +{} });

# Read-only boolean, false by default (see "msgpack" in the POD above).
has msgpack => (is => 'ro', isa => 'Bool', default => 0);

# If $0 =~ /\.t$/ then a fake tarantool will be started.
has fake_in_test => (is => 'ro', isa => 'Bool', default => 1);


# Validate an options hash against a whitelist of permitted keys.
#
# Arguments:
#   first argument  - HashRef with the options supplied by the caller
#   remaining list  - names of the options that are allowed
#
# Croaks with 'unknown option: <name>' for the first key of the hash that
# is not in the whitelist. Keys are examined in sorted order: perl's hash
# iteration order is randomised, so without the sort the option named in
# the error could differ from run to run when several unknown options are
# passed at once.
#
# The return value is not meaningful.
sub _check_opts($@) {
    my ($opts, @allowed) = @_;
    my %is_allowed = map { ($_ => 1) } @allowed;

    for my $name (sort keys %$opts) {
        next if $is_allowed{$name};
        croak 'unknown option: ' . $name;
    }
}

sub _producer_messagepack {
    my ($self, $method, $o) = @_;

    state $alias = {
        urgent  => 'put',
    };

    $method = $alias->{$method} if exists $alias->{$method};

    _check_opts $o, qw(space tube delay ttl ttr pri data domain);
    
    my $tube = $o->{tube};
    $tube  = $self->tube unless defined $tube;
    croak 'tube was not defined' unless defined $tube;

    for ('ttl', 'delay', 'ttr', 'pri') {
        my $n = $_;

        my $res;

        if (exists $o->{$n}) {
            $res = $o->{$n};
        } else {
            if (exists $self->defaults->{ $tube }) {
                if (exists $self->defaults->{ $tube }{ $n }) {
                    $res = $self->defaults->{ $tube }{ $n };
                } else {
                    $res = $self->$n;
                }
            } else {
                $res = $self->$n;
            }
        }
        $res ||= 0;



( run in 0.761 second using v1.01-cache-2.11-cpan-39bf76dae61 )