AnyMQ
view release on metacpan or search on metacpan
lib/AnyMQ/Queue.pm view on Meta::CPAN
package AnyMQ::Queue;
use strict;
use Any::Moose;
use AnyEvent;
use Try::Tiny;
use Scalar::Util qw(weaken refaddr);
use Time::HiRes;
use constant DEBUG => 0;
has id => (is => 'rw', isa => 'Str');
has persistent => (is => "rw", isa => "Bool", default => sub { 0 });
has buffer => (is => "ro", isa => "ArrayRef", default => sub { [] });
has cv => (is => "rw", isa => "AnyEvent::CondVar", default => sub { AE::cv });
has destroyed => (is => "rw", isa => "Bool", default => sub { 0 });
has on_timeout => (is => "rw");
has on_error => (is => "rw");
lib/AnyMQ/Queue.pm view on Meta::CPAN
}
}
sub _reaper {
my ($self, $timeout) = @_;
AnyEvent->timer(
after => $timeout || $self->timeout,
cb => sub {
weaken $self;
warn "Timing out $self long-poll" if DEBUG;
if ($self->on_timeout) {
$self->on_timeout->($self, "timeout")
}
else {
$self->destroyed(1);
}
});
}
lib/AnyMQ/Queue.pm view on Meta::CPAN
my($self, $cb, $timeout) = @_;
warn "already polled by another client" if $self->persistent;
$self->{cv}->cb(sub { $cb->($_[0]->recv) });
# reset garbage collection timeout with the long-poll timeout
# $timeout = 0 is a valid timeout for interval-polling
$timeout = 55 unless defined $timeout;
$self->{timer} = AE::timer $timeout || 55, 0, sub {
weaken $self;
warn "long-poll timeout, flush and wait for client reconnect" if DEBUG;
$self->_flush;
};
weaken $self->{timer};
# flush buffer for a long-poll client
$self->_flush( @{ $self->{buffer} })
if @{ $self->{buffer} };
}
sub poll {
my ($self, $cb) = @_;
$self->cv->cb(sub { $cb->($_[0]->recv) });
$self->persistent(1);
( run in 0.288 second using v1.01-cache-2.11-cpan-a9ef4e587e4 )