AnyMQ-AMQP

 view release on metacpan or  search on metacpan

lib/AnyMQ/Trait/AMQP.pm  view on Meta::CPAN


sub connect {
    my $self = shift;
    my $cv = shift;

    my $rf = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
    $self->_rf($rf);

    # XXX: wrapped object with monadic method modifier
    # my $channel = run_monad { $rf->connect(....)->open_channel()->return }
    # my $queue = run_monad { $channel->declare_queue(....)->return }->method_frame->queue;
    # run_monad { $channel->consume( ....) }

    my $init = sub {
        my $channel = shift;
        $channel->declare_queue(
            exclusive => 1,
            on_success => sub {
                my $method = shift;
                my $queue = $method->method_frame->queue;
                $self->_rf_queue($queue);
                $channel->consume(queue => $queue,
                                  no_ack => 1,
                                  on_success => sub {
                                      $cv->send('init');
                                  },
                                  on_consume => $self->on_consume,
                                  on_failure => $cv,
                              );
            },

lib/AnyMQ/Trait/AMQP.pm  view on Meta::CPAN

            # XXX: try to reconnect and reinstantiate all topics
            warn "==> connection closed";
        },
        on_failure => $cv,
    );
}

# Build the delivery callback that connect() hands to the channel's
# consume() call as its on_consume handler.
#
# Returns a code ref taking one delivered frame (a hash ref with `body`,
# `header` and `deliver` entries). The callback decodes the frame's JSON
# payload and publishes it on the local topic named by the routing key.
# Croaks if the payload cannot be decoded or the topic cannot be published to.
sub on_consume {
    my $self = shift;

    return sub {
        my $frame = shift;

        my $payload  = $frame->{body}->payload;
        my $reply_to = $frame->{header}->reply_to;

        # Skip frames whose reply_to names our own queue.
        # NOTE(review): presumably these are messages this instance
        # published itself -- confirm against the publishing side.
        return if $reply_to && $reply_to eq $self->_rf_queue;

        my $topic = $frame->{deliver}->method_frame->routing_key;

        try {
            my $local_topic = $self->topics->{$topic};

            # Give a meaningful reason instead of the generic
            # "Can't call method ... on an undefined value".
            die "no local topic registered under that name\n"
                unless defined $local_topic;

            # Fully qualified call: invokes AnyMQ::Topic's own publish
            # rather than whatever publish the object's class resolves to.
            $local_topic->AnyMQ::Topic::publish(JSON::from_json($payload));
        }
        catch { croak "failed to republish on $topic: $_" };
    };
}

sub new_topic {
    my ($self, $opt) = @_;
    $opt = { name => $opt } unless ref $opt;
    AnyMQ::Topic->new_with_traits(
        traits => ['AMQP'],



( run in 1.117 second using v1.01-cache-2.11-cpan-df04353d9ac )