AnyMQ

 view release on metacpan or  search on metacpan

lib/AnyMQ/Queue.pm  view on Meta::CPAN

        }
    };

    return if $self->destroyed;

    if ($self->{persistent}) {
        $self->{cv}->cb($cb);
        $self->_flush( @{ $self->{buffer} })
            if @{ $self->{buffer} };
    } else {
        $self->{timer} = $self->_reaper;
    }

}

sub _reaper {
    my ($self, $timeout) = @_;

    AnyEvent->timer(
        after => $timeout || $self->timeout,
        cb => sub {
            weaken $self;
            warn "Timing out $self long-poll" if DEBUG;
            if ($self->on_timeout) {
                $self->on_timeout->($self, "timeout")
            }
            else {
                $self->destroyed(1);
            }

lib/AnyMQ/Queue.pm  view on Meta::CPAN

# Register a one-shot (long-poll) consumer on this queue.
#
# Parameters:
#   $cb      - code ref invoked with whatever the queue's condvar yields
#              from recv once it is triggered.
#   $timeout - optional number of seconds to wait before giving up and
#              flushing with no messages. Defaults to 55 when undefined.
#              0 is honoured as-is (useful for interval polling).
#
# Emits a warning (but carries on) when the queue is already flagged as
# persistent, i.e. another consumer is attached via poll().
sub poll_once {
    my ($self, $cb, $timeout) = @_;

    warn "already polled by another client" if $self->persistent;

    $self->{cv}->cb(sub { $cb->($_[0]->recv) });

    # reset garbage collection timeout with the long-poll timeout
    # $timeout = 0 is a valid timeout for interval-polling, so only an
    # undefined timeout falls back to the default (no "|| 55" here, which
    # would silently turn 0 back into 55).
    $timeout = 55 unless defined $timeout;

    # The watcher is stored on the queue and its callback needs the queue,
    # so the closure must only hold a weak reference; otherwise queue and
    # timer keep each other alive. The weak copy has to be taken *before*
    # the closure is built -- weakening inside the callback is too late.
    weaken(my $weak_self = $self);

    # The timer itself must stay strongly referenced from the queue:
    # AnyEvent cancels a watcher as soon as its last reference goes away,
    # so weakening $self->{timer} would cancel the timeout immediately.
    $self->{timer} = AE::timer($timeout, 0, sub {
        my $queue = $weak_self or return;    # queue already gone
        warn "long-poll timeout, flush and wait for client reconnect" if DEBUG;
        $queue->_flush;
    });

    # flush buffer for a long-poll client
    $self->_flush( @{ $self->{buffer} })
        if @{ $self->{buffer} };
}

# Attach a persistent consumer to this queue.
#
# $cb is installed as the condvar callback and receives whatever the
# condvar yields from recv. The queue is flagged persistent, any timer
# currently stored on the queue is released, and messages already sitting
# in the buffer are flushed right away.
sub poll {
    my $self     = shift;
    my $consumer = shift;

    $self->cv->cb(sub {
        my ($ready) = @_;
        $consumer->($ready->recv);
    });
    $self->persistent(1);

    # A persistent consumer needs no pending timer; dropping the reference
    # releases whatever watcher was stored here.
    $self->{timer} = undef;

    if (@{ $self->{buffer} }) {
        $self->_flush(@{ $self->{buffer} });
    }
}

# Detach the persistent consumer installed by poll().
#
# Clears the condvar callback, resets the persistent flag and stores a
# fresh timer obtained from _reaper on the queue.
sub unpoll {
    my ($self) = @_;

    my $condvar = $self->cv;
    $condvar->cb(undef);
    $self->persistent(0);

    return $self->{timer} = $self->_reaper;
}

# Finalize the class definition: freeze the metaclass, then unimport the
# Any::Moose sugar so its keywords do not linger in this package.
__PACKAGE__->meta->make_immutable;
no Any::Moose;
# A module must end with a true value for require/use to succeed.
1;

__END__

=encoding utf-8

lib/AnyMQ/Topic.pm  view on Meta::CPAN


# Construction hook: start the periodic listener reaper, but only when a
# true reaper_interval has been configured.
sub BUILD {
    my ($self) = @_;

    if ($self->reaper_interval) {
        $self->install_reaper;
    }
}

# Install a repeating AnyEvent timer that calls reap_destroyed_listeners
# every reaper_interval seconds, and store the watcher in _listener_reaper.
#
# The watcher is owned by this object, so its callback must not hold a
# strong reference back to the object: that would form a reference cycle
# and neither the topic nor its timer could ever be freed. The callback
# therefore closes over a weakened copy of $self.
sub install_reaper {
    my $self = shift;

    require Scalar::Util;
    Scalar::Util::weaken(my $weak_self = $self);

    $self->_listener_reaper(
        AnyEvent->timer(interval => $self->reaper_interval,
                        cb => sub {
                            # The topic may already have been destroyed.
                            $weak_self->reap_destroyed_listeners
                                if $weak_self;
                        })
    );
}

sub reap_destroyed_listeners {
    my $self = shift;
    return if $self->has_no_listeners;
    $self->remove_subscriber($_)
        for grep { $_->destroyed } values %{$self->queues};

t/downgrade.t  view on Meta::CPAN

my $tests = 2;

my $sequence = 0;

sub do_test {
    my ( $channel, $client ) = @_;
    my $seq = ++$sequence;
    my @send_events = ( { data1 => $seq }, { data2 => $seq }, { data3 => $seq } );

    my $cv = AE::cv;
    my $t  = AE::timer 5, 0, sub { $cv->croak( "timeout" ); };

    my $pub = AnyMQ->topic( $channel );
    my $sub = AnyMQ->new_listener( $pub );
    $sub->on_error(sub {
                       my ($queue, $error, @msg) = @_;
                       $queue->persistent(0);
                       $queue->append(@msg);
                   });
    $sub->poll(sub {
                   if ($_[0]{data2}) {

t/error.t  view on Meta::CPAN

my $tests = 3;

my $sequence = 0;

sub do_test {
    my ( $channel, $client ) = @_;
    my $seq = ++$sequence;
    my @send_events = ( { data1 => $seq }, { data2 => $seq } );

    my $cv = AE::cv;
    my $t  = AE::timer 5, 0, sub { $cv->croak( "timeout" ); };

    my $pub = AnyMQ->topic( $channel );
    my $sub = AnyMQ->new_listener( $pub );
    $sub->on_error(sub {
                       isa_ok($_[0], 'AnyMQ::Queue');
                       like($_[1], qr'failed callback');
                       $_[0]->destroyed(1);
                       $cv->send(1);
                   },
               );

t/events_before_long_poll.t  view on Meta::CPAN


my $tests = 3;

my $sequence = 0;
sub do_test {
	my ( $channel, $client ) = @_;
	my $seq = ++$sequence;
	my @send_events = ( { data1 => $seq }, { data2 => $seq }, );

	my $cv = AE::cv;
	my $t  = AE::timer 1, 0, sub { $cv->croak( "timeout" ); };

	my $pub = AnyMQ->topic( $channel );
	my $sub = AnyMQ->new_listener( $pub );
        $sub->subscribe($pub);

	# Publish events before the client has connected.
	$pub->publish( @send_events );

	# Should be able to get published events.

t/listener-reaper.pl  view on Meta::CPAN

# On error, mark the client queue as destroyed and wake up the waiting
# condvar.
$client->on_error( sub { $_[0]->destroyed(1); $cv->send });
# Poll with a callback that always dies; presumably delivering a message
# to it is what triggers the on_error handler above -- TODO confirm.
$client->poll(sub { die });

$channel->publish({ data => 1});

# Block until the error handler has signalled.
$cv->recv;
ok($client->destroyed, 'client is destroyed');
ok(!$channel->has_no_listeners, 'listener still registered with topic');

# Wait two seconds so the topic gets a chance to drop the destroyed
# listener (presumably via its reaper timer -- verify against the
# reaper_interval configured earlier in this script).
# NOTE(review): if an earlier "my $cv" lives in this same scope, this
# re-declaration triggers a "masks earlier declaration" warning.
my $cv = AE::cv;
my $w; $w = AnyEvent->timer(after => 2, cb => sub { $cv->send });

$cv->recv;

ok($channel->has_no_listeners, 'listener automatically reapped');

done_testing;

t/timeout.t  view on Meta::CPAN

my $tests = 3;

my $sequence = 0;

sub do_test {
    my ( $channel, $client ) = @_;
    my $seq = ++$sequence;
    my @send_events = ( { data1 => $seq }, { data2 => $seq } );

    my $cv = AE::cv;
    my $t  = AE::timer 5, 0, sub { $cv->croak( "timeout" ); };

    my $pub = AnyMQ->topic( $channel );
    my $sub = AnyMQ->new_listener( $pub );
    $sub->poll_once(sub {
                        is $_[0]{data1}, $seq
                    });
    $sub->timeout(1);
    $sub->on_timeout(sub {
                       isa_ok($_[0], 'AnyMQ::Queue');
                       like($_[1], qr'timeout');



( run in 1.008 second using v1.01-cache-2.11-cpan-49f99fa48dc )