AnyEvent-RabbitMQ-Fork

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Fork.pm  view on Meta::CPAN

package AnyEvent::RabbitMQ::Fork;
$AnyEvent::RabbitMQ::Fork::VERSION = '0.6';
# ABSTRACT: Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)

=head1 NAME

AnyEvent::RabbitMQ::Fork - Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)

=cut

use Moo;
use Types::Standard qw(CodeRef Str HashRef InstanceOf Bool Object);
use Scalar::Util qw(weaken);
use Carp qw(croak);
use File::ShareDir qw(dist_file);

use constant DEFAULT_AMQP_SPEC =>
  dist_file('AnyEvent-RabbitMQ', 'fixed_amqp0-9-1.xml');

use namespace::clean;

use AnyEvent::Fork;
use AnyEvent::Fork::RPC;

use Net::AMQP;

use AnyEvent::RabbitMQ::Fork::Channel;

=head1 SYNOPSIS

  use AnyEvent::RabbitMQ::Fork;

  my $cv = AnyEvent->condvar;

  my $ar = AnyEvent::RabbitMQ::Fork->new->load_xml_spec()->connect(
      host       => 'localhost',
      port       => 5672,
      user       => 'guest',
      pass       => 'guest',
      vhost      => '/',
      timeout    => 1,
      tls        => 0, # Or 1 if you'd like SSL
      tune       => { heartbeat => 30, channel_max => $whatever, frame_max = $whatever },
      on_success => sub {
          my $ar = shift;
          $ar->open_channel(
              on_success => sub {
                  my $channel = shift;
                  $channel->declare_exchange(
                      exchange   => 'test_exchange',
                      on_success => sub {
                          $cv->send('Declared exchange');
                      },
                      on_failure => $cv,
                  );
              },
              on_failure => $cv,
              on_close   => sub {
                  my $method_frame = shift->method_frame;
                  die $method_frame->reply_code, $method_frame->reply_text;
              },
          );
      },
      on_failure => $cv,
      on_read_failure => sub { die @_ },
      on_return  => sub {
          my $frame = shift;
          die "Unable to deliver ", Dumper($frame);
      },
      on_close   => sub {
          my $why = shift;
          if (ref($why)) {
              my $method_frame = $why->method_frame;
              die $method_frame->reply_code, ": ", $method_frame->reply_text;
          }
          else {
              die $why;
          }
      },
  );

  print $cv->recv, "\n";

=cut

has verbose => (is => 'rw', isa => Bool, default => 0);
has is_open => (is => 'ro', isa => Bool, default => 0);
has login_user        => (is => 'ro', isa => Str);
has server_properties => (is => 'ro', isa => Str);

has worker_class    => (is => 'lazy', isa => Str);
has channel_class   => (is => 'lazy', isa => Str);
has worker_function => (is => 'lazy', isa => Str);
has init_function   => (is => 'lazy', isa => Str);

sub _build_worker_class    { return __PACKAGE__ . '::Worker' }
sub _build_channel_class   { return __PACKAGE__ . '::Channel' }
sub _build_worker_function { return $_[0]->worker_class . '::run' }
sub _build_init_function   { return $_[0]->worker_class . '::init' }

has _drain_cv => (is => 'lazy', isa => Object, predicate => 1, clearer => 1);

sub _build__drain_cv { return AE::cv }

has channels => (
    is      => 'ro',
    isa     => HashRef [InstanceOf ['AnyEvent::RabbitMQ::Fork::Channel']],
    clearer => 1,
    default  => sub { {} },
    init_arg => undef,
);

has cb_registry => (
    is       => 'ro',
    isa      => HashRef,
    default  => sub { {} },
    clearer  => 1,
    init_arg => undef,
);

has rpc => (
    is        => 'lazy',
    isa       => CodeRef,
    predicate => 1,
    clearer   => 1,
    init_arg  => undef,
);

sub _build_rpc {
    my $self = shift;
    weaken(my $wself = $self);

    return AnyEvent::Fork->new          #
      ->require($self->worker_class)    #
      ->send_arg($self->worker_class, verbose => $self->verbose)    #
      ->AnyEvent::Fork::RPC::run(
        $self->worker_function,
        async      => 1,
        serialiser => $AnyEvent::Fork::RPC::STORABLE_SERIALISER,
        on_event   => sub { $wself && $wself->_on_event(@_) },
        on_error   => sub { $wself && $wself->_on_error(@_) },
        on_destroy => sub { $wself && $wself->_on_destroy(@_) },
        init       => $self->init_function,
        # TODO look into
        #done => '',
      );
}

=head1 DESCRIPTION

This module is mean't to be a close to a drop-in facade for running
L<AnyEvent::RabbitMQ> in a background process via L<AnyEvent::Fork::RPC>.

Tha main use case is for programs where other operations block with little
control due to difficulty/laziness. In this way, the process hosting the
connection RabbitMQ is doing nothing else but processing messages.

=cut

my $cb_id = 'a';    # textual ++ gives a bigger space than numerical ++

sub _delegate {
    my ($self, $method, $ch_id, @args, %args) = @_;

    unless (@args % 2) {
        %args = @args;
        @args = ();
        foreach my $event (grep { /^on_/ } keys %args) {
            my $id = $cb_id++;

            # store the user callback
            $self->cb_registry->{$id} = delete $args{$event};

            # create a signature to send back to on_event
            $args{$event} = [$id, $event, $method, scalar caller];
        }
    }

    $self->rpc->(
        $method, $ch_id,
        (@args ? @args : %args),
        sub {
            croak @_ if @_;
        }
    );

    return $self;
}

=head1 CONSTRCTOR



( run in 1.429 second using v1.01-cache-2.11-cpan-39bf76dae61 )