App-Prove-Plugin-Elasticsearch

 view release on metacpan or  search on metacpan

lib/App/Prove/Elasticsearch/Queue/Rabbit.pm  view on Meta::CPAN

package App::Prove::Elasticsearch::Queue::Rabbit;
$App::Prove::Elasticsearch::Queue::Rabbit::VERSION = '0.001';

# PODNAME: App::Prove::Elasticsearch::Queue::Rabbit;
# ABSTRACT: Coordinate the running of test plans across multiple instances via RabbitMQ.

use strict;
use warnings;

use parent qw{App::Prove::Elasticsearch::Queue::Default};

use Net::RabbitMQ;
use JSON::MaybeXS;

sub new {
    my ($class, $input) = @_;
    my $self = $class->SUPER::new($input);

    #Connect to rabbit
    $self->{mq} = Net::RabbitMQ->new();
    $self->{config}->{'queue.exchange'} ||= 'testsuite';

    #Allow callers to overwrite this to prevent double-usage of channels
    $self->{write_channel} = 1;
    $self->{read_channel}  = 2;

    my $port =
      $self->{config}->{'queue.port'}
      ? ':' . $self->{config}->{'queue.port'}
      : '';
    die("queue.host must be specified") unless $self->{config}->{'queue.host'};
    my $serveraddress = "$self->{config}->{'queue.host'}$port";

    $self->{mq}->connect(
        $serveraddress,
        {
            user     => $self->{config}->{'queue.user'},
            password => $self->{config}->{'queue.password'}
        }
    );

    return $self;
}

sub queue_jobs {
    my ($self, @jobs_to_queue) = @_;
    $self->{mq}->channel_open($self->{write_channel});

    my $options =
      $self->{config}->{'queue.exchange'}
      ? {exchange => $self->{config}->{'queue.exchange'}}
      : undef;
    foreach my $job (@jobs_to_queue) {
        $job->{queue_name} = $self->build_queue_name($job);

        #Publish each plan to it's own queue, and the name of this queue that needs work to the 'queues needing work' queue
        $self->{mq}->exchange_declare(
            $self->{write_channel},
            $self->{config}->{'queue.exchange'}, {auto_delete => 0,}
        );
        $self->{mq}->queue_declare(
            $self->{write_channel}, $job->{queue_name},
            {auto_delete => 0}
        );
        $self->{mq}->queue_bind(
            $self->{write_channel},              $job->{queue_name},
            $self->{config}->{'queue.exchange'}, $job->{queue_name}
        );

        #queue a test to a queue for the same version/platform/etc

        @{$job->{tests}} =
          &{\&{$self->{planner} . "::find_test_paths"}}(@{$job->{tests}});

        #filter jobs by what is already done if this is a re-queue for optimization's sake
        if ($self->{requeue}) {
            $self->{searcher} = $self->_get_searcher();



( run in 1.351 second using v1.01-cache-2.11-cpan-98e64b0badf )