AnyEvent-RabbitMQ-Fork
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Fork.pm view on Meta::CPAN
package AnyEvent::RabbitMQ::Fork;
$AnyEvent::RabbitMQ::Fork::VERSION = '0.6';
# ABSTRACT: Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)
=head1 NAME
AnyEvent::RabbitMQ::Fork - Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)
=cut
use Moo;
use Types::Standard qw(CodeRef Str HashRef InstanceOf Bool Object);
use Scalar::Util qw(weaken);
use Carp qw(croak);
use File::ShareDir qw(dist_file);
use constant DEFAULT_AMQP_SPEC =>
dist_file('AnyEvent-RabbitMQ', 'fixed_amqp0-9-1.xml');
use namespace::clean;
use AnyEvent::Fork;
use AnyEvent::Fork::RPC;
use Net::AMQP;
use AnyEvent::RabbitMQ::Fork::Channel;
=head1 SYNOPSIS
use AnyEvent::RabbitMQ::Fork;
my $cv = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ::Fork->new->load_xml_spec()->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0, # Or 1 if you'd like SSL
tune => { heartbeat => 30, channel_max => $whatever, frame_max => $whatever },
on_success => sub {
my $ar = shift;
$ar->open_channel(
on_success => sub {
my $channel = shift;
$channel->declare_exchange(
exchange => 'test_exchange',
on_success => sub {
$cv->send('Declared exchange');
},
on_failure => $cv,
);
},
on_failure => $cv,
on_close => sub {
my $method_frame = shift->method_frame;
die $method_frame->reply_code, $method_frame->reply_text;
},
);
},
on_failure => $cv,
on_read_failure => sub { die @_ },
on_return => sub {
my $frame = shift;
die "Unable to deliver ", Dumper($frame);
},
on_close => sub {
my $why = shift;
if (ref($why)) {
my $method_frame = $why->method_frame;
die $method_frame->reply_code, ": ", $method_frame->reply_text;
}
else {
die $why;
}
},
);
print $cv->recv, "\n";
=cut
# Public attributes.

# Read-write flag; its value is handed to the worker when the RPC process
# is built.
has verbose => (
    isa     => Bool,
    is      => 'rw',
    default => 0,
);

# Read-only flag, false by default.
has is_open => (
    isa     => Bool,
    is      => 'ro',
    default => 0,
);

has login_user => (
    isa => Str,
    is  => 'ro',
);

# NOTE(review): constrained to Str here; AnyEvent::RabbitMQ appears to expose
# server_properties as a hash reference -- confirm the intended type.
has server_properties => (
    isa => Str,
    is  => 'ro',
);

# Lazily built names; each default comes from the matching _build_* method.
has worker_class => (
    isa => Str,
    is  => 'lazy',
);
has channel_class => (
    isa => Str,
    is  => 'lazy',
);
has worker_function => (
    isa => Str,
    is  => 'lazy',
);
has init_function => (
    isa => Str,
    is  => 'lazy',
);
# Builder for the lazy worker_class attribute: the current package name
# followed by '::Worker'.
sub _build_worker_class {
    return join '::', __PACKAGE__, 'Worker';
}
# Builder for the lazy channel_class attribute: the current package name
# followed by '::Channel'.
sub _build_channel_class {
    return join '::', __PACKAGE__, 'Channel';
}
# Builder for the lazy worker_function attribute: the fully qualified name
# of the 'run' sub inside the configured worker class.
sub _build_worker_function {
    my ($self) = @_;
    my $worker = $self->worker_class;
    return $worker . '::run';
}
# Builder for the lazy init_function attribute: the fully qualified name
# of the 'init' sub inside the configured worker class.
sub _build_init_function {
    my ($self) = @_;
    my $worker = $self->worker_class;
    return $worker . '::init';
}
# Private, lazily created object with a generated predicate and clearer.
has _drain_cv => (
    isa       => Object,
    is        => 'lazy',
    clearer   => 1,
    predicate => 1,
);

# Builder for _drain_cv: returns whatever AE::cv hands back
# (an AnyEvent condition variable).
sub _build__drain_cv {
    return AE::cv();
}
# Hash of channel objects; every value must be an
# AnyEvent::RabbitMQ::Fork::Channel instance (presumably keyed by channel
# id -- confirm against the code that fills it). Starts empty and cannot be
# supplied through the constructor.
has channels => (
    init_arg => undef,
    is       => 'ro',
    isa      => HashRef [ InstanceOf ['AnyEvent::RabbitMQ::Fork::Channel'] ],
    default  => sub { return {} },
    clearer  => 1,
);
# Registry of user callbacks, indexed by the ids that _delegate generates.
# Starts empty and cannot be supplied through the constructor.
has cb_registry => (
    init_arg => undef,
    is       => 'ro',
    isa      => HashRef,
    clearer  => 1,
    default  => sub { return {} },
);
# Code reference used to send requests to the forked worker. Created on
# first use by _build_rpc; has a generated predicate and clearer and cannot
# be supplied through the constructor.
has rpc => (
    init_arg  => undef,
    is        => 'lazy',
    isa       => CodeRef,
    clearer   => 1,
    predicate => 1,
);
# Builder for the lazy rpc attribute.
#
# Creates an AnyEvent::Fork process, makes it load the worker class, passes
# it the worker class name and the verbose setting, and starts
# AnyEvent::Fork::RPC on the configured worker function. The value returned
# by AnyEvent::Fork::RPC::run is returned unchanged.
sub _build_rpc {
    my ($self) = @_;

    # The RPC handlers must not keep this object alive, so they only see a
    # weakened copy of the invocant.
    my $weak_self = $self;
    weaken($weak_self);

    # Produce a handler that forwards its arguments to the named method and
    # quietly does nothing once the object has been destroyed.
    my $forward_to = sub {
        my ($handler) = @_;
        return sub { $weak_self && $weak_self->$handler(@_) };
    };

    my @rpc_options = (
        async      => 1,
        serialiser => $AnyEvent::Fork::RPC::STORABLE_SERIALISER,
        on_event   => $forward_to->('_on_event'),
        on_error   => $forward_to->('_on_error'),
        on_destroy => $forward_to->('_on_destroy'),
        init       => $self->init_function,

        # TODO look into
        #done => '',
    );

    return AnyEvent::Fork->new
        ->require($self->worker_class)
        ->send_arg($self->worker_class, verbose => $self->verbose)
        ->AnyEvent::Fork::RPC::run($self->worker_function, @rpc_options);
}
=head1 DESCRIPTION
This module is meant to be close to a drop-in facade for running
L<AnyEvent::RabbitMQ> in a background process via L<AnyEvent::Fork::RPC>.
The main use case is for programs where other operations block with little
control due to difficulty/laziness. In this way, the process hosting the
connection to RabbitMQ is doing nothing else but processing messages.
=cut
# Next callback id. A string is used because the magical string increment
# ('a' .. 'z', 'aa', ...) gives a bigger space than a numeric counter.
my $cb_id = 'a';

# _delegate($method, $ch_id, @args)
#
# Forwards a call to the worker through the rpc code reference.
#
# When the trailing argument list has an even number of elements it is read
# as name/value pairs. Every pair whose name starts with "on_" has its value
# (the user callback) moved into cb_registry under a fresh id, and is replaced
# by a signature array [id, event name, method, calling package] that is sent
# in its place. An odd-length argument list is passed through untouched.
#
# The final argument given to the rpc code reference is a callback that
# croaks with whatever arguments it receives, and does nothing when it
# receives none.
#
# Returns the invocant so that calls can be chained.
sub _delegate {
    my $self   = shift;
    my $method = shift;
    my $ch_id  = shift;
    my @rest   = @_;

    my @payload = @rest;

    if (@rest % 2 == 0) {
        my %named  = @rest;
        my $origin = caller;    # scalar context: package name of our caller

        my @events = grep { /^on_/ } keys %named;
        for my $event (@events) {
            my $id = $cb_id++;

            # keep the user callback on this side of the fork
            $self->cb_registry->{$id} = delete $named{$event};

            # send a signature that on_event can map back to the callback
            $named{$event} = [$id, $event, $method, $origin];
        }

        @payload = %named;
    }

    my $on_done = sub {
        croak @_ if @_;
    };

    $self->rpc->($method, $ch_id, @payload, $on_done);

    return $self;
}
=head1 CONSTRUCTOR
( run in 1.429 second using v1.01-cache-2.11-cpan-39bf76dae61 )