Parallel-MapReduce

 view release on metacpan or  search on metacpan

lib/Parallel/MapReduce.pm  view on Meta::CPAN


  SLICING:
    my $slices = Hslice ($h1, scalar @{ $self->{_workers} });                # slice the hash into equal parts (as many workers as there are)
    $log->debug ("master sliced ".Dumper $slices) if $log->is_debug;

    my @keys;                                                                # this will be filled in the map phase below
  MAPPHASE:
    while (my $sl4ws = _slices ([ keys %$slices ], $self->{_workers}) ) {    # compute unresolved slices versus operational workers
	if (my ($k) = keys %$slices) {                                       # there is one unhandled
    
	    if (my ($w) = grep { ! defined $_->{thread} }                    # find a non-busy worker
		          @{ $self->{_workers}} ) {                          # from the operational workers
#warn "found free ".$w->{host};
		$w->{slice}  = delete $slices->{$k};                         # task it with slice,  take it off the list for now
                my @chunks = threads->create ({'context' => 'list'},
					      'chunk_n_store',
					      $memd, $w->{slice}, 
					      $job, 1000)->join;             # distribute hash slice over memcacheds
#warn "thread chunks ".Dumper \@chunks;

		$w->{thread} = threads->create (ref ($w).'::map',

lib/Parallel/MapReduce.pm  view on Meta::CPAN

    $log->debug ("master: all keys after mappers ".Dumper \@keys) if $log->is_debug;
  RESHUFFLING:
    my $Rs = balance_keys (\@keys, $job, scalar @{ $self->{_workers} });     # slice the keys into 'equal' groups
    $log->debug ("master: after balancing ".Dumper $Rs) if $log->is_debug;

    my @chunks;
  REDUCEPHASE:
    while (my $rs4ws = _slices ([ keys %$Rs ], $self->{_workers}) ) {
	if (my ($r) = keys %$Rs) {

	    if (my ($w) = grep { ! defined $_->{thread} }                    # find a non-busy worker
		          @{ $self->{_workers}} ) {                          # from the operational workers
#warn "reduce: found free ".$w->{host};
                $w->{slice}  = delete $Rs->{$r}; 

                $w->{thread} = threads->create (ref ($w).'::reduce',
						$w,
						$w->{slice},
						$self->{MemCacheds}, 
						$job);
	    } else {                     



( run in 0.322 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )