App-Prove-Plugin-Elasticsearch

 view release on metacpan or  search on metacpan

bin/testd  view on Meta::CPAN

our $interval        = 5;
our $global_interval = 30;

main() unless caller();

sub main {

    my $conf = App::Prove::Elasticsearch::Utils::process_configuration();

    if (!$conf->{'testd.no_daemon'}) {
        my $pid = daemonify() or die "Could not daemonize";
        print "Spawned as PID $pid\n";
    }

    my $queue = App::Prove::Elasticsearch::Utils::require_queue($conf);
    my $q     = &{\&{$queue . "::new"}}($queue);
    $conf->{'testd.max_workers'} ||= 1;

    my $platformer =
      App::Prove::Elasticsearch::Utils::require_platformer($conf);
    my $versioner = App::Prove::Elasticsearch::Utils::require_versioner($conf);

    my @provisioners;
    if (ref $conf->{'client.provisioners'} eq 'ARRAY') {
        @provisioners = @{$conf->{'client.provisioners'}}
          if ref $conf->{'client.provisioners'} eq 'ARRAY';
    } else {
        push(@provisioners, $conf->{'client.provisioners'})
          if $conf->{'client.provisioners'};
    }
    @provisioners =
      map { App::Prove::Elasticsearch::Utils::require_provisioner($_) }
      @provisioners;

    $0 = "testd - master: waiting for workers to finish";
    print "Testd - starting up...\n";

    while (1) {

        my $jobspec = {};
        $jobspec->{platforms} = &{\&{$platformer . "::get_platforms"}}();
        $jobspec->{version} =
          &{\&{$versioner . "::get_version"}}("$ENV{PWD}/t/bogus.t")
          ; #XXX this will cause trouble with other versioners & planners, I'm sure
        $jobspec->{queue_name} = $q->build_queue_name($jobspec);

        MCE::Shared->start();
        my $worker_state = MCE::Shared->share({module => 'MCE::Shared::Hash'});

        MCE::Loop::init {
            max_workers => $conf->{'testd.max_workers'},
            chunk_size  => 1,
        };

        mce_loop {
            #XXX Net::Rabbitmq is doing something incorrectly, requiring us to re-import every time we fork to avoid 'connection reset by peer'
            my $q_f = &{\&{$queue . "::new"}}($queue);

            #Ensure we have no channel overlap with single-threaded things, like the write_channel, which testd does not use
            $q->{read_channel} = 10 + MCE->wid();
            MCE->say("Worker "
                  . MCE->wid()
                  . " started, checking queue on channel $q->{read_channel}");
            worker($conf, $worker_state, $jobspec, $q_f);
        }
        1 .. $conf->{'testd.max_workers'};

        $0 = "testd - master: attempting to provision to different test target";
        print "All workers done.  Attempting to re-provision for new work...\n";
        my $result = try_to_provision(
            $versioner, $q, $jobspec->{platforms},
            @provisioners
        );
        if ($result) {
            print "System provisioned to "
              . join(" ", @{$result->{platforms}})
              . " with SUT version "
              . $result->{version}
              . ", beginning work\n";
            next;
        }
        print
          "No available test plans this system can provision to satisfy.  Waiting "
          . $global_interval
          . "s for extra work...\n";
    }
}

sub try_to_provision {
    my ($candidate, $provision_matrix) = _get_satisfiable_configuration(@_);
    return _provision($candidate, $provision_matrix);
}

sub _get_satisfiable_configuration {
    my ($versioner, $queue, $platforms, @provisioners) = @_;

    #Figure out what we *can* do
    my %provision_matrix = (
        version       => 0,
        platforms     => {},
        cur_platforms => {},
        cur_version   => &{\&{$versioner . "::get_version"}}
          ("$ENV{PWD}/t/bogus.t"),
    );
    foreach my $p (@provisioners) {
        $provision_matrix{version} = $p
          if &{\&{$p . "::can_switch_version"}}($versioner);
        $provision_matrix{platforms}->{$p} = [];
        my $platform;
        ($platform, $platforms) = &{\&{$p . "::pick_platform"}}(@$platforms);
        push(
            @{$provision_matrix{platforms}->{$p}},
            &{\&{$p . "::get_available_provision_targets"}}($platform)
        ) if $platform;    #optimization
        $provision_matrix{cur_platforms}{$p} = $platform;
    }
    $provision_matrix{unsatisfiable_platforms} = $platforms;

    #Figure out what needs doing
    my @satiable_plans = $queue->list_queues(%provision_matrix);

    #Pick one and return it.  I'm deliberately not re-queuing, as that should be taken care of already, and would result in duplicate work in distributed configurations.
    return (shift @satiable_plans, \%provision_matrix);
}

sub _provision {
    my ($candidate, $provision_matrix) = @_;

    #First, let's provision everything we can to match the provided candidate.
    foreach my $platformer (keys(%{$provision_matrix->{platforms}})) {
        my ($platform_to_provision) =
          &{\&{$platformer . "::pick_platform"}}(@{$candidate->{platforms}});
        next unless $platform_to_provision;
        next
          if $platform_to_provision eq
          $provision_matrix->{cur_platforms}->{$platformer};
        print "Provisioning system to $platform_to_provision...\n";
        &{\&{$platformer . "::provision"}}($platform_to_provision);
    }

    #Finally, see if we need to change the version of our software.
    if ($provision_matrix->{version}) {
        if ($candidate->{version} ne $provision_matrix->{cur_version}) {
            print "Provisioning SUT version to $candidate->{version}...\n";
            &{\&{$provision_matrix->{version} . "::switch_version_to"}}
              ($candidate->{version});
        }
    }
    return $candidate;
}

sub worker {
    my ($conf, $worker_state, $jobspec, $q) = @_;
    $worker_state->{MCE->wid()} = 1;
    my $msg = "testd - worker " . MCE->wid() . ":";
    $0 = "$msg starting up";

    while (1) {

        #check if every job is not busy, and if so, return so we can re-provision.
        return 1 unless sum(values(%$worker_state));

        $worker_state->{MCE->wid()} = 1;
        $0 = "$msg looking for jobs";
        my @jobs = $q->get_jobs($jobspec);
        MCE->say("Found " . scalar(@jobs) . " jobs for worker " . MCE->wid());
        if (!@jobs) {
            $worker_state->{MCE->wid()} = 0;
            $0 = "$msg waiting for jobs";
            sleep $interval;
            next;
        }
        $0 = "$msg running tests";
        my $runner = App::Prove::Elasticsearch::Utils::require_runner($conf);
        &{\&{$runner . "::run"}}($conf, @jobs);
    }
}

sub daemonify {
    my ($test_only) = @_
      ; # Flag for unit tests. Unfortunately there's not much other way I can safely test this in a unit test (that I can figure out anyways).
    $test_only //= 0;

    my $pid;
    unless ($test_only) {    # uncoverable branch true
        $pid = fork;
        exit 0 if $pid;
        exit 1 if !defined($pid);
        setsid();
        $pid = fork;
        exit 0 if $pid;
        exit 1 if not defined $pid;
        chdir '/' or die $!;
        umask 0;
        $pid = $$;
    }
    return $pid;
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

testd - watch for tests needing running and run them

=head1 VERSION

version 0.001

=head1 USAGE

testd

Requires that you have a configured autodiscover value in your ~/elastest.conf.

You can also control behavior via the [testd] section:

    [testd]
    no_daemon=1
    max_workers=3



( run in 0.392 second using v1.01-cache-2.11-cpan-a1f116cd669 )