Parallel-DataPipe

 view release on metacpan or search on metacpan

t/Parallel-DataPipe.t  view on Meta::CPAN

# NOTE(review): $kb is defined above this chunk (presumably 1024) — confirm.
# $max_buf_size is not referenced in the visible part of the file (the tests
# call the max_buf_size() sub instead); presumably used elsewhere — confirm.
my $max_buf_size = 512 * $kb;


# Debug aid: uncomment to print this PID and pause briefly so `top` can be attached.
#printf "You may top -p%s\n",$$;sleep(2);

# constant for max processor number tests: test_processor_number & test_other_children_survive
my $number_of_data_processors = 32;
my $n_items = 4; # number of large items to process

# Output record separator: every print below gets an extra trailing "\n".
$\="\n";

# Test sequence. Order matters: the large-data tests below grow this
# process's memory so much that few forks are possible afterwards, so any
# fork-heavy tests must run before them.
#test_old_behaviour();
test_serialized_data();
test_storable(); # test with standard serializer
test_scalar_values();

#test_processor_number();
#test_other_children_survive();

# Debug aid: before the big-data tests, uncomment to pause (waits for Enter)
# and inspect this process's memory usage with top.
#printf "Have a chance to look at top -p%s ...\n",$$;<>;

test_large_data_send(); # this test is fork ability killer, memory expander, can't make a (lot of) fork after it

#printf "Have a chance to look at top -p%s ...\n",$$;<>;
test_large_data_receive(); # this test is fork ability killer, memory expander, can't make a (lot of) fork after it

test_large_data_process(); # this test is fork ability killer, memory expander, can't make a (lot of) fork after it
test_pipeline();

# With $\ set above, this ends with two newlines.
print "\n***Done!\n";

exit 0;

sub test_storable {
    # Check that run() works when a custom serializer is supplied through the
    # freeze/thaw options (here Storable's nfreeze/thaw). Items are array refs
    # holding one number; each is doubled, and the results are collected via
    # the output callback.
    print "\n***Testing if conveyor works ok with Storable nfreeze and thaw...\n";

    # Load Storable at run time. A plain require fails immediately with a clear
    # message if the module is missing. The previous string eval swallowed that
    # error and left \&Storable::nfreeze / \&Storable::thaw pointing at
    # undefined subs.
    require Storable;

    my @data = 1..1000;
    my @processed_data = ();
    Parallel::DataPipe::run {
        input => [map [$_],@data],
        process => sub { [$_->[0]*2] },
        output => sub { push @processed_data, $_; },
        freeze => \&Storable::nfreeze,
        thaw => \&Storable::thaw,
    };

    ok(@data==@processed_data,'length of processed Storable data');
    # Unwrap the one-element array refs. The values are sorted before the
    # comparison, so the check does not depend on output order.
    @processed_data = map $_->[0], @processed_data;
    ok(join(",",map $_*2, @data) eq join(",",sort {$a <=> $b} @processed_data),"processed Storable data values");
    #printf "processed data:%s\n",join ",",@processed_data;
    ok(zombies() == 0,'no zombies');

}



sub test_scalar_values {
    # Push plain scalars through the pipe and check that:
    #   * the input array given as `input` is drained (treated as a queue),
    #   * every item comes back doubled (compared order-insensitively),
    #   * no child processes are left behind.
    print "\n***Testing if conveyor works ok with simple scalar data...\n";

    my @queue    = (1 .. 10000);
    my @original = @queue;    # snapshot: @queue is consumed by run()

    my @doubled = Parallel::DataPipe::run({
        input   => \@queue,
        process => sub { $_ * 2 },
    });

    my $expected = join ',', map { $_ * 2 } @original;
    my $got      = join ',', sort { $a <=> $b } @doubled;

    ok(@queue == 0, 'length of input queue is empty');
    ok(@original == @doubled, 'length of processed scalar data');
    ok($expected eq $got, "processed scalar data values");
    #printf "processed data:%s\n",join ",",@doubled;
    ok(zombies() == 0, 'no zombies');
}

sub test_serialized_data {
    # Exercise the positional-argument form run(INPUT, PROCESS) with
    # array-ref items rather than plain scalars (the "serialized data" path).
    # Each item [$n] should come back as [$n*2].
    print "\n***Testing if conveyor works ok with serialized data...\n";
    my @data = map [$_],1..10;
    # Pass a copy, because an input array is consumed as a queue and @data is
    # still needed for the comparisons below.
    my @processed_data = Parallel::DataPipe::run(
        [@data],
        sub {
            [$_->[0]*2];
        },
    );

    ok(@data==@processed_data,'length of processed serialized data');
    # The test does not rely on output order, so compare the sorted values.
    my $expected = join(",", map $_->[0]*2, @data);
    my $got      = join(",", sort {$a <=> $b} map $_->[0], @processed_data);
    ok($expected eq $got,"processed serialized data values");
    ok(zombies() == 0,'no zombies');
}

sub test_old_behaviour {
    # Legacy option names (input_iterator / process_data). Expected behavior:
    #   * input_iterator => \@array only reads the array and leaves it intact;
    #   * input          => \@array consumes the array as a queue.
    print "\n*** Test if input iterator as array ref still has old behaviour...\n";
    my @items = (1);

    # Read-only iteration: @items should keep its single element.
    my @result = Parallel::DataPipe::run({
        input_iterator => \@items,
        process_data   => sub { $_ },
    });
    ok(@items == @result, "input_data remains \@data untouched - only reads");

    # Queue semantics: @items should be empty once run() returns.
    @result = Parallel::DataPipe::run({
        input        => \@items,
        process_data => sub { $_ },
    });
    ok(@items == 0, "while <input> has behaviour of queue - in the end it is empty");
}

sub test_large_data_receive {
    #test large data
    my $large_data_size = max_buf_size(32);
    my $big = sprintf("big(%dK)",$large_data_size/$kb);
    print "\n***Testing if data processor receives ok $big buffer wrapped into [$n_items] array...\n";
    my $large_data_buf = sprintf("%${large_data_size}s"," ");
    my @data = map [$large_data_buf],1..$n_items;
    my $input_length=@data;
    $large_data_buf =~ s/ {8}/!!!!!!!!/;
    my $time = time();
    my @processed_data = Parallel::DataPipe::run {
        input => \@data,
        process => sub { $_->[0] =~ s/ {8}/!!!!!!!!/;my $ret = $_->[0] eq $large_data_buf?1:0;undef $_; $ret },
        number_of_data_processors => 4,
    };
    my $elapsed = time - $time;
    #print substr($large_data_buf,0,20)."#\n";
    #print substr($processed_data[0][0],0,20)."#\n";
    ok(@data==0,"input queue was emptied");
    ok($input_length==@processed_data,"length of received $big data");
    ok(grep($_ == 1,@processed_data) == $input_length,"received $big data values");
    ok(zombies() == 0,'no zombies');
    # try to clean memory
    undef $_;
    $_ = undef for $large_data_buf,map $_->[0],@data;



( run in 1.546 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )