AnyMQ

 view release on metacpan or  search on metacpan

lib/AnyMQ/Queue.pm  view on Meta::CPAN

package AnyMQ::Queue;
use strict;
use Any::Moose;
use AnyEvent;
use Try::Tiny;
use Scalar::Util qw(weaken refaddr);
use Time::HiRes;
use constant DEBUG => 0;

# String identifier of the queue.
has id => (
    is  => 'rw',
    isa => 'Str',
);

# Boolean flag, off by default; poll() switches it on.
has persistent => (
    is      => 'rw',
    isa     => 'Bool',
    default => sub { 0 },
);

# Array reference with a read-only accessor; every queue starts with its
# own empty array.
has buffer => (
    is      => 'ro',
    isa     => 'ArrayRef',
    default => sub { [] },
);

# AnyEvent condition variable; a new AE::cv is created by default.
has cv => (
    is      => 'rw',
    isa     => 'AnyEvent::CondVar',
    default => sub { AE::cv },
);

# Boolean flag, off by default; _reaper turns it on when its timer fires
# and no on_timeout handler is set.
has destroyed => (
    is      => 'rw',
    isa     => 'Bool',
    default => sub { 0 },
);

# Optional handler; _reaper calls it as $handler->($queue, "timeout").
has on_timeout => (
    is => 'rw',
);

# Untyped read/write slot; presumably an error callback -- its use is not
# visible in this part of the file, confirm against the callers.
has on_error => (
    is => 'rw',
);

lib/AnyMQ/Queue.pm  view on Meta::CPAN

    }

}

# Build the timeout ("reaper") timer for this queue.
#
# Arguments:
#   $timeout - optional delay in seconds; any false value (undef, 0, '')
#              falls back to $self->timeout.
#
# Returns whatever AnyEvent->timer returns (the timer watcher). Nothing in
# this sub stores it, so keeping it is up to the caller.
#
# When the timer fires, the on_timeout handler is called as
# $handler->($queue, "timeout") if one is set; otherwise the queue is
# flagged as destroyed.
sub _reaper {
    my ($self, $timeout) = @_;

    AnyEvent->timer(
        after => $timeout || $self->timeout,
        cb => sub {
            # Until this point the closure holds a strong reference to the
            # queue; from here on it is only a weak one.
            weaken $self;

            # If the closure owned the last strong reference, weaken() has
            # just released the queue and left $self undefined. Stop here
            # rather than die on a method call on undef inside the event
            # loop.
            return unless defined $self;

            warn "Timing out $self long-poll" if DEBUG;
            if (my $handler = $self->on_timeout) {
                $handler->($self, "timeout");
            }
            else {
                $self->destroyed(1);
            }
        });
}

lib/AnyMQ/Queue.pm  view on Meta::CPAN

    my($self, $cb, $timeout) = @_;

    warn "already polled by another client" if $self->persistent;

    $self->{cv}->cb(sub { $cb->($_[0]->recv) });

    # reset garbage collection timeout with the long-poll timeout
    # $timeout = 0 is a valid timeout for interval-polling
    $timeout = 55 unless defined $timeout;
    $self->{timer} = AE::timer $timeout || 55, 0, sub {
        weaken $self;
        warn "long-poll timeout, flush and wait for client reconnect" if DEBUG;
        $self->_flush;
    };
    weaken $self->{timer};

    # flush buffer for a long-poll client
    $self->_flush( @{ $self->{buffer} })
        if @{ $self->{buffer} };
}

sub poll {
    my ($self, $cb) = @_;
    $self->cv->cb(sub { $cb->($_[0]->recv) });
    $self->persistent(1);



( run in 0.288 second using v1.01-cache-2.11-cpan-a9ef4e587e4 )