AnyEvent-RabbitMQ-RPC

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/RPC.pm  view on Meta::CPAN


# Declare a queue on this object's channel and report its name back to the
# caller.
#
# Named arguments:
#   on_success - code ref; called with the queue name read from the declare
#                reply (its method_frame->queue).
#   on_failure - handed to declare_queue unchanged.
#
# No queue name is passed to declare_queue; the name is taken from the reply
# frame instead. The queue is requested as non-durable and exclusive.
# NOTE(review): no_ack is also forwarded to declare_queue; whether
# declare_queue does anything with it is not visible here -- confirm.
#
# Returns whatever declare_queue returns.
sub reply_queue {
    my ($self, %args) = @_;

    # Unwrap the declare reply so the caller's callback only sees the name.
    my $report_name = sub {
        my ($reply) = @_;
        $args{on_success}->( $reply->method_frame->queue );
    };

    return $self->channel->declare_queue(
        no_ack     => 1,
        durable    => 0,
        exclusive  => 1,
        on_success => $report_name,
        on_failure => $args{on_failure},
    );
}

sub register {
    my $self = shift;
    my %args = (
        name => undef,
        run  => sub {},

lib/AnyEvent/RabbitMQ/RPC.pm  view on Meta::CPAN


    # Ensure we have the queue
    $self->rpc_queue(
        queue      => $args{name},
        on_success => sub {
            # And set up a listen on it
            $self->channel->consume(
                queue      => $args{name},
                no_ack     => 0,
                on_consume => sub {
                    my $frame = shift;
                    my $failed;
                    my $args = $frame->{body}->payload;
                    if ($self->{unserialize}) {
                        try {
                            $args = $self->{unserialize}->($args);
                        } catch {
                            $failed = 1;
                            $args{on_failure}->("Unserialization failed: $_");
                        };
                        return if $failed;
                    }

lib/AnyEvent/RabbitMQ/RPC.pm  view on Meta::CPAN

                    my $return;
                    try {
                        $return = $args{run}->( $args );
                    } catch {
                        $failed = 1;
                        $args{on_failure}->("Call died: $_");
                    };
                    return if $failed;

                    # Send the response, if they asked for it
                    if (my $reply_to = $frame->{header}->reply_to) {
                        if ($self->{serialize}) {
                            try {
                                $return = $self->{serialize}->($return);
                            } catch {
                                $failed = 1;
                                $args{on_failure}->("Serialization failed: $_");
                            };
                            return if $failed;
                        }

lib/AnyEvent/RabbitMQ/RPC.pm  view on Meta::CPAN

    $self->rpc_queue(
        queue      => $args{name},
        on_success => sub {
            $self->reply_queue(
                on_success => sub {
                    my $REPLIES = shift;
                    $self->channel->consume(
                        queue => $REPLIES,
                        no_ack => 1,
                        on_consume => sub {
                            my $frame = shift;
                            # We got a reply, tear down our reply queue
                            $self->channel->delete_queue(
                                queue => $REPLIES,
                            );
                            my $return = $frame->{body}->payload;
                            if ($self->{unserialize}) {
                                my $failed;
                                try {
                                    $return = $self->{unserialize}->($return);
                                } catch {
                                    $args{on_failure}->("Unserialization failed: $_");
                                    $failed = 1;
                                };
                                return if $failed;
                            }



( run in 0.608 second using v1.01-cache-2.11-cpan-df04353d9ac )