AnyEvent-RabbitMQ-Fork

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Fork.pm  view on Meta::CPAN

# Names, as plain strings, of the packages/functions this facade works with.
# Each attribute is lazy: unless a value is supplied, the matching _build_*
# sub provides it the first time the attribute is read.
has channel_class => (
    is  => 'lazy',
    isa => Str,
);

has worker_function => (
    is  => 'lazy',
    isa => Str,
);

has init_function => (
    is  => 'lazy',
    isa => Str,
);

# Default: this package's name followed by '::Worker'.
sub _build_worker_class {
    my $base = __PACKAGE__;
    return "${base}::Worker";
}

# Default: this package's name followed by '::Channel'.
sub _build_channel_class {
    my $base = __PACKAGE__;
    return "${base}::Channel";
}

# Default: fully qualified name of the 'run' function inside whatever
# package the worker_class attribute currently names.
sub _build_worker_function {
    my ($self) = @_;
    my $worker = $self->worker_class;
    return "${worker}::run";
}

# Default: fully qualified name of the 'init' function inside whatever
# package the worker_class attribute currently names.
sub _build_init_function {
    my ($self) = @_;
    my $worker = $self->worker_class;
    return "${worker}::init";
}

# Lazily created condition variable (see _build__drain_cv). A predicate and a
# clearer are generated so callers can test for and discard the current one.
# NOTE(review): presumably used to wait until pending work has drained;
# confirm at the use sites, which are not part of this excerpt.
has _drain_cv => (
    is        => 'lazy',
    isa       => Object,
    clearer   => 1,
    predicate => 1,
);

# Builder for _drain_cv: returns a new AE::cv.
sub _build__drain_cv {
    return AE::cv();
}

# Hash of channel objects; every value must be an
# AnyEvent::RabbitMQ::Fork::Channel instance. The hash starts out empty and
# cannot be supplied through the constructor (init_arg => undef).
# Presumably keyed by channel id; confirm against the code that fills it.
# NOTE(review): the default is not lazy, so after the generated clearer runs
# the reader yields undef rather than a fresh hash; verify callers cope.
has channels => (
    is       => 'ro',
    init_arg => undef,
    isa      => HashRef [InstanceOf ['AnyEvent::RabbitMQ::Fork::Channel']],
    default  => sub { return {} },
    clearer  => 1,
);

# Hash in which _delegate stores user-supplied on_* callbacks, keyed by a
# generated id. Starts out empty and cannot be supplied through the
# constructor (init_arg => undef).
# NOTE(review): the default is not lazy, so after the generated clearer runs
# the reader yields undef rather than a fresh hash; verify callers cope.
has cb_registry => (
    is       => 'ro',
    init_arg => undef,
    isa      => HashRef,
    default  => sub { return {} },
    clearer  => 1,
);

# Code reference used to send requests to the worker; _delegate invokes it as
# $self->rpc->(...). Built on first use by _build_rpc, never accepted from the
# constructor (init_arg => undef), and exposed with a predicate and a clearer.
has rpc => (
    is        => 'lazy',
    init_arg  => undef,
    isa       => CodeRef,
    clearer   => 1,
    predicate => 1,
);

# Builder for the lazy `rpc` attribute.
#
# Creates an AnyEvent::Fork process object, asks it to load the worker class,
# passes it the worker class name plus the current verbose setting, and then
# hands it to AnyEvent::Fork::RPC::run. Whatever run() returns is the value
# of the attribute.
sub _build_rpc {
    my ($self) = @_;

    # The handlers only see this object through a weak reference, and turn
    # into no-ops once the object is gone.
    # NOTE(review): presumably this avoids a reference cycle between the
    # object and the RPC handle it stores; confirm.
    my $weak_self = $self;
    weaken($weak_self);

    # Produces a handler that forwards its arguments to the named method.
    my $forward_to = sub {
        my ($handler) = @_;
        return sub { $weak_self && $weak_self->$handler(@_) };
    };

    my $worker = AnyEvent::Fork->new;
    $worker = $worker->require($self->worker_class);
    $worker = $worker->send_arg($self->worker_class, verbose => $self->verbose);

    return $worker->AnyEvent::Fork::RPC::run(
        $self->worker_function,
        async      => 1,
        serialiser => $AnyEvent::Fork::RPC::STORABLE_SERIALISER,
        on_event   => $forward_to->('_on_event'),
        on_error   => $forward_to->('_on_error'),
        on_destroy => $forward_to->('_on_destroy'),
        init       => $self->init_function,
        # TODO look into
        #done => '',
    );
}

=head1 DESCRIPTION

This module is meant to be close to a drop-in facade for running
L<AnyEvent::RabbitMQ> in a background process via L<AnyEvent::Fork::RPC>.

The main use case is for programs where other operations block with little
control due to difficulty/laziness. In this way, the process hosting the
connection to RabbitMQ is doing nothing else but processing messages.

=cut

# Source of ids for callbacks parked in cb_registry; _delegate post-increments
# it once per registered callback. It is a string so that ++ walks
# 'a' .. 'z', 'aa', ... instead of counting numerically.
my $cb_id = 'a';    # textual ++ gives a bigger space than numerical ++

# Forward a call to the worker through the rpc code reference.
#
#   $self->_delegate($method, $ch_id, @args)
#
# An even-length (including empty) argument list is treated as key/value
# options: every value under a key starting with "on_" is moved into
# cb_registry under a fresh id and replaced by the array reference
# [$id, $event, $method, $calling_package]. An odd-length list is passed on
# untouched. The final argument given to the rpc is a callback that croaks
# with whatever arguments it receives, if any.
#
# Returns $self.
sub _delegate {
    my $self    = shift;
    my $method  = shift;
    my $ch_id   = shift;
    my @payload = @_;

    if (@payload % 2 == 0) {
        my %opts   = @payload;
        my $origin = caller;    # package name of whoever called _delegate

        for my $event (grep { /^on_/ } keys %opts) {
            my $id = $cb_id++;

            # keep the real callback on this side ...
            $self->cb_registry->{$id} = delete $opts{$event};

            # ... and send a signature in its place
            $opts{$event} = [$id, $event, $method, $origin];
        }

        @payload = %opts;
    }

    $self->rpc->($method, $ch_id, @payload, sub { croak @_ if @_ });

    return $self;
}

=head1 CONSTRUCTOR

    my $ar = AnyEvent::RabbitMQ::Fork->new();

=head2 Options

=over

=item verbose [Bool]

Prints a LOT of debugging information to C<STDOUT>.

=back

=cut

# Runs ahead of every call to the verbose accessor. When the accessor is
# called with a value, that value is first forwarded through _delegate as
# method 'verbose' with channel id 0; calls without a value are not forwarded.
# NOTE(review): channel id 0 presumably addresses the connection itself
# rather than a channel; confirm in the worker.
before verbose => sub {
    my ($self, @new_value) = @_;

    return unless @new_value;

    $self->_delegate(verbose => 0, $new_value[0]);
};

=head1 METHODS



( run in 2.115 seconds using v1.01-cache-2.11-cpan-d8267643d1d )