AnyMQ
view release on metacpan or search on metacpan
lib/AnyMQ/Queue.pm view on Meta::CPAN
}
};
return if $self->destroyed;
if ($self->{persistent}) {
$self->{cv}->cb($cb);
$self->_flush( @{ $self->{buffer} })
if @{ $self->{buffer} };
} else {
$self->{timer} = $self->_reaper;
}
}
sub _reaper {
my ($self, $timeout) = @_;
AnyEvent->timer(
after => $timeout || $self->timeout,
cb => sub {
weaken $self;
warn "Timing out $self long-poll" if DEBUG;
if ($self->on_timeout) {
$self->on_timeout->($self, "timeout")
}
else {
$self->destroyed(1);
}
lib/AnyMQ/Queue.pm view on Meta::CPAN
# poll_once($cb, $timeout)
#
# Register a one-shot (long-poll) consumer on this queue.
#
#   $cb      - code ref; invoked with whatever the queue's condvar
#              yields from ->recv the next time the queue is flushed.
#   $timeout - seconds to keep the poll open before flushing anyway.
#              Defaults to 55 when undefined. An explicit 0 is honoured
#              (interval polling: the timer is armed with a 0s delay).
#
# Warns (does not die) if the queue is currently in persistent mode.
# Any messages already buffered are flushed immediately.
sub poll_once {
    my ($self, $cb, $timeout) = @_;

    warn "already polled by another client" if $self->persistent;

    $self->{cv}->cb(sub { $cb->($_[0]->recv) });

    # Reset the garbage-collection timeout with the long-poll timeout.
    # Only an *undefined* timeout falls back to the default; 0 is a valid
    # value and must not be replaced (a plain `||` would clobber it).
    $timeout = 55 unless defined $timeout;

    # The closure must not hold a strong reference to $self, otherwise
    # $self -> {timer} -> callback -> $self forms a cycle. Weaken a copy
    # *before* building the closure; the watcher itself has to stay
    # strongly referenced from $self->{timer} or it is destroyed at once
    # and never fires.
    weaken(my $weak_self = $self);

    $self->{timer} = AE::timer $timeout, 0, sub {
        return unless $weak_self;    # queue already gone
        warn "long-poll timeout, flush and wait for client reconnect" if DEBUG;
        # Flush with no messages so the waiting client is released.
        $weak_self->_flush;
    };

    # Flush buffer for a long-poll client.
    $self->_flush( @{ $self->{buffer} } )
        if @{ $self->{buffer} };
}
# poll($cb)
#
# Attach a persistent consumer to this queue.
#
#   $cb - code ref; called with the list returned by the condvar's
#         ->recv each time the condvar callback fires.
#
# Marks the queue persistent, drops any pending timer held in
# $self->{timer}, and immediately flushes messages that are already
# buffered.
sub poll {
    my $self     = shift;
    my $consumer = shift;

    my $on_ready = sub {
        my $cv = shift;
        $consumer->( $cv->recv );
    };
    $self->cv->cb($on_ready);

    $self->persistent(1);

    # A persistent listener is not subject to the timeout timer.
    undef $self->{timer};

    if ( @{ $self->{buffer} } ) {
        $self->_flush( @{ $self->{buffer} } );
    }
}
# unpoll()
#
# Detach the current consumer: clears the condvar callback, switches
# the queue out of persistent mode, and stores a fresh timer obtained
# from _reaper in $self->{timer}.
sub unpoll {
    my ($self) = @_;

    my $condvar = $self->cv;
    $condvar->cb(undef);

    $self->persistent(0);

    # Assignment stays the last statement so the sub's value is unchanged.
    $self->{timer} = $self->_reaper;
}
# Finalize the class via its metaclass; no further attributes or
# methods are expected to be added after this point.
__PACKAGE__->meta->make_immutable;
# Unimport Any::Moose so the sugar it exported into this package is
# removed again.
no Any::Moose;
# A module file must end with a true value for require/use to succeed.
1;
__END__
=encoding utf-8
lib/AnyMQ/Topic.pm view on Meta::CPAN
# BUILD
#
# Construction hook: installs the listener reaper, but only when
# reaper_interval holds a true value.
sub BUILD {
    my ($self) = @_;

    if ( $self->reaper_interval ) {
        $self->install_reaper;
    }
}
# install_reaper()
#
# Create a repeating AnyEvent timer that calls reap_destroyed_listeners
# every reaper_interval seconds, and store the watcher through the
# _listener_reaper accessor.
#
# The callback captures only a weakened copy of $self. The watcher is
# stored on $self, so a strong capture would form the cycle
# $self -> watcher -> callback -> $self and the object (together with
# its repeating timer) could never be freed.
sub install_reaper {
    my $self = shift;

    require Scalar::Util;
    Scalar::Util::weaken( my $weak_self = $self );

    $self->_listener_reaper(
        AnyEvent->timer(
            interval => $self->reaper_interval,
            cb       => sub {
                # The object may already be gone if a stray tick arrives.
                $weak_self->reap_destroyed_listeners if $weak_self;
            },
        )
    );
}
sub reap_destroyed_listeners {
my $self = shift;
return if $self->has_no_listeners;
$self->remove_subscriber($_)
for grep { $_->destroyed } values %{$self->queues};
t/downgrade.t view on Meta::CPAN
my $tests = 2;
my $sequence = 0;
sub do_test {
my ( $channel, $client ) = @_;
my $seq = ++$sequence;
my @send_events = ( { data1 => $seq }, { data2 => $seq }, { data3 => $seq } );
my $cv = AE::cv;
my $t = AE::timer 5, 0, sub { $cv->croak( "timeout" ); };
my $pub = AnyMQ->topic( $channel );
my $sub = AnyMQ->new_listener( $pub );
$sub->on_error(sub {
my ($queue, $error, @msg) = @_;
$queue->persistent(0);
$queue->append(@msg);
});
$sub->poll(sub {
if ($_[0]{data2}) {
my $tests = 3;
my $sequence = 0;
sub do_test {
my ( $channel, $client ) = @_;
my $seq = ++$sequence;
my @send_events = ( { data1 => $seq }, { data2 => $seq } );
my $cv = AE::cv;
my $t = AE::timer 5, 0, sub { $cv->croak( "timeout" ); };
my $pub = AnyMQ->topic( $channel );
my $sub = AnyMQ->new_listener( $pub );
$sub->on_error(sub {
isa_ok($_[0], 'AnyMQ::Queue');
like($_[1], qr'failed callback');
$_[0]->destroyed(1);
$cv->send(1);
},
);
t/events_before_long_poll.t view on Meta::CPAN
my $tests = 3;
my $sequence = 0;
sub do_test {
my ( $channel, $client ) = @_;
my $seq = ++$sequence;
my @send_events = ( { data1 => $seq }, { data2 => $seq }, );
my $cv = AE::cv;
my $t = AE::timer 1, 0, sub { $cv->croak( "timeout" ); };
my $pub = AnyMQ->topic( $channel );
my $sub = AnyMQ->new_listener( $pub );
$sub->subscribe($pub);
# Publish events before the client has connected.
$pub->publish( @send_events );
# Should be able to get published events.
t/listener-reaper.pl view on Meta::CPAN
# On error: mark the listener as destroyed and release the first
# $cv->recv below.
$client->on_error( sub { $_[0]->destroyed(1); $cv->send });
# The poll callback always dies; presumably that failure is what gets
# routed to on_error once a message is delivered -- confirm against
# AnyMQ::Queue.
$client->poll(sub { die });
$channel->publish({ data => 1});
# Block until on_error has fired.
$cv->recv;
ok($client->destroyed, 'client is destroyed');
# Being destroyed does not by itself unregister the listener from the topic.
ok(!$channel->has_no_listeners, 'listener still registered with topic');
# NOTE(review): a $cv is already in use above; if it was declared with
# `my` in this same scope, this redeclaration masks it and Perl warns.
my $cv = AE::cv;
# Wait 2 seconds; presumably long enough for the topic's reaper interval
# (configured outside this excerpt) to elapse at least once.
my $w; $w = AnyEvent->timer(after => 2, cb => sub { $cv->send });
$cv->recv;
ok($channel->has_no_listeners, 'listener automatically reapped');
done_testing;
t/timeout.t view on Meta::CPAN
my $tests = 3;
my $sequence = 0;
sub do_test {
my ( $channel, $client ) = @_;
my $seq = ++$sequence;
my @send_events = ( { data1 => $seq }, { data2 => $seq } );
my $cv = AE::cv;
my $t = AE::timer 5, 0, sub { $cv->croak( "timeout" ); };
my $pub = AnyMQ->topic( $channel );
my $sub = AnyMQ->new_listener( $pub );
$sub->poll_once(sub {
is $_[0]{data1}, $seq
});
$sub->timeout(1);
$sub->on_timeout(sub {
isa_ok($_[0], 'AnyMQ::Queue');
like($_[1], qr'timeout');
( run in 1.008 second using v1.01-cache-2.11-cpan-49f99fa48dc )