Parallel-DataPipe
view release on metacpan or search on metacpan
t/Parallel-DataPipe.t view on Meta::CPAN
my $max_buf_size = 512 * $kb;
#printf "You may top -p%s\n",$$;sleep(2);
# constant for max processor number tests: test_processor_number & test_other_children_survive
my $number_of_data_processors = 32;
my $n_items = 4; # number of large item to process
$\="\n";
#test_old_behaviour();
test_serialized_data();
test_storable(); # test with standard serializer
test_scalar_values();
#test_processor_number();
#test_other_children_survive();
# before testing big data let's top to see memory occupied
#printf "Have a chance loot top -p%s ...\n",$$;<>;
test_large_data_send(); # this test is fork ability killer, memory expander, can't make a (lot of) fork after it
#printf "Have a chance loot top -p%s ...\n",$$;<>;
test_large_data_receive(); # this test is fork ability killer, memory expander, can't make a (lot of) fork after it
test_large_data_process(); # this test is fork ability killer, memory expander, can't make a (lot of) fork after it
test_pipeline();
print "\n***Done!\n";
exit 0;
sub test_storable {
print "\n***Testing if conveyor works ok with Storable nfreeze and thaw...\n";
eval q{use Storable;};
my @data = 1..1000;
my @processed_data = ();
Parallel::DataPipe::run {
input => [map [$_],@data],
process => sub { [$_->[0]*2] },
output => sub { push @processed_data, $_; },
freeze => \&Storable::nfreeze,
thaw => \&Storable::thaw,
};
ok(@data==@processed_data,'length of processed Storable data');
@processed_data = map $_->[0], @processed_data;
ok(join(",",map $_*2, @data) eq join(",",sort {$a <=> $b} @processed_data),"processed Storable data values");
#printf "processed data:%s\n",join ",",@processed_data;
ok(zombies() == 0,'no zombies');
}
sub test_scalar_values {
print "\n***Testing if conveyor works ok with simple scalar data...\n";
my @data = 1..10000;
my @cdata = @data;
my @processed_data = Parallel::DataPipe::run {
input => \@data,
process => sub { $_*2 },
};
ok(@data==0,'length of input queue is empty');
ok(@cdata==@processed_data,'length of processed scalar data');
ok(join(",",map $_*2, @cdata) eq join(",",sort {$a <=> $b} @processed_data),"processed scalar data values");
#printf "processed data:%s\n",join ",",@processed_data;
ok(zombies() == 0,'no zombies');
}
sub test_serialized_data {
print "\n***Testing if conveyor works ok with serizalized data...\n";
# test pipe for serialized data
my @data = map [$_],1..10;
my @processed_data = Parallel::DataPipe::run(
[@data],
sub {
[$_->[0]*2];
},
);
ok(@data==@processed_data,'length of processed serialized data');
ok(join(",",map $_->[0]*2, @data) eq join(",",sort {$a <=> $b} map $_->[0],@processed_data),"processed serialized data values");
ok(zombies() == 0,'no zombies');
}
sub test_old_behaviour {
my @data = 1..1;
print "\n*** Test if input iterator as array ref still has old behaviour...\n";
my @processed = Parallel::DataPipe::run( {
input_iterator => \@data,
process_data => sub {$_},
});
ok(@data==@processed,"input_data remains \@data untouched - only reads");
@processed = Parallel::DataPipe::run( {
input => \@data,
process_data => sub {$_},
});
ok(@data==0,"while <input> has behaviour of queue - in the end it is empty");
}
sub test_large_data_receive {
#test large data
my $large_data_size = max_buf_size(32);
my $big = sprintf("big(%dK)",$large_data_size/$kb);
print "\n***Testing if data processor receives ok $big buffer wrapped into [$n_items] array...\n";
my $large_data_buf = sprintf("%${large_data_size}s"," ");
my @data = map [$large_data_buf],1..$n_items;
my $input_length=@data;
$large_data_buf =~ s/ {8}/!!!!!!!!/;
my $time = time();
my @processed_data = Parallel::DataPipe::run {
input => \@data,
process => sub { $_->[0] =~ s/ {8}/!!!!!!!!/;my $ret = $_->[0] eq $large_data_buf?1:0;undef $_; $ret },
number_of_data_processors => 4,
};
my $elapsed = time - $time;
#print substr($large_data_buf,0,20)."#\n";
#print substr($processed_data[0][0],0,20)."#\n";
ok(@data==0,"input queue was emptied");
ok($input_length==@processed_data,"length of received $big data");
ok(grep($_ == 1,@processed_data) == $input_length,"received $big data values");
ok(zombies() == 0,'no zombies');
# try to clean memory
undef $_;
$_ = undef for $large_data_buf,map $_->[0],@data;
( run in 1.546 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )