AnyEvent-RabbitMQ-Fork
view release on metacpan or search on metacpan
Revision history for AnyEvent-RabbitMQ-Fork
0.6 2020-07-26 14:02:30-04:00 America/New_York
- fix operator typo in _generate_callback
- redundant code cleanup
- simplify logic in _generate_callback
- make ::Channel->delegate private
- safer rpc callbacks with weakened self
- update test suite from AnyEvent::RabbitMQ with mods to get it working again
0.5 2014-12-28 17:38:10-05:00 America/New_York
- update test suite to align with AnyEvent::RabbitMQ
- support bind_exchange & unbind_exchange methods
- in Worker: generate stand-in callbacks in a separate method
0.4 2014-06-06 16:43:00-04:00 America/New_York
- improve handling of object state attributes being reported to parent
lib/AnyEvent/RabbitMQ/Fork.pm view on Meta::CPAN
# ABSTRACT: Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)
=head1 NAME
AnyEvent::RabbitMQ::Fork - Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)
=cut
use Moo;
use Types::Standard qw(CodeRef Str HashRef InstanceOf Bool Object);
use Scalar::Util qw(weaken);
use Carp qw(croak);
use File::ShareDir qw(dist_file);
use constant DEFAULT_AMQP_SPEC =>
dist_file('AnyEvent-RabbitMQ', 'fixed_amqp0-9-1.xml');
use namespace::clean;
use AnyEvent::Fork;
use AnyEvent::Fork::RPC;
lib/AnyEvent/RabbitMQ/Fork.pm view on Meta::CPAN
# rpc: code reference used to talk to the forked worker process.
# Populated by _build_rpc the first time it is read.
has rpc => (
# 'lazy' => read-only accessor whose value comes from _build_rpc on first use
is => 'lazy',
isa => CodeRef,
# generate has_rpc so callers can test whether the value has been built yet
predicate => 1,
# generate clear_rpc so the stored code reference can be dropped
clearer => 1,
# not settable via the constructor; the builder is the only source of the value
init_arg => undef,
);
sub _build_rpc {
my $self = shift;
weaken(my $wself = $self);
return AnyEvent::Fork->new #
->require($self->worker_class) #
->send_arg($self->worker_class, verbose => $self->verbose) #
->AnyEvent::Fork::RPC::run(
$self->worker_function,
async => 1,
serialiser => $AnyEvent::Fork::RPC::STORABLE_SERIALISER,
on_event => sub { $wself && $wself->_on_event(@_) },
on_error => sub { $wself && $wself->_on_error(@_) },
lib/AnyEvent/RabbitMQ/Fork/Worker.pm view on Meta::CPAN
=head1 DESCRIPTION
No user serviceable parts inside. Venture at your own risk.
=cut
use Moo;
use Types::Standard qw(InstanceOf Bool);
use Guard;
use Scalar::Util qw(weaken blessed);
use namespace::clean;
use AnyEvent::RabbitMQ 1.18;
# verbose: read-write boolean flag, off (0) by default.
has verbose => (is => 'rw', isa => Bool, default => 0);
has connection => (
is => 'lazy',
isa => InstanceOf['AnyEvent::RabbitMQ'],
lib/AnyEvent/RabbitMQ/Fork/Worker.pm view on Meta::CPAN
# init($class, @ctor_args)
#
# Class-level entry point: constructs the worker object and stores it in the
# file-level $instance variable so that later calls (e.g. run) can reach it.
#
#   $class      - class name to instantiate
#   @ctor_args  - passed through unchanged to $class->new
#
# Returns nothing (empty list / undef, depending on context).
# NOTE(review): presumably invoked once in the forked child with the
# arguments the parent supplied via send_arg - confirm against the caller.
sub init {
    my ($class, @ctor_args) = @_;

    # keep the object alive for the lifetime of the worker process
    $instance = $class->new(@ctor_args);

    return;
}
sub run {
my ($done, $method, $ch_id, @args, %args) = @_;
weaken(my $self = $instance);
unless (@args % 2) {
%args = @args;
@args = ();
foreach my $event (grep { /^on_/ } keys %args) {
# callback signature provided by parent process
my $sig = delete $args{$event};
# our callback to be used by AE::RMQ
$args{$event} = $self->_generate_callback($method, $event, $sig);
lib/AnyEvent/RabbitMQ/Fork/Worker.pm view on Meta::CPAN
_is_active => 'is_active',
_is_confirm => 'is_confirm',
},
connection => {
_state => 'is_open',
_login_user => 'login_user',
_server_properties => 'server_properties',
}
);
sub _cb_hooks {
weaken(my $obj = shift);
my ($type, $hooks)
= $obj->isa('AnyEvent::RabbitMQ')
? ('connection', $cb_hooks{connection})
: ($obj->id, $cb_hooks{channel});
foreach my $prop (keys %$hooks) {
my $method = $hooks->{$prop};
## no critic (Miscellanea::ProhibitTies)
tie $obj->{$prop}, 'AnyEvent::RabbitMQ::Fork::Worker::TieScalar',
lib/AnyEvent/RabbitMQ/Fork/Worker.pm view on Meta::CPAN
= ($is_conn and ($method eq 'close' or ($method eq 'connect' and $event eq 'on_close')));
my $open_channel_success = ($method eq 'open_channel' and $event eq 'on_success');
my $guard = guard {
# inform parent process that this callback is no longer needed
AnyEvent::Fork::RPC::event(cbd => @$sig);
};
# our callback to be used by AE::RMQ
weaken(my $wself = $self);
return sub {
$guard if 0; # keepalive
$wself->clear_connection if $should_clear_connection;
my $blessed = blessed($_[0]) || 'UNIVERSAL';
if ($blessed->isa('AnyEvent::RabbitMQ') or $blessed->isa('AnyEvent::RabbitMQ::Channel')) {
# we put our sentry value in place later
my $obj = shift;
# this is our signal back to the parent as to what kind of object it was
( run in 0.709 second using v1.01-cache-2.11-cpan-65fba6d93b7 )