Parallel-MapReduce

 view release on metacpan or  search on metacpan

lib/Parallel/MapReduce.pm  view on Meta::CPAN

=pod

=head2 Methods

=over

=item B<shutdown>

I<$mri>->shutdown

Especially when you use SSH workers, you should make sure that they are terminated properly. Call
this method when you are done; otherwise plenty of SSH sessions may be left behind.

=cut

# Shut down every worker registered with this instance.
#
# Calls ->shutdown on each element of $self->{_workers}, in list order.
# A missing or undefined worker list is treated as empty, so calling this on
# an instance without workers is a harmless no-op.
#
# Returns the values produced by the individual ->shutdown calls (their count
# in scalar context), which is what the former map-based version returned.
sub shutdown {
    my $self = shift;

    my @results;
    for my $worker (@{ $self->{_workers} || [] }) {
        push @results, $worker->shutdown;
    }
    return @results;
}

# Summarise the scheduling state as the string "<pending>:<busy>".
#
#   $_[0]  array ref holding the keys of the slices not yet handed out
#   $_[1]  array ref of worker hashes; a worker counts as busy while its
#          {slice} entry is true
#
# Returns '' (false) when nothing is pending and no worker is busy, so the
# result can directly drive a while-loop; otherwise returns "<pending>:<busy>".
#
# Only lexicals are used here: assigning the result to the global $_ would
# clobber the caller's $_ (and any element it happens to be aliased to).
sub _slices {
    my ($pending, $workers) = @_;

    my $n_pending = scalar @{$pending};
    my $n_busy    = scalar grep { $_->{slice} } @{$workers};
    my $state     = $n_pending . ':' . $n_busy;

    return $state eq '0:0' ? '' : $state;
}

=pod

=item B<mapreduce>

I<$A> = I<$mri>->mapreduce (I<$map_coderef>, I<$reduce_coderef>, I<$B>)

This method applies the MR algorithm to the hash (reference) C<$B>. You have to pass in CODE refs to
the map and the reduce function. The result is a reference to a hash.

=cut

# Run one complete MapReduce job over a hash, distributing the work over the
# workers in $self->{_workers}.
#
# Positional parameters:
#   $map     the map function (CODE ref), stored in memcached under key 'map'
#   $reduce  the reduce function (CODE ref), stored under key 'reduce'
#   $h1      the incoming hash (reference)
#   $job     optional job id, used as the memcached namespace; defaults to
#            'job1:' and should be different for every job
#
# Returns whatever fetch_n_unchunk reconstructs from the chunk keys delivered
# by the reducers (documented in the POD as a hash reference).
#
# Phases: slice the input, run the mappers (one thread per idle worker),
# balance the resulting keys, run the reducers the same way, then collect the
# result chunks. Worker state is tracked in $w->{slice} (the work currently
# assigned) and $w->{thread} (the running thread, undef when idle).
sub mapreduce {
    my $self   = shift;
    #--
    my $map    = shift;                                                      # the map function to be used
    my $reduce = shift;                                                      # the reduce function to be used
    my $h1     = shift;                                                      # the incoming hash
    my $job    = shift || 'job1:';                                           # a job id (should be different for every job)

    my $memd = Cache::Memcached->new ({ servers   => $self->{MemCacheds},    # connect to the Memcached cloud
                                        namespace => $job });

    threads->create (sub { $memd->set ('map',    $map) })   ->join;          # store map into cloud (see $Storable::Deparse)
    threads->create (sub { $memd->set ('reduce', $reduce) })->join;          # store reduce into cloud (see $Storable::Deparse)

  SLICING:
    my $slices = Hslice ($h1, scalar @{ $self->{_workers} });                # slice the hash into equal parts (as many workers as there are)
    $log->debug ("master sliced ".Dumper $slices) if $log->is_debug;

    my @keys;                                                                # this will be filled in the map phase below
  MAPPHASE:
    while (my $sl4ws = _slices ([ keys %$slices ], $self->{_workers}) ) {    # compute unresolved slices versus operational workers
        if (my ($k) = keys %$slices) {                                       # there is one unhandled

            if (my ($w) = grep { ! defined $_->{thread} }                    # find a non-busy worker
                          @{ $self->{_workers}} ) {                          # from the operational workers
                $w->{slice}  = delete $slices->{$k};                         # task it with slice,  take it off the list for now
                my @chunks = threads->create ({'context' => 'list'},
                                              'chunk_n_store',
                                              $memd, $w->{slice},
                                              $job, 1000)->join;             # distribute hash slice over memcacheds
                                                                             # NOTE(review): 1000 is presumably the chunk size - confirm against chunk_n_store

                $w->{thread} = threads->create (ref ($w).'::map',
                                                $w,                          # this is the worker which will be effectively tasked
                                                \@chunks,                    # these params are just passed through
                                                "slice$k:",
                                                $self->{MemCacheds},
                                                $job);
            }                                                                # no idle worker: just fall through and harvest below
        }
        foreach my $j ( threads->list ( threads::joinable() ) ) {            # see those who are finished
            push @keys, @{ $j->join() };                                     # harvest
            my ($w) = grep { $_->{thread} == $j } @{$self->{_workers}};      # find the corresponding worker
            $w->{slice} = $w->{thread} = undef;                              # relieve the worker, it is idle again
        }
        sleep 1 if $sl4ws eq _slices ([ keys %$slices ], $self->{_workers}); # only if no progress was made in this round: wait before polling again
    }

    $log->debug ("master: all keys after mappers ".Dumper \@keys) if $log->is_debug;
  RESHUFFLING:
    my $Rs = balance_keys (\@keys, $job, scalar @{ $self->{_workers} });     # slice the keys into 'equal' groups
    $log->debug ("master: after balancing ".Dumper $Rs) if $log->is_debug;

    my @chunks;                                                              # keys of the result chunks, filled in the reduce phase below
  REDUCEPHASE:
    while (my $rs4ws = _slices ([ keys %$Rs ], $self->{_workers}) ) {        # same scheduling loop as in the map phase
        if (my ($r) = keys %$Rs) {                                           # there is one unhandled key group

            if (my ($w) = grep { ! defined $_->{thread} }                    # find a non-busy worker
                          @{ $self->{_workers}} ) {                          # from the operational workers
                $w->{slice}  = delete $Rs->{$r};                             # task it with the group, take it off the list

                $w->{thread} = threads->create (ref ($w).'::reduce',
                                                $w,
                                                $w->{slice},
                                                $self->{MemCacheds},
                                                $job);
            }                                                                # no idle worker: just fall through and harvest below
        }
        foreach my $j ( threads->list ( threads::joinable() ) ) {            # see those who are finished
            push @chunks, @{ $j->join() };                                   # harvest
            my ($w) = grep { $_->{thread} == $j } @{$self->{_workers}};      # find the corresponding worker
            $w->{slice} = $w->{thread} = undef;                              # relieve the worker, it is idle again
        }
        sleep 1 if $rs4ws eq _slices ([ keys %$Rs ], $self->{_workers});     # only if no progress was made in this round: wait before polling again
    }

    # (A sequential variant would call $w->reduce for every key group on the first worker instead.)

    my $h4 = threads->create ('fetch_n_unchunk', $memd, \@chunks)->join;     # collect together all these chunks
    $log->debug ("master: reconstructed result ".Dumper $h4) if $log->is_debug;
    return $h4;                                                              # return the result hash
}


=pod

=item B<mapreducer>

I<$op> = I<$mri>->mapreducer (I<$map_coderef>, I<$reduce_coderef>)

This method returns a prefabricated mapreducer (see SYNOPSIS). You also have to pass in CODE refs to
the map and the reduce function.

=cut

sub mapreducer {
    my $self = shift;

    my $map    = $_[0];
    my $reduce = $_[1];
    
    return sub {



( run in 1.552 second using v1.01-cache-2.11-cpan-df04353d9ac )