App-Prove-Plugin-Elasticsearch
view release on metacpan or search on metacpan
lib/App/Prove/Elasticsearch/Queue/Rabbit.pm view on Meta::CPAN
package App::Prove::Elasticsearch::Queue::Rabbit;
$App::Prove::Elasticsearch::Queue::Rabbit::VERSION = '0.001';
# PODNAME: App::Prove::Elasticsearch::Queue::Rabbit;
# ABSTRACT: Coordinate the running of test plans across multiple instances via RabbitMQ.
use strict;
use warnings;
use parent qw{App::Prove::Elasticsearch::Queue::Default};
use Net::RabbitMQ;
use JSON::MaybeXS;
# Construct a RabbitMQ-backed queue object and connect to the broker.
#
# Arguments:
#   $class - the class being constructed.
#   $input - handed unchanged to the parent constructor
#            (presumably that is what populates $self->{config} -- confirm
#            against App::Prove::Elasticsearch::Queue::Default).
#
# Configuration keys read from $self->{config}:
#   queue.host     - broker hostname; required.
#   queue.port     - broker port; optional.
#   queue.user     - login user; optional.
#   queue.password - login password; optional.
#   queue.exchange - exchange name; set to 'testsuite' when unset or false.
#
# Returns the new object, with the client stored in $self->{mq} and the
# default channel numbers in $self->{write_channel} / $self->{read_channel}.
#
# Dies when queue.host is not configured.
sub new {
    my ($class, $input) = @_;
    my $self = $class->SUPER::new($input);

    my $config = ($self->{config} ||= {});
    $config->{'queue.exchange'} ||= 'testsuite';

    # Check the one mandatory setting before creating the client object.
    die("queue.host must be specified") unless $config->{'queue.host'};

    #Connect to rabbit
    $self->{mq} = Net::RabbitMQ->new();

    #Allow callers to overwrite this to prevent double-usage of channels
    $self->{write_channel} = 1;
    $self->{read_channel}  = 2;

    # Net::RabbitMQ's connect() takes a bare hostname; the port belongs in the
    # options hash. Appending ":port" to the hostname makes the client try to
    # resolve the literal string "host:port".
    my %connect_options;
    $connect_options{port} = $config->{'queue.port'}
      if $config->{'queue.port'};

    # Only forward credentials that were actually configured, so that the
    # client library's documented defaults apply when they are absent.
    foreach my $credential (qw{user password}) {
        my $value = $config->{"queue.$credential"};
        $connect_options{$credential} = $value if defined $value;
    }

    $self->{mq}->connect($config->{'queue.host'}, \%connect_options);

    return $self;
}
sub queue_jobs {
my ($self, @jobs_to_queue) = @_;
$self->{mq}->channel_open($self->{write_channel});
my $options =
$self->{config}->{'queue.exchange'}
? {exchange => $self->{config}->{'queue.exchange'}}
: undef;
foreach my $job (@jobs_to_queue) {
$job->{queue_name} = $self->build_queue_name($job);
#Publish each plan to its own queue, and the name of this queue that needs work to the 'queues needing work' queue
$self->{mq}->exchange_declare(
$self->{write_channel},
$self->{config}->{'queue.exchange'}, {auto_delete => 0,}
);
$self->{mq}->queue_declare(
$self->{write_channel}, $job->{queue_name},
{auto_delete => 0}
);
$self->{mq}->queue_bind(
$self->{write_channel}, $job->{queue_name},
$self->{config}->{'queue.exchange'}, $job->{queue_name}
);
#queue a test to a queue for the same version/platform/etc
@{$job->{tests}} =
&{\&{$self->{planner} . "::find_test_paths"}}(@{$job->{tests}});
#filter jobs by what is already done if this is a re-queue for optimization's sake
if ($self->{requeue}) {
$self->{searcher} = $self->_get_searcher();
( run in 1.351 second using v1.01-cache-2.11-cpan-98e64b0badf )