App-MtAws
view release on metacpan or search on metacpan
t/unit/queue_job/iterator.t view on Meta::CPAN
sub action_str
{
"abc".join('', (map { sprintf("%04d", $_) } @_));
}
sub create_iterator
{
my ($maxcnt, $cnt, $jobs_count, $cb, @actions) = @_;
my @orig_parts = do {
if ($jobs_count == 1) {
map {
my $a = action_str(@actions, $_);
my $x = "x$a";
SimpleJob->new(action => $a, n => $x)
} (1..$cnt);
} else {
map { create_iterator($jobs_count+1, $jobs_count, 1, undef, @actions, $_) } (1..$cnt);
}
};
App::MtAws::QueueJob::Iterator->new(maxcnt => $maxcnt, iterator => sub {
$cb->() if $cb;
@orig_parts ? shift @orig_parts : ()
});
}
sub test_case_early_finish
{
my ($maxcnt, $cnt, $jobs_count) = @_;
my $live_counter = 0;
my @live_counter_log;
my $itt = create_iterator($maxcnt, $cnt, $jobs_count, sub { ++$live_counter });
my @actions;
while (1) {
my $r = $itt->next;
push @live_counter_log, $live_counter;
ok $r->{code} eq JOB_OK || $r->{code} eq JOB_DONE;
last if $r->{code} eq JOB_DONE;
push @actions, $r->{task}{action};
$r->{task}{cb_task_proxy}->("somedata1");
}
cmp_deeply [@live_counter_log], [map { $_ } 1..$cnt+1], "should not call itterator for all jobs at once";
cmp_deeply [sort @actions], [sort map { action_str($_) } 1..$cnt], "test it works when callback called immediately";
}
sub test_late_finish
{
my ($maxcnt, $cnt, $jobs_count) = @_;
my $live_counter = 0;
my @live_counter_log;
my $itt = create_iterator($maxcnt, $cnt, $jobs_count, sub { ++$live_counter });
my @actions = ();
my @passes;
while (@actions < $cnt) {
my @callbacks = ();
my $r;
while (1) {
$r = $itt->next;
push @live_counter_log, $live_counter;
ok $r->{code} eq JOB_OK || $r->{code} eq JOB_WAIT;
last if $r->{code} eq JOB_WAIT;
push @actions, $r->{task}{action};
push @callbacks, $r->{task}{cb_task_proxy};
}
if ($r->{code} eq JOB_WAIT) {
push @passes, scalar @callbacks;
$_->("somedata2") for @callbacks;
next;
}
}
cmp_deeply [sort @actions], [sort map { action_str($_) } 1..$cnt];
#print Dumper $maxcnt, $cnt, \@live_counter_log;
if ($cnt % $maxcnt) {
cmp_deeply {map { $_ => 1 } @live_counter_log}, {map { $_ => 1 } 1..$cnt+1}, "should not call itterator for all jobs at once";
} else {
cmp_deeply {map { $_ => 1 } @live_counter_log}, {map { $_ => 1 } 1..$cnt }, "should not call itterator for all jobs at once";
}
is pop @passes, $cnt % $maxcnt, "last pass should contain cnt mod maxcnt items" if ($cnt % $maxcnt);
is $_, $maxcnt, "all passes excapt last should contain maxcnt items (if more than one pass)" for (@passes);
is $itt->next->{code}, JOB_DONE, "test it works when callback called later";
is $live_counter, $cnt+1;
}
sub test_random_finish
{
my ($maxcnt, $cnt, $jobs_count, $nworkers) = @_;
my $itt = create_iterator($maxcnt, $cnt, $jobs_count);
my $q = QE->new(n => $nworkers);
$q->process($itt);
for (@{ $q->{res} }) {
ok $_->{action} =~ /^abc(\d{4})/;
is $_->{data}[0], 'x';
ok $_->{data}[1] =~ /^xabc$1/;
is scalar @{ $_->{data} }, 2;
}
if ($jobs_count == 1) {
cmp_deeply [sort map { $_->{action} } @{ $q->{res} }], [sort map { action_str($_) } 1..$cnt];
} else {
is scalar @{ $q->{res} }, $cnt*$jobs_count;
}
}
plan_tests 928 => sub {
ok ! eval { App::MtAws::QueueJob::Iterator->new(maxcnt => 30); 1; };
like $@, qr/iterator required/;
{
my $itt = App::MtAws::QueueJob::Iterator->new(maxcnt => 20, iterator => sub {});
is $itt->{maxcnt}, 20, "one should be able to override maxcnt";
$itt = App::MtAws::QueueJob::Iterator->new(iterator => sub {});
is $itt->{maxcnt}, 30, "default maxcnt should be 30";
}
my $maxcnt = 7;
lcg_srand 777654 => sub {
alarm 180;
for my $n (0, 1, 2, 5, $maxcnt - 1, $maxcnt, $maxcnt+1, $maxcnt*2, $maxcnt*2+1, $maxcnt*3, $maxcnt*3-1) {
test_case_early_finish($maxcnt, $n, 1);
test_late_finish($maxcnt, $n, 1);
}
( run in 1.300 second using v1.01-cache-2.11-cpan-d8267643d1d )