AnyMQ-AMQP

 view release on metacpan or  search on metacpan

lib/AnyMQ/Trait/AMQP.pm  view on Meta::CPAN

        my $msg = $_[0]->recv;
        if ( $msg eq 'init' ) {
            $self->_connected(1);
            $self->on_ready->() if $self->on_ready;
        }
        else {
            my $cv = AE::cv;
            $cv->cb($cb);
            $self->cv($cv);
            carp "Connection failed, retrying in 5 seconds.  Reason: ".$msg;
            my $w; $w = AnyEvent->timer(after => 5,
                                        cb => sub {
                                            undef $w;
                                            $self->connect($cv);
                                        });
        }
    };
    $cv->cb($cb);

    if (!$self->on_ready) {
        while ((my $msg = $self->cv->recv) ne 'init') {};

xt/author/basic.t  view on Meta::CPAN

                       $q2->send([0, 'extra message received.']);
                   }
               });

# messages coming to the exchange should be bound to our queue
# Publish ten JSON messages, each carrying the current time, to exchange
# 'foo' with routing key 'test_q'.  time() is re-evaluated for every
# iteration of the statement modifier.
# NOTE(review): $ext_channel is created outside this excerpt; presumably
# it is a separate AMQP channel, independent of the AnyMQ bus -- confirm.
$ext_channel->publish( routing_key => 'test_q',
                       exchange => 'foo',
                       body => JSON::to_json({ time => time() }),
                   ) for 1..10;

# Watchdog: if nothing has been sent on $q within 5 seconds, send a
# failing result ([0, 'test timeout']) so the recv below does not block
# indefinitely.  The watcher is held in $w to keep the timer alive.
my $w; $w = AnyEvent->timer( after => 5,
                             cb => sub {
                                 $q->send([0, 'test timeout']);
                             } );
# Wait for a result on $q; it is dereferenced as an array ref holding
# (pass/fail flag, description) and reported as a single test.
my ($ok, $msg) = @{$q->recv};
ok($ok, $msg);

# Second bus: install a fresh condvar, declare the 'test_q' topic as a
# non-publisher-only topic, then wait on that condvar.
# NOTE(review): presumably the bus signals this condvar once the topic /
# connection is ready -- confirm against AnyMQ::Trait::AMQP.
$bus2->cv(AE::cv);
my $test2 = $bus2->topic({name => 'test_q', publisher_only => 0});
$bus2->cv->recv;

# Attach a listener on the second bus to the topic.
my $client2 = $bus2->new_listener; $client2->subscribe($test2);

# For each polled message: read its 'time' field, print the elapsed time
# as a diagnostic, and call $c2->end.
# NOTE(review): $c2 is defined outside this excerpt; presumably a
# begin/end-counted condvar that feeds $q2 -- confirm.
$client2->poll( sub {
                    my $timestamp = shift->{time};
                    diag "client2 latency: ".(time() - $timestamp);
                    $c2->end;
                });

# Publish one timestamped message through $test (a topic object created
# outside this excerpt).
$test->publish({ time => time() });

# Re-arm the watchdog for the second phase; assigning to $w replaces the
# earlier watcher.  On expiry a failing result is sent on $q2.
$w = AnyEvent->timer( after => 5,
                      cb => sub {
                          $q2->send([0, 'test timeout']);
                      } );

# Wait for the second-phase result and report it.
($ok, $msg) = @{$q2->recv};
ok($ok, $msg);

# Drop both bus references before finishing.
# NOTE(review): presumably this triggers connection teardown while the
# test is still running -- confirm.
undef $bus; undef $bus2;

done_testing;



( run in 1.153 seconds using v1.01-cache-2.11-cpan-49f99fa48dc )