AnyMQ

 view release on metacpan or  search on metacpan

lib/AnyMQ/Queue.pm  view on Meta::CPAN

package AnyMQ::Queue;
use strict;
use Any::Moose;
use AnyEvent;
use Try::Tiny;
use Scalar::Util qw(weaken refaddr);
use Time::HiRes;
use constant DEBUG => 0;

has id => (is => 'rw', isa => 'Str');
has persistent => (is => "rw", isa => "Bool", default => sub { 0 });
has buffer => (is => "ro", isa => "ArrayRef", default => sub { [] });
has cv => (is => "rw", isa => "AnyEvent::CondVar", default => sub { AE::cv });
has destroyed => (is => "rw", isa => "Bool", default => sub { 0 });
has on_timeout => (is => "rw");
has on_error  => (is => "rw");
has timeout => (is => "rw", isa => "Int", default => sub { 55 });

sub BUILD {
    my $self = shift;
    $self->id('AnyMQ-'.refaddr($self)) unless $self->id;
}

sub append {
    my($self, @messages) = @_;

    if ($self->{cv}->cb) {
        # currently listening: flush and send the events right away
        $self->_flush(@messages);
    } else {
        # between long poll comet: buffer the events
        push @{$self->{buffer}}, @messages;
    }
}

sub subscribe {
    my ($self, $topic) = @_;
    Carp::cluck unless $topic;
    $topic->add_subscriber($self);
}

sub _flush {
    my ($self, @messages) = @_;

    my $cb = $self->{cv}->cb;
    my $cv = $self->{cv};
    $self->{cv} = AE::cv;
    $self->{buffer} = [];

    try {
        $cv->send(@messages);
    } catch {
        if ($self->on_error) {
            $self->on_error->($self, $_, @messages);
        }
        else {
            $self->destroyed(1);
        }
    };

    return if $self->destroyed;

    if ($self->{persistent}) {
        $self->{cv}->cb($cb);
        $self->_flush( @{ $self->{buffer} })



( run in 1.259 second using v1.01-cache-2.11-cpan-39bf76dae61 )