AnyEvent-RabbitMQ-RPC

 view release on metacpan or search on metacpan

lib/AnyEvent/RabbitMQ/RPC.pm - view on Meta::CPAN

# AnyEvent::RabbitMQ::RPC -- the constructor in this package opens a channel
# on an AnyEvent::RabbitMQ connection (creating the connection itself when
# the caller does not supply one) and optionally selects a serializer.
package AnyEvent::RabbitMQ::RPC;

use strict;
use warnings;

use AnyEvent::RabbitMQ;    # connection/channel API used by new()
use Try::Tiny;             # NOTE(review): no use visible in this part of the file -- confirm it is needed further down

# Module version, stored as a string.
our $VERSION = '0.5';

sub new {
    my $class = shift;
    my %args = @_;

    my $self = bless {}, $class;

    my $cv = AE::cv;
    my $success = $args{on_success} || $cv;
    my $failure = $args{on_failure} || sub {
        warn "@_";
        $cv->(undef);
    };

    $self->{connection} = $args{connection};
    my $channel = sub {
        $self->connection->open_channel(
            on_success => sub {
                $self->{channel} = shift;
                $self->{channel}->qos;
                $success->($self);
            },
            on_failure => sub {
                $failure->("Channel failed: @_");
            }
        );
    };
    if ($self->connection) {
        $channel->();
    } else {
        AnyEvent::RabbitMQ->load_xml_spec;
        $self->{connection} = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
        $self->connection->connect(
            %args,
            on_success => $channel,
            on_failure => sub {
                $failure->("Connect failed: @_");
            }
        );
    }

    $args{serialize} ||= '';
    if ($args{serialize} eq "YAML") {
        require YAML::Any;
        $self->{serialize}   = \&YAML::Any::Dump;
        $self->{unserialize} = \&YAML::Any::Load;
    } elsif ($args{serialize} eq "JSON") {
        require JSON::Any;
        JSON::Any->import;
        my $json = JSON::Any->new;
        $self->{serialize}   = sub { $json->objToJson( [@_] ) };
        $self->{unserialize} = sub { (@{ $json->jsonToObj(@_) })[0] };
    } elsif ($args{serialize} eq "Storable") {
        require Storable;
        $self->{serialize}   = sub { Storable::nfreeze( [@_] )};
        $self->{unserialize} = sub { (@{ Storable::thaw(@_) })[0] };
    }



( run in 2.722 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )