AnyEvent-RabbitMQ-Fork
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Fork.pm view on Meta::CPAN
has channel_class => (is => 'lazy', isa => Str);
has worker_function => (is => 'lazy', isa => Str);
has init_function => (is => 'lazy', isa => Str);
sub _build_worker_class { return __PACKAGE__ . '::Worker' }
sub _build_channel_class { return __PACKAGE__ . '::Channel' }
sub _build_worker_function { return $_[0]->worker_class . '::run' }
sub _build_init_function { return $_[0]->worker_class . '::init' }
has _drain_cv => (is => 'lazy', isa => Object, predicate => 1, clearer => 1);
sub _build__drain_cv { return AE::cv }
has channels => (
is => 'ro',
isa => HashRef [InstanceOf ['AnyEvent::RabbitMQ::Fork::Channel']],
clearer => 1,
default => sub { {} },
init_arg => undef,
);
has cb_registry => (
is => 'ro',
isa => HashRef,
default => sub { {} },
clearer => 1,
init_arg => undef,
);
has rpc => (
is => 'lazy',
isa => CodeRef,
predicate => 1,
clearer => 1,
init_arg => undef,
);
sub _build_rpc {
my $self = shift;
weaken(my $wself = $self);
return AnyEvent::Fork->new #
->require($self->worker_class) #
->send_arg($self->worker_class, verbose => $self->verbose) #
->AnyEvent::Fork::RPC::run(
$self->worker_function,
async => 1,
serialiser => $AnyEvent::Fork::RPC::STORABLE_SERIALISER,
on_event => sub { $wself && $wself->_on_event(@_) },
on_error => sub { $wself && $wself->_on_error(@_) },
on_destroy => sub { $wself && $wself->_on_destroy(@_) },
init => $self->init_function,
# TODO look into
#done => '',
);
}
=head1 DESCRIPTION
This module is mean't to be a close to a drop-in facade for running
L<AnyEvent::RabbitMQ> in a background process via L<AnyEvent::Fork::RPC>.
Tha main use case is for programs where other operations block with little
control due to difficulty/laziness. In this way, the process hosting the
connection RabbitMQ is doing nothing else but processing messages.
=cut
my $cb_id = 'a'; # textual ++ gives a bigger space than numerical ++
sub _delegate {
my ($self, $method, $ch_id, @args, %args) = @_;
unless (@args % 2) {
%args = @args;
@args = ();
foreach my $event (grep { /^on_/ } keys %args) {
my $id = $cb_id++;
# store the user callback
$self->cb_registry->{$id} = delete $args{$event};
# create a signature to send back to on_event
$args{$event} = [$id, $event, $method, scalar caller];
}
}
$self->rpc->(
$method, $ch_id,
(@args ? @args : %args),
sub {
croak @_ if @_;
}
);
return $self;
}
=head1 CONSTRCTOR
my $ar = AnyEvent::RabbitMQ::Fork->new();
=head2 Options
=over
=item verbose [Bool]
Prints a LOT of debugging information to C<STDOUT>.
=back
=cut
before verbose => sub {
return if @_ < 2;
$_[0]->_delegate(verbose => 0, $_[1]);
};
=head1 METHODS
( run in 2.115 seconds using v1.01-cache-2.11-cpan-d8267643d1d )