App-Prove-Plugin-Elasticsearch
view release on metacpan or search on metacpan
# Seconds an idle worker sleeps before it polls the queue again.
our $interval = 5;

# Seconds the master reports waiting for extra work when no test plan can be
# satisfied by re-provisioning.
our $global_interval = 30;

# Modulino guard: run main() only when this file is executed directly, not
# when it is loaded by another file (caller() is then true).
main() if !caller();
# Entry point for the testd daemon.
#
# Loads the configuration, daemonizes unless 'testd.no_daemon' is set, then
# loops forever:
#   1. build a job spec from the current platforms and version,
#   2. run 'testd.max_workers' MCE workers until they all report idle,
#   3. try to re-provision the system for some other queued plan,
#   4. if nothing can be provisioned, sleep $global_interval seconds.
# Never returns under normal operation; dies if daemonizing fails.
sub main {
    my $conf = App::Prove::Elasticsearch::Utils::process_configuration();

    if (!$conf->{'testd.no_daemon'}) {
        my $pid = daemonify() or die "Could not daemonize";
        print "Spawned as PID $pid\n";
    }

    my $queue = App::Prove::Elasticsearch::Utils::require_queue($conf);
    my $q     = &{\&{$queue . "::new"}}($queue);

    $conf->{'testd.max_workers'} ||= 1;

    my $platformer =
      App::Prove::Elasticsearch::Utils::require_platformer($conf);
    my $versioner = App::Prove::Elasticsearch::Utils::require_versioner($conf);

    # 'client.provisioners' may hold a single name or an array of names.
    my $configured = $conf->{'client.provisioners'};
    my @provisioners;
    if (ref $configured eq 'ARRAY') {
        @provisioners = @{$configured};
    }
    elsif ($configured) {
        @provisioners = ($configured);
    }
    @provisioners =
      map { App::Prove::Elasticsearch::Utils::require_provisioner($_) }
      @provisioners;

    print "Testd - starting up...\n";

    while (1) {

        # Reset the process title on every pass; otherwise it would keep
        # claiming we are provisioning after the first re-provision.
        $0 = "testd - master: waiting for workers to finish";

        my $jobspec = {};
        $jobspec->{platforms} = &{\&{$platformer . "::get_platforms"}}();
        $jobspec->{version} =
          &{\&{$versioner . "::get_version"}}("$ENV{PWD}/t/bogus.t")
          ; #XXX this will cause trouble with other versioners & planners, I'm sure
        $jobspec->{queue_name} = $q->build_queue_name($jobspec);

        MCE::Shared->start();
        my $worker_state = MCE::Shared->share({module => 'MCE::Shared::Hash'});

        MCE::Loop::init {
            max_workers => $conf->{'testd.max_workers'},
            chunk_size  => 1,
        };

        mce_loop {
            #XXX Net::Rabbitmq is doing something incorrectly, requiring us to re-import every time we fork to avoid 'connection reset by peer'
            my $q_f = &{\&{$queue . "::new"}}($queue);

            #Ensure we have no channel overlap with single-threaded things, like the write_channel, which testd does not use.
            # The channel must be set on $q_f, the queue object this worker
            # actually reads from; setting it on the master's $q had no
            # effect on the worker.
            # NOTE(review): assumes the queue consults read_channel lazily
            # (after construction) -- confirm against the queue module.
            $q_f->{read_channel} = 10 + MCE->wid();
            MCE->say("Worker "
                  . MCE->wid()
                  . " started, checking queue on channel $q_f->{read_channel}");
            worker($conf, $worker_state, $jobspec, $q_f);
        }
        1 .. $conf->{'testd.max_workers'};

        $0 = "testd - master: attempting to provision to different test target";
        print "All workers done. Attempting to re-provision for new work...\n";
        my $result = try_to_provision(
            $versioner, $q, $jobspec->{platforms},
            @provisioners
        );
        if ($result) {
            print "System provisioned to "
              . join(" ", @{$result->{platforms}})
              . " with SUT version "
              . $result->{version}
              . ", beginning work\n";
            next;
        }

        print
          "No available test plans this system can provision to satisfy. Waiting "
          . $global_interval
          . "s for extra work...\n";

        # Actually wait, as the message above promises, before polling again.
        sleep $global_interval;
    }
}
# Find a queued plan this system could satisfy and provision the system for
# it.  Arguments are handed unchanged to _get_satisfiable_configuration();
# the return value is whatever _provision() returns for the chosen plan.
sub try_to_provision {
    my @selection = _get_satisfiable_configuration(@_);

    # Only the first two elements (plan, provision matrix) are relevant.
    return _provision(@selection[0, 1]);
}
# Work out what this system can be provisioned to, then ask the queue which
# plans fit.
#
# Arguments:
#   $versioner    - package name providing get_version()
#   $queue        - queue object providing list_queues()
#   $platforms    - array ref of the system's current platforms
#   @provisioners - package names providing can_switch_version(),
#                   pick_platform() and get_available_provision_targets()
#
# Returns a two-element list: the first plan reported by
# $queue->list_queues() (undef when there is none) and a reference to the
# provision matrix that was built.
sub _get_satisfiable_configuration {
    my ($versioner, $queue, $platforms, @provisioners) = @_;

    # Call the versioner in scalar context, exactly as main() does.  Calling
    # it inside the hash constructor below would be list context, where an
    # empty or multi-element return would shift or add keys in the matrix.
    my $cur_version =
      &{\&{$versioner . "::get_version"}}("$ENV{PWD}/t/bogus.t");

    #Figure out what we *can* do
    my %provision_matrix = (
        version       => 0,
        platforms     => {},
        cur_platforms => {},
        cur_version   => $cur_version,
    );

    foreach my $p (@provisioners) {
        $provision_matrix{version} = $p
          if &{\&{$p . "::can_switch_version"}}($versioner);

        $provision_matrix{platforms}->{$p} = [];

        # Each provisioner claims one platform and hands back the remainder.
        my $platform;
        ($platform, $platforms) = &{\&{$p . "::pick_platform"}}(@$platforms);
        push(
            @{$provision_matrix{platforms}->{$p}},
            &{\&{$p . "::get_available_provision_targets"}}($platform)
        ) if $platform;    #optimization
        $provision_matrix{cur_platforms}{$p} = $platform;
    }

    # Whatever no provisioner claimed cannot be changed by this system.
    $provision_matrix{unsatisfiable_platforms} = $platforms;

    #Figure out what needs doing
    my @satiable_plans = $queue->list_queues(%provision_matrix);

    #Pick one and return it. I'm deliberately not re-queuing, as that should be taken care of already, and would result in duplicate work in distributed configurations.
    return (shift @satiable_plans, \%provision_matrix);
}
# Provision the system to match $candidate.
#
# Arguments:
#   $candidate        - hash ref with 'platforms' (array ref) and 'version',
#                       or undef when no plan was found
#   $provision_matrix - hash ref with 'platforms', 'cur_platforms',
#                       'cur_version' and 'version' (the package able to
#                       switch versions, or a false value)
#
# Returns $candidate unchanged; in particular undef stays undef so the caller
# can tell that nothing was provisioned.
sub _provision {
    my ($candidate, $provision_matrix) = @_;

    # Without a plan there is nothing to do.  Bail out before any
    # $candidate->{...} access can autovivify it into a (true) hash ref,
    # which the caller would mistake for a successful provision.
    return $candidate unless defined $candidate;

    #First, let's provision everything we can to match the provided candidate.
    foreach my $platformer (keys(%{$provision_matrix->{platforms}})) {
        my ($platform_to_provision) =
          &{\&{$platformer . "::pick_platform"}}(@{$candidate->{platforms}});
        next unless $platform_to_provision;

        # Skip when already on that platform; the current platform may be
        # undef if this provisioner had claimed none.
        my $current = $provision_matrix->{cur_platforms}->{$platformer};
        next if defined $current && $platform_to_provision eq $current;

        print "Provisioning system to $platform_to_provision...\n";
        &{\&{$platformer . "::provision"}}($platform_to_provision);
    }

    #Finally, see if we need to change the version of our software.
    if ($provision_matrix->{version}) {
        my $wanted  = $candidate->{version}              // '';
        my $current = $provision_matrix->{cur_version} // '';
        if ($wanted ne $current) {
            print "Provisioning SUT version to $candidate->{version}...\n";
            &{\&{$provision_matrix->{version} . "::switch_version_to"}}
              ($candidate->{version});
        }
    }

    return $candidate;
}
# Body of a single MCE worker.
#
# Arguments:
#   $conf         - configuration hash ref, passed on to the runner
#   $worker_state - shared hash keyed by worker id; 1 = busy, 0 = idle
#   $jobspec      - job spec handed to the queue's get_jobs()
#   $q            - queue object this worker reads from
#
# Polls the queue and runs whatever jobs it gets.  Returns 1 once every
# worker recorded in $worker_state is idle.
sub worker {
    my ($conf, $worker_state, $jobspec, $q) = @_;

    # Register as busy straight away so siblings do not quit early.
    $worker_state->{MCE->wid()} = 1;

    my $title = "testd - worker " . MCE->wid() . ":";
    $0 = "$title starting up";

    while (1) {

        #check if every job is not busy, and if so, return so we can re-provision.
        if (!sum(values(%$worker_state))) {
            return 1;
        }

        $worker_state->{MCE->wid()} = 1;
        $0 = "$title looking for jobs";

        my @found = $q->get_jobs($jobspec);
        MCE->say("Found " . scalar(@found) . " jobs for worker " . MCE->wid());

        if (@found) {
            $0 = "$title running tests";
            my $runner =
              App::Prove::Elasticsearch::Utils::require_runner($conf);
            &{\&{$runner . "::run"}}($conf, @found);
        }
        else {
            # Nothing to do: flag ourselves idle and back off for a while.
            $worker_state->{MCE->wid()} = 0;
            $0 = "$title waiting for jobs";
            sleep $interval;
        }
    }
}
# Detach from the controlling terminal using the classic double fork.
#
# Arguments:
#   $test_only - optional flag for unit tests; when true nothing is forked
#                and the (undefined) pid is returned immediately.
#
# Returns the pid of the surviving daemon process.  The intermediate
# processes exit 0; a failed fork exits 1.  Dies if chdir to '/' fails.
sub daemonify {
    my $test_only = shift;
    $test_only //= 0;

    my $pid;
    unless ($test_only) {    # uncoverable branch true
        # First fork: the original process exits, the child carries on.
        $pid = fork;
        if (!defined($pid)) {
            exit 1;
        }
        elsif ($pid) {
            exit 0;
        }

        # Start a new session.
        setsid();

        # Second fork: again only the child survives.
        $pid = fork;
        if (!defined($pid)) {
            exit 1;
        }
        elsif ($pid) {
            exit 0;
        }

        chdir '/' or die $!;
        umask 0;
        $pid = $$;
    }

    return $pid;
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
testd - watch for tests that need to be run, and run them
=head1 VERSION
version 0.001
=head1 USAGE
testd
Requires that you have a configured autodiscover value in your ~/elastest.conf.
You can also control behavior via the [testd] section:
[testd]
no_daemon=1
max_workers=3
( run in 0.392 second using v1.01-cache-2.11-cpan-a1f116cd669 )