AnyEvent-RabbitMQ-Fork

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Fork/Worker.pm  view on Meta::CPAN

        }
    } else {
        $ch_id ||= '<undef>';
        push @error, "Unknown channel: '$ch_id'";
    }

    return $done->(@error);
}

my %cb_hooks = (
    channel => {
        _state      => 'is_open',
        _is_active  => 'is_active',
        _is_confirm => 'is_confirm',
    },
    connection => {
        _state             => 'is_open',
        _login_user        => 'login_user',
        _server_properties => 'server_properties',
    }
);
sub _cb_hooks {
    weaken(my $obj = shift);

    my ($type, $hooks)
      = $obj->isa('AnyEvent::RabbitMQ')
      ? ('connection', $cb_hooks{connection})
      : ($obj->id, $cb_hooks{channel});

    foreach my $prop (keys %$hooks) {
        my $method = $hooks->{$prop};
        ## no critic (Miscellanea::ProhibitTies)
        tie $obj->{$prop}, 'AnyEvent::RabbitMQ::Fork::Worker::TieScalar',
          $obj->{$prop}, sub {
            AnyEvent::Fork::RPC::event(
                i => { $type => { $method => $obj->$method } });
          };
    }

    return;
}

sub _generate_callback {
    my ($self, $method, $event, $sig) = @_;

    my $is_conn = $sig->[-1] eq 'AnyEvent::RabbitMQ::Fork';

    my $should_clear_connection
      = ($is_conn and ($method eq 'close' or ($method eq 'connect' and $event eq 'on_close')));

    my $open_channel_success = ($method eq 'open_channel' and $event eq 'on_success');

    my $guard = guard {
        # inform parent process that this callback is no longer needed
        AnyEvent::Fork::RPC::event(cbd => @$sig);
    };

    # our callback to be used by AE::RMQ
    weaken(my $wself = $self);
    return sub {
        $guard if 0;    # keepalive

        $wself->clear_connection if $should_clear_connection;

        my $blessed = blessed($_[0]) || 'UNIVERSAL';
        if ($blessed->isa('AnyEvent::RabbitMQ') or $blessed->isa('AnyEvent::RabbitMQ::Channel')) {
            # we put our sentry value in place later
            my $obj = shift;
            # this is our signal back to the parent as to what kind of object it was
            unshift @_, \[$blessed, ($obj->isa('AnyEvent::RabbitMQ::Channel') ? $obj->id : ())];

            if ($open_channel_success) {
                my $id = $obj->id;
                $obj->{"_$wself\_guard"} ||= guard {
                    # channel was GC'd by AE::RMQ
                    AnyEvent::Fork::RPC::event(chd => $id);
                };

                # needs to be done after parent registers channel
                AE::postpone { _cb_hooks($obj) };
            }

            if ($obj->isa('AnyEvent::RabbitMQ')) {
                # replace with our own handling
                $obj->{_handle}->on_drain(sub { AnyEvent::Fork::RPC::event('cdw') });
            }
        }

            # these values don't pass muster with Storable
        delete local @{ $_[0] }{ 'fh', 'on_error', 'on_drain' }
            #if $blessed->isa('AnyEvent::Handle');
            if $method eq 'connect' and $event eq 'on_failure' and $blessed->isa('AnyEvent::Handle');

        # tell the parent to run the users callback known by $sig
        AnyEvent::Fork::RPC::event(cb => $sig, @_);
    };
}

=head1 AUTHOR

William Cox <mydimension@gmail.com>

=head1 COPYRIGHT

Copyright (c) 2014, the above named author(s).

=head1 LICENSE

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

=cut

package    # hide from PAUSE
  AnyEvent::RabbitMQ::Fork::Worker::TieScalar;

use strict;
use warnings;

sub TIESCALAR { $_[2]->(); return bless [$_[1], $_[2]] => $_[0] }
sub FETCH { return $_[0][0] }
sub STORE { $_[0][1]->(); return $_[0][0] = $_[1] }



( run in 1.554 second using v1.01-cache-2.11-cpan-df04353d9ac )