Parallel-MapReduce
view release on metacpan or search on metacpan
lib/Parallel/MapReduce.pm view on Meta::CPAN
# Slicing step: Hslice() is given the input hash $h1 and the number of
# configured workers. The result is used as a hash ref below; its keys
# identify the slices that still have to be handed to a worker.
SLICING:
my $slices = Hslice ($h1, scalar @{ $self->{_workers} }); # slice the hash into equal parts (as many workers as there are)
$log->debug ("master sliced ".Dumper $slices) if $log->is_debug;
my @keys; # this will be filled in the map phase below
# Map phase: the loop keeps running for as long as _slices() returns a true
# value for the remaining slice keys and the worker list.
MAPPHASE:
while (my $sl4ws = _slices ([ keys %$slices ], $self->{_workers}) ) { # compute unresolved slices versus operational workers
# List assignment in scalar context: the condition is true while at least one
# slice key is left, and $k receives the first key that keys() returns.
if (my ($k) = keys %$slices) { # there is one unhandled
# Same idiom: $w becomes the first worker whose {thread} slot is undefined.
if (my ($w) = grep { ! defined $_->{thread} } # find a non-busy worker
@{ $self->{_workers}} ) { # from the operational workers
#warn "found free ".$w->{host};
$w->{slice} = delete $slices->{$k}; # task it with slice, take it off the list for now
# chunk_n_store runs in a helper thread created in list context and is joined
# immediately, so this statement blocks until its return list is in @chunks.
# NOTE(review): 1000 is presumably a chunk-size limit -- confirm against chunk_n_store.
my @chunks = threads->create ({'context' => 'list'},
'chunk_n_store',
$memd, $w->{slice},
$job, 1000)->join; # distribute hash slice over memcacheds
#warn "thread chunks ".Dumper \@chunks;
# The thread entry point is passed by name: the string returned by ref($w)
# with '::map' appended. The thread object is stored in the worker's {thread}
# slot, which is the slot the free-worker grep above tests.
$w->{thread} = threads->create (ref ($w).'::map',
lib/Parallel/MapReduce.pm view on Meta::CPAN
# Debug dump of the @keys array as it stands once the map phase is over.
$log->debug ("master: all keys after mappers ".Dumper \@keys) if $log->is_debug;
# Reshuffling step: balance_keys() is given the collected keys, the job and the
# number of configured workers. The result $Rs is used as a hash ref below; its
# keys identify the key groups that still have to be handed to a reducer.
RESHUFFLING:
my $Rs = balance_keys (\@keys, $job, scalar @{ $self->{_workers} }); # slice the keys into 'equal' groups
$log->debug ("master: after balancing ".Dumper $Rs) if $log->is_debug;
# NOTE(review): presumably filled by the reduce phase below -- not visible in this excerpt, confirm.
my @chunks;
# Reduce phase: the loop keeps running for as long as _slices() returns a true
# value for the remaining group keys and the worker list.
REDUCEPHASE:
while (my $rs4ws = _slices ([ keys %$Rs ], $self->{_workers}) ) {
# List assignment in scalar context: the condition is true while at least one
# group key is left, and $r receives the first key that keys() returns.
if (my ($r) = keys %$Rs) {
# Same idiom: $w becomes the first worker whose {thread} slot is undefined.
if (my ($w) = grep { ! defined $_->{thread} } # find a non-busy worker
@{ $self->{_workers}} ) { # from the operational workers
#warn "reduce: found free ".$w->{host};
# Hand the group to the worker and remove it from the pending set.
$w->{slice} = delete $Rs->{$r};
# The thread entry point is passed by name: the string returned by ref($w)
# with '::reduce' appended. It receives the worker, its slice,
# $self->{MemCacheds} and the job; the thread object is stored in the worker's
# {thread} slot, which is the slot the free-worker grep above tests.
$w->{thread} = threads->create (ref ($w).'::reduce',
$w,
$w->{slice},
$self->{MemCacheds},
$job);
} else {
( run in 0.322 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )