AnyMQ
view release on metacpan or search on metacpan
lib/AnyMQ/Topic.pm view on Meta::CPAN
package AnyMQ::Topic;
use strict;
use 5.008_001;
our $VERSION = '0.01';
use AnyEvent;
use Any::Moose;
use Try::Tiny;
use Scalar::Util;
use Time::HiRes;
with any_moose("X::Traits");
has name => (is => 'rw', isa => 'Str');
has bus => (is => "ro", isa => "AnyMQ", weak_ref => 1);
has queues => (traits => ['Hash'],
is => 'rw',
isa => 'HashRef',
default => sub { {} },
handles => {
add_listener => 'set',
has_no_listeners => 'is_empty',
}
);
has recycle => (is => "rw", isa => "Bool", default => sub { 0 });
has 'reaper_interval' => (is => 'ro', isa => 'Int', default => sub { 30 });
has 'publish_to_queues' => (is => 'rw', isa => 'Bool', default => sub { 1 });
has '_listener_reaper' => (is => 'rw');
has '+_trait_namespace' => (default => 'AnyMQ::Topic::Trait');
sub BUILD {
my $self = shift;
$self->install_reaper if $self->reaper_interval;
}
sub install_reaper {
my $self = shift;
$self->_listener_reaper(
AnyEvent->timer(interval => $self->reaper_interval,
cb => sub { $self->reap_destroyed_listeners })
);
}
sub reap_destroyed_listeners {
my $self = shift;
return if $self->has_no_listeners;
$self->remove_subscriber($_)
for grep { $_->destroyed } values %{$self->queues};
if ($self->recycle && $self->has_no_listeners) {
delete $self->bus->topics->{$self->name};
}
}
sub publish {
my ($self, @messages) = @_;
$self->append_to_queues(@messages) if $self->publish_to_queues;
$self->dispatch_messages(@messages);
}
sub dispatch_messages {
my ($self, @messages) = @_;
}
sub append_to_queues {
my ($self, @messages) = @_;
$self->reap_destroyed_listeners;
( run in 0.449 second using v1.01-cache-2.11-cpan-39bf76dae61 )