AnyMQ
view release on metacpan or search on metacpan
lib/AnyMQ/Queue.pm view on Meta::CPAN
package AnyMQ::Queue;
use strict;
use Any::Moose;
use AnyEvent;
use Try::Tiny;
use Scalar::Util qw(weaken refaddr);
use Time::HiRes;
use constant DEBUG => 0;
# Queue identifier. BUILD fills in 'AnyMQ-<refaddr>' when no true value is set.
has 'id' => (
    is  => 'rw',
    isa => 'Str',
);

# When true, _flush puts the listener callback back on the fresh condvar
# after a delivery, so the listener keeps receiving.
has 'persistent' => (
    is      => 'rw',
    isa     => 'Bool',
    default => 0,
);

# Messages appended while no callback is registered on the condvar.
has 'buffer' => (
    is      => 'ro',
    isa     => 'ArrayRef',
    default => sub { return [] },
);

# Condvar used for delivery; append() treats a registered callback on it
# as "a listener is currently waiting".
has 'cv' => (
    is      => 'rw',
    isa     => 'AnyEvent::CondVar',
    default => sub { return AE::cv },
);

# Set by _flush when delivery throws and no on_error handler is installed.
has 'destroyed' => (
    is      => 'rw',
    isa     => 'Bool',
    default => 0,
);

# Optional handler; not referenced in the visible part of this file.
# NOTE(review): presumably invoked when the timeout elapses - confirm.
has 'on_timeout' => (
    is => 'rw',
);

# Optional handler called by _flush as ($queue, $error, @messages) when
# delivery throws.
has 'on_error' => (
    is => 'rw',
);

# Timeout value, defaulting to 55.
# NOTE(review): unit looks like seconds - confirm against the code that uses it.
has 'timeout' => (
    is      => 'rw',
    isa     => 'Int',
    default => 55,
);
# Post-construction hook: make sure the queue has an id.
# An existing true id is kept; otherwise one is derived from the object's
# reference address ('AnyMQ-<refaddr>').
sub BUILD {
    my ($self) = @_;

    my $existing_id = $self->id;
    return $existing_id if $existing_id;

    return $self->id('AnyMQ-' . refaddr($self));
}
# Hand messages to the queue.
#
# If no callback is registered on the condvar (nobody is listening right
# now), the messages are stored in the buffer. Otherwise they are passed
# straight to _flush for immediate delivery.
sub append {
    my $self     = shift;
    my @messages = @_;

    # Idle between polls: keep the events for the next listener.
    return push @{ $self->{buffer} }, @messages
        unless $self->{cv}->cb;

    # A listener is waiting: deliver now.
    return $self->_flush(@messages);
}
# Subscribe this queue to a topic.
#
# $topic - object providing add_subscriber(); this queue is passed to it.
#
# Returns whatever $topic->add_subscriber($self) returns.
# Dies with a full backtrace (Carp::confess) when no topic is supplied,
# instead of warning and then failing on a method call on a false value.
sub subscribe {
    my ($self, $topic) = @_;

    unless ($topic) {
        # Carp is not loaded by this file's own use lines; load it on demand.
        require Carp;
        Carp::confess('subscribe() requires a topic');
    }

    return $topic->add_subscriber($self);
}
sub _flush {
my ($self, @messages) = @_;
my $cb = $self->{cv}->cb;
my $cv = $self->{cv};
$self->{cv} = AE::cv;
$self->{buffer} = [];
try {
$cv->send(@messages);
} catch {
if ($self->on_error) {
$self->on_error->($self, $_, @messages);
}
else {
$self->destroyed(1);
}
};
return if $self->destroyed;
if ($self->{persistent}) {
$self->{cv}->cb($cb);
$self->_flush( @{ $self->{buffer} })
( run in 1.259 second using v1.01-cache-2.11-cpan-39bf76dae61 )