AnyEvent-RabbitMQ-Fork
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Fork.pm view on Meta::CPAN
package AnyEvent::RabbitMQ::Fork;
$AnyEvent::RabbitMQ::Fork::VERSION = '0.6';
# ABSTRACT: Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)
=head1 NAME
AnyEvent::RabbitMQ::Fork - Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)
=cut
use Moo;
use Types::Standard qw(CodeRef Str HashRef InstanceOf Bool Object);
use Scalar::Util qw(weaken);
use Carp qw(croak);
use File::ShareDir qw(dist_file);
use constant DEFAULT_AMQP_SPEC =>
dist_file('AnyEvent-RabbitMQ', 'fixed_amqp0-9-1.xml');
use namespace::clean;
use AnyEvent::Fork;
use AnyEvent::Fork::RPC;
use Net::AMQP;
use AnyEvent::RabbitMQ::Fork::Channel;
=head1 SYNOPSIS
use AnyEvent::RabbitMQ::Fork;
my $cv = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ::Fork->new->load_xml_spec()->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0, # Or 1 if you'd like SSL
tune => { heartbeat => 30, channel_max => $whatever, frame_max = $whatever },
on_success => sub {
my $ar = shift;
$ar->open_channel(
on_success => sub {
my $channel = shift;
$channel->declare_exchange(
exchange => 'test_exchange',
on_success => sub {
$cv->send('Declared exchange');
},
on_failure => $cv,
);
},
on_failure => $cv,
on_close => sub {
my $method_frame = shift->method_frame;
die $method_frame->reply_code, $method_frame->reply_text;
},
);
},
on_failure => $cv,
on_read_failure => sub { die @_ },
on_return => sub {
my $frame = shift;
die "Unable to deliver ", Dumper($frame);
},
on_close => sub {
my $why = shift;
if (ref($why)) {
my $method_frame = $why->method_frame;
die $method_frame->reply_code, ": ", $method_frame->reply_text;
}
else {
die $why;
}
},
);
print $cv->recv, "\n";
=cut
has verbose => (is => 'rw', isa => Bool, default => 0);
has is_open => (is => 'ro', isa => Bool, default => 0);
has login_user => (is => 'ro', isa => Str);
has server_properties => (is => 'ro', isa => Str);
has worker_class => (is => 'lazy', isa => Str);
has channel_class => (is => 'lazy', isa => Str);
has worker_function => (is => 'lazy', isa => Str);
has init_function => (is => 'lazy', isa => Str);
sub _build_worker_class { return __PACKAGE__ . '::Worker' }
sub _build_channel_class { return __PACKAGE__ . '::Channel' }
sub _build_worker_function { return $_[0]->worker_class . '::run' }
sub _build_init_function { return $_[0]->worker_class . '::init' }
has _drain_cv => (is => 'lazy', isa => Object, predicate => 1, clearer => 1);
sub _build__drain_cv { return AE::cv }
has channels => (
is => 'ro',
isa => HashRef [InstanceOf ['AnyEvent::RabbitMQ::Fork::Channel']],
clearer => 1,
default => sub { {} },
init_arg => undef,
);
has cb_registry => (
is => 'ro',
isa => HashRef,
default => sub { {} },
clearer => 1,
init_arg => undef,
);
has rpc => (
is => 'lazy',
isa => CodeRef,
predicate => 1,
clearer => 1,
init_arg => undef,
);
sub _build_rpc {
my $self = shift;
weaken(my $wself = $self);
return AnyEvent::Fork->new #
->require($self->worker_class) #
lib/AnyEvent/RabbitMQ/Fork.pm view on Meta::CPAN
=head1 METHODS
=over
=item load_xml_spec([$amqp_spec_xml_path])
Declare and load the AMQP Specification you wish to use. The default is to use
version 0.9.1 with RabbitMQ specific extensions.
B<Returns: $self>
=cut
my $_loaded_spec;
sub load_xml_spec {
my $self = shift;
my $spec = shift || DEFAULT_AMQP_SPEC;
if ($_loaded_spec and $_loaded_spec ne $spec) {
croak(
"Tried to load AMQP spec $spec, but have already loaded $_loaded_spec, not possible"
);
} elsif (!$_loaded_spec) {
Net::AMQP::Protocol->load_xml_spec($_loaded_spec = $spec);
}
return $self->_delegate(load_xml_spec => 0, $spec);
}
=item connect(%opts)
Open connection to an AMQP server to begin work.
Arguments:
=over
=item B<host>
=item B<port>
=item B<user>
=item B<pass>
=item B<vhost>
=item B<timeout> TCP timeout in seconds. Default: use L<AnyEvent::Socket> default
=item B<tls> Boolean to use SSL/TLS or not. Default: 0
=item B<tune> Hash: (values are negotiated with the server)
=over
=item B<heartbeat> Heartbeat interval in seconds. Default: 0 (off)
=item B<channel_max> Maximum channel ID. Default: 65536
=item B<frame_max> Maximum frame size in bytes. Default: 131072
=back
=item B<on_success> Callback when the connection is successfully established.
=item B<on_failure> Called when a failure occurs over the lifetime of the connection.
=item B<on_read_failure> Called when there is a problem reading response from the server.
=item B<on_return> Called if the server returns a published message.
=item B<on_close> Called when the connection is closed remotely.
=back
B<Returns: $self>
=item open_channel(%opts)
Open a logical channel which is where all the AMQP fun is.
Arguments:
=over
=item B<on_success> Called when the channel is open and ready for use.
=item B<on_failure> Called if there is a problem opening the channel.
=item B<on_close> Called when the channel is closed.
=back
=item close(%opts)
Close this connection.
=over
=item B<on_success> Called on successful shutdown.
=item B<on_failure> Called on failed shutdown. Note: the connection is still
closed after this
=back
=back
=cut
foreach my $method (qw(connect open_channel close)) {
no strict 'refs';
*$method = sub {
my $self = shift;
return $self->_delegate($method => 0, @_);
};
}
sub drain_writes {
my ($self, $to) = @_;
( run in 1.239 second using v1.01-cache-2.11-cpan-df04353d9ac )