AnyMQ

 view release on metacpan or  search on metacpan

lib/AnyMQ/Topic.pm  view on Meta::CPAN

package AnyMQ::Topic;

use strict;
use 5.008_001;
our $VERSION = '0.01';

use AnyEvent;
use Any::Moose;
use Try::Tiny;
use Scalar::Util;
use Time::HiRes;

with any_moose("X::Traits");

has name => (is => 'rw', isa => 'Str');
has bus => (is => "ro", isa => "AnyMQ", weak_ref => 1);
has queues => (traits => ['Hash'],
               is => 'rw',
               isa => 'HashRef',
               default => sub { {} },
               handles => {
                   add_listener      => 'set',
                   has_no_listeners  => 'is_empty',
               }
           );
has recycle => (is => "rw", isa => "Bool", default => sub { 0 });
has 'reaper_interval' => (is => 'ro', isa => 'Int', default => sub { 30 });
has 'publish_to_queues' => (is => 'rw', isa => 'Bool', default => sub { 1 });
has '_listener_reaper' => (is => 'rw');
has '+_trait_namespace' => (default => 'AnyMQ::Topic::Trait');

sub BUILD {
    my $self = shift;
    $self->install_reaper if $self->reaper_interval;
}

sub install_reaper {
    my $self = shift;

    $self->_listener_reaper(
        AnyEvent->timer(interval => $self->reaper_interval,
                        cb => sub { $self->reap_destroyed_listeners })
    );
}

sub reap_destroyed_listeners {
    my $self = shift;
    return if $self->has_no_listeners;
    $self->remove_subscriber($_)
        for grep { $_->destroyed } values %{$self->queues};

    if ($self->recycle && $self->has_no_listeners) {
        delete $self->bus->topics->{$self->name};
    }
}

sub publish {
    my ($self, @messages) = @_;
    $self->append_to_queues(@messages) if $self->publish_to_queues;
    $self->dispatch_messages(@messages);
}

sub dispatch_messages {
    my ($self, @messages) = @_;
}

sub append_to_queues {
    my ($self, @messages) = @_;
    $self->reap_destroyed_listeners;



( run in 0.449 second using v1.01-cache-2.11-cpan-39bf76dae61 )