AnyEvent-RabbitMQ-Fork

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Fork.pm  view on Meta::CPAN

package AnyEvent::RabbitMQ::Fork;
$AnyEvent::RabbitMQ::Fork::VERSION = '0.6';
# ABSTRACT: Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)

=head1 NAME

AnyEvent::RabbitMQ::Fork - Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)

=cut

use Moo;
use Types::Standard qw(CodeRef Str HashRef InstanceOf Bool Object);
use Scalar::Util qw(weaken);
use Carp qw(croak);
use File::ShareDir qw(dist_file);

use constant DEFAULT_AMQP_SPEC =>
  dist_file('AnyEvent-RabbitMQ', 'fixed_amqp0-9-1.xml');

use namespace::clean;

use AnyEvent::Fork;
use AnyEvent::Fork::RPC;

use Net::AMQP;

use AnyEvent::RabbitMQ::Fork::Channel;

=head1 SYNOPSIS

  use AnyEvent::RabbitMQ::Fork;

  my $cv = AnyEvent->condvar;

  my $ar = AnyEvent::RabbitMQ::Fork->new->load_xml_spec()->connect(
      host       => 'localhost',
      port       => 5672,
      user       => 'guest',
      pass       => 'guest',
      vhost      => '/',
      timeout    => 1,
      tls        => 0, # Or 1 if you'd like SSL
      tune       => { heartbeat => 30, channel_max => $whatever, frame_max = $whatever },
      on_success => sub {
          my $ar = shift;
          $ar->open_channel(
              on_success => sub {
                  my $channel = shift;
                  $channel->declare_exchange(
                      exchange   => 'test_exchange',
                      on_success => sub {
                          $cv->send('Declared exchange');
                      },
                      on_failure => $cv,
                  );
              },
              on_failure => $cv,
              on_close   => sub {
                  my $method_frame = shift->method_frame;
                  die $method_frame->reply_code, $method_frame->reply_text;
              },
          );
      },
      on_failure => $cv,
      on_read_failure => sub { die @_ },
      on_return  => sub {
          my $frame = shift;
          die "Unable to deliver ", Dumper($frame);
      },
      on_close   => sub {
          my $why = shift;
          if (ref($why)) {
              my $method_frame = $why->method_frame;
              die $method_frame->reply_code, ": ", $method_frame->reply_text;
          }
          else {
              die $why;
          }
      },
  );

  print $cv->recv, "\n";

=cut

has verbose => (is => 'rw', isa => Bool, default => 0);
has is_open => (is => 'ro', isa => Bool, default => 0);
has login_user        => (is => 'ro', isa => Str);
has server_properties => (is => 'ro', isa => Str);

has worker_class    => (is => 'lazy', isa => Str);
has channel_class   => (is => 'lazy', isa => Str);
has worker_function => (is => 'lazy', isa => Str);
has init_function   => (is => 'lazy', isa => Str);

sub _build_worker_class    { return __PACKAGE__ . '::Worker' }
sub _build_channel_class   { return __PACKAGE__ . '::Channel' }
sub _build_worker_function { return $_[0]->worker_class . '::run' }
sub _build_init_function   { return $_[0]->worker_class . '::init' }

has _drain_cv => (is => 'lazy', isa => Object, predicate => 1, clearer => 1);

sub _build__drain_cv { return AE::cv }

has channels => (
    is      => 'ro',
    isa     => HashRef [InstanceOf ['AnyEvent::RabbitMQ::Fork::Channel']],
    clearer => 1,
    default  => sub { {} },
    init_arg => undef,
);

has cb_registry => (
    is       => 'ro',
    isa      => HashRef,
    default  => sub { {} },
    clearer  => 1,
    init_arg => undef,
);

has rpc => (
    is        => 'lazy',
    isa       => CodeRef,
    predicate => 1,
    clearer   => 1,
    init_arg  => undef,
);

sub _build_rpc {
    my $self = shift;
    weaken(my $wself = $self);

    return AnyEvent::Fork->new          #
      ->require($self->worker_class)    #

lib/AnyEvent/RabbitMQ/Fork.pm  view on Meta::CPAN


=head1 METHODS

=over

=item load_xml_spec([$amqp_spec_xml_path])

Declare and load the AMQP Specification you wish to use. The default is to use
version 0.9.1 with RabbitMQ specific extensions.

B<Returns: $self>

=cut

my $_loaded_spec;
sub load_xml_spec {
    my $self = shift;
    my $spec = shift || DEFAULT_AMQP_SPEC;

    if ($_loaded_spec and $_loaded_spec ne $spec) {
        croak(
            "Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible"
        );
    } elsif (!$_loaded_spec) {
        Net::AMQP::Protocol->load_xml_spec($_loaded_spec = $spec);
    }

    return $self->_delegate(load_xml_spec => 0, $spec);
}

=item connect(%opts)

Open connection to an AMQP server to begin work.

Arguments:

=over

=item B<host>

=item B<port>

=item B<user>

=item B<pass>

=item B<vhost>

=item B<timeout> TCP timeout in seconds. Default: use L<AnyEvent::Socket> default

=item B<tls> Boolean to use SSL/TLS or not. Default: 0

=item B<tune> Hash: (values are negotiated with the server)

=over

=item B<heartbeat> Heartbeat interval in seconds. Default: 0 (off)

=item B<channel_max> Maximum channel ID. Default: 65536

=item B<frame_max> Maximum frame size in bytes. Default: 131072

=back

=item B<on_success> Callback when the connection is successfully established.

=item B<on_failure> Called when a failure occurs over the lifetime of the connection.

=item B<on_read_failure> Called when there is a problem reading response from the server.

=item B<on_return> Called if the server returns a published message.

=item B<on_close> Called when the connection is closed remotely.

=back

B<Returns: $self>

=item open_channel(%opts)

Open a logical channel which is where all the AMQP fun is.

Arguments:

=over

=item B<on_success> Called when the channel is open and ready for use.

=item B<on_failure> Called if there is a problem opening the channel.

=item B<on_close> Called when the channel is closed.

=back

=item close(%opts)

Close this connection.

=over

=item B<on_success> Called on successful shutdown.

=item B<on_failure> Called on failed shutdown. Note: the connection is still
closed after this

=back

=back

=cut

foreach my $method (qw(connect open_channel close)) {
    no strict 'refs';
    *$method = sub {
        my $self = shift;
        return $self->_delegate($method => 0, @_);
    };
}

sub drain_writes {
    my ($self, $to) = @_;



( run in 1.239 second using v1.01-cache-2.11-cpan-df04353d9ac )