AnyEvent-RabbitMQ-Fork

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

Revision history for AnyEvent-RabbitMQ-Fork

0.6       2020-07-26 14:02:30-04:00 America/New_York
    - fix operator typo in _generate_callback
    - redundant code cleanup
    - simplify logic in _generate_callback
    - make ::Channel->delegate private
    - safer rpc callbacks with weakened self
    - update test suite from AnyEvent::RabbitMQ with mods to get it working again

0.5       2014-12-28 17:38:10-05:00 America/New_York
    - update test suite to align with AnyEvent::RabbitMQ
    - support bind_exchange & unbind_exchange methods
    - in Worker: generate stand-in callbacks in a separate method

0.4       2014-06-06 16:43:00-04:00 America/New_York
    - improve handling of object state attributes being reported to parent

lib/AnyEvent/RabbitMQ/Fork.pm  view on Meta::CPAN

# ABSTRACT: Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)

=head1 NAME

AnyEvent::RabbitMQ::Fork - Run AnyEvent::RabbitMQ inside AnyEvent::Fork(::RPC)

=cut

use Moo;
use Types::Standard qw(CodeRef Str HashRef InstanceOf Bool Object);
use Scalar::Util qw(weaken);
use Carp qw(croak);
use File::ShareDir qw(dist_file);

use constant DEFAULT_AMQP_SPEC =>
  dist_file('AnyEvent-RabbitMQ', 'fixed_amqp0-9-1.xml');

use namespace::clean;

use AnyEvent::Fork;
use AnyEvent::Fork::RPC;

lib/AnyEvent/RabbitMQ/Fork.pm  view on Meta::CPAN

# rpc: code reference used to talk to the worker process.
#
# Declared lazy, so the value is produced on first access by _build_rpc
# rather than at construction time.  `init_arg => undef` means it cannot be
# passed to the constructor.  Per Moo's naming rules, `predicate => 1` and
# `clearer => 1` generate has_rpc and clear_rpc respectively.
has rpc => (
    is        => 'lazy',
    isa       => CodeRef,
    predicate => 1,
    clearer   => 1,
    init_arg  => undef,
);

sub _build_rpc {
    my $self = shift;
    weaken(my $wself = $self);

    return AnyEvent::Fork->new          #
      ->require($self->worker_class)    #
      ->send_arg($self->worker_class, verbose => $self->verbose)    #
      ->AnyEvent::Fork::RPC::run(
        $self->worker_function,
        async      => 1,
        serialiser => $AnyEvent::Fork::RPC::STORABLE_SERIALISER,
        on_event   => sub { $wself && $wself->_on_event(@_) },
        on_error   => sub { $wself && $wself->_on_error(@_) },

lib/AnyEvent/RabbitMQ/Fork/Worker.pm  view on Meta::CPAN


=head1 DESCRIPTION

No user serviceable parts inside. Venture at your own risk.

=cut

use Moo;
use Types::Standard qw(InstanceOf Bool);
use Guard;
use Scalar::Util qw(weaken blessed);

use namespace::clean;

use AnyEvent::RabbitMQ 1.18;

# verbose: read-write boolean flag, false (0) unless set.
has verbose => (is => 'rw', isa => Bool, default => 0);

has connection => (
    is      => 'lazy',
    isa     => InstanceOf['AnyEvent::RabbitMQ'],

lib/AnyEvent/RabbitMQ/Fork/Worker.pm  view on Meta::CPAN


# Construct the worker object and remember it in the file-scoped $instance
# so that later calls to run() can reach it.
#
# Called as a class method; every remaining argument is handed straight to
# the constructor.  Returns nothing.
sub init {
    my ($class, @ctor_args) = @_;

    $instance = $class->new(@ctor_args);

    return;
}

sub run {
    my ($done, $method, $ch_id, @args, %args) = @_;

    weaken(my $self = $instance);

    unless (@args % 2) {
        %args = @args;
        @args = ();
        foreach my $event (grep { /^on_/ } keys %args) {
            # callback signature provided by parent process
            my $sig = delete $args{$event};

            # our callback to be used by AE::RMQ
            $args{$event} = $self->_generate_callback($method, $event, $sig);

lib/AnyEvent/RabbitMQ/Fork/Worker.pm  view on Meta::CPAN

        _is_active  => 'is_active',
        _is_confirm => 'is_confirm',
    },
    connection => {
        _state             => 'is_open',
        _login_user        => 'login_user',
        _server_properties => 'server_properties',
    }
);
sub _cb_hooks {
    weaken(my $obj = shift);

    my ($type, $hooks)
      = $obj->isa('AnyEvent::RabbitMQ')
      ? ('connection', $cb_hooks{connection})
      : ($obj->id, $cb_hooks{channel});

    foreach my $prop (keys %$hooks) {
        my $method = $hooks->{$prop};
        ## no critic (Miscellanea::ProhibitTies)
        tie $obj->{$prop}, 'AnyEvent::RabbitMQ::Fork::Worker::TieScalar',

lib/AnyEvent/RabbitMQ/Fork/Worker.pm  view on Meta::CPAN

      = ($is_conn and ($method eq 'close' or ($method eq 'connect' and $event eq 'on_close')));

    my $open_channel_success = ($method eq 'open_channel' and $event eq 'on_success');

    my $guard = guard {
        # inform parent process that this callback is no longer needed
        AnyEvent::Fork::RPC::event(cbd => @$sig);
    };

    # our callback to be used by AE::RMQ
    weaken(my $wself = $self);
    return sub {
        $guard if 0;    # keepalive

        $wself->clear_connection if $should_clear_connection;

        my $blessed = blessed($_[0]) || 'UNIVERSAL';
        if ($blessed->isa('AnyEvent::RabbitMQ') or $blessed->isa('AnyEvent::RabbitMQ::Channel')) {
            # we put our sentry value in place later
            my $obj = shift;
            # this is our signal back to the parent as to what kind of object it was



( run in 0.709 second using v1.01-cache-2.11-cpan-65fba6d93b7 )