view release on metacpan or search on metacpan
proto
MANIFEST
eg/mrgamood
eg/filelist.pm
eg/readinput.pm
eg/mrgamoo
eg/mrgamoo.conf
eg/example.mrjob
eg/myself.pm
Makefile.PL
META.yml Module meta-data (added by MakeMaker)
eg/example.mrjob view on Meta::CPAN
mood => 'joyous',
};
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
maxrun => 300
sortprog => /bin/sort
</%attr>
my $data = shift; # one record from the input
# return a key + a value
return ( $data->{cmp}, 1 );
</%map>
%################################################################
<%reduce>
my $key = shift;
my $itr = shift; # an iterator object
# count
my $n = 0;
$itr->foreach( sub { $n ++ } );
eg/readinput.pm view on Meta::CPAN
package Local::MrMagoo::ReadInput;
use AC::MrMagoo::User;
use JSON;
use strict;
our $R; # exported by AC::MrMagoo::User
sub readinput {
my $fd = shift; # file handle
# our file is newline-delimited json data
# read next line
my $line = scalar <$fd>;
# end of file?
return (undef, 1) unless defined $line;
my $d = json_decode($line);
# filter input on date range. we could just as easily filter
# in 'map', but doing it here, behind the scenes, keeps things
lib/AC/MrGamoo.pm view on Meta::CPAN
=item seedpeer
specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.
seedpeer 192.168.10.11:3503
seedpeer 192.168.10.12:3503
=item secret
specify a secret key used to encrypt data transferred between
systems in different datacenters.
secret squeamish-ossifrage
=item syslog
specify a syslog facility for log messages.
syslog local5
=item basedir
local directory to store files
basedir /home/data
=item debug
enable debugging for a particular section
debug job
=back
=head1 BUGS
lib/AC/MrGamoo/AC/FileList.pm view on Meta::CPAN
#
# $Id: FileList.pm,v 1.3 2010/11/10 16:24:38 jaw Exp $
package AC::MrGamoo::AC::FileList;
use AC::MrGamoo::Debug 'files';
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;
my $YDBFILE = "/home/acdata/logfile.ydb";
# return an array of:
# {
# filename => www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi
# location => [ scrib@a2be021bd31c, scrib@a2be021ad31c ]
# size => 10863
# [anything else]
# }
# convert legacy scriblr ids
lib/AC/MrGamoo/AC/ReadInput.pm view on Meta::CPAN
sub readinput {
my $fd = shift;
my $line = scalar <$fd>;
return (undef, 1) unless defined $line;
my $d;
eval { $d = parse_dancr_log($line); };
if( $@ ){
problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
return ;
}
# filter input on date range. we could just as easily filter
# in 'map', but doing it here, behind the scenes, keeps things
# simpler for the jr. developers writing reports.
return if $d->{tstart} < $R->config('start');
return if $d->{tstart} >= $R->config('end');
return ($d, 0);
lib/AC/MrGamoo/API/Client.pm view on Meta::CPAN
use strict;
my $TIMEOUT = 15;
# Construct a client for one outgoing request.
#   $addr, $port - passed through to the parent constructor and used in
#                  the log / info text
#   $info        - free-form text appended to the connection's info string
#   $req         - request hashref; $req->{type} appears in the log text
#   $data        - handed to encode_request together with $req
# Returns whatever the parent class constructor returns.
sub new {
    my ($class, $addr, $port, $info, $req, $data) = @_;

    debug("new client type: $req->{type} to $addr:$port");

    # serialize the request before the connection object is created
    my $payload = AC::MrGamoo::Protocol->encode_request( $req, $data );

    # keep the option pairs in a list so their order is preserved
    my @opts = (
        info    => "client $req->{type} to $addr:$port; $info",
        request => $payload,
    );

    my $client = $class->SUPER::new( $addr, $port, @opts );
    return $client;
}
sub start {
my $me = shift;
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
}
# Read console packets from $me->{console_fd} until a 'finish' message
# arrives. Each packet is decoded with AC::MrGamoo::Protocol and then
# handled according to its type:
#   stderr - message text is printed to STDERR
#   stdout - message text is printed to the currently selected handle
#   debug  - "<server_id>\t<msg>" is passed to the $me->{fdebug} callback
# NOTE(review): the result of recv is not checked here - confirm how
# socket errors are expected to surface.
sub run_console {
    my ($me) = @_;

    my $sock = $me->{console_fd};

    for (;;) {
        my $packet;
        recv($sock, $packet, 65535, 0);

        # header first, then everything after it is the request body
        my $header  = AC::MrGamoo::Protocol->decode_header($packet);
        my $body    = substr($packet, AC::MrGamoo::Protocol->header_size());
        my $message = AC::MrGamoo::Protocol->decode_request($header, $body);

        my $type = $message->{type};
        last if $type eq 'finish';

        if ($type eq 'stderr') {
            print STDERR "$message->{msg}";
        }
        elsif ($type eq 'stdout') {
            print "$message->{msg}";
        }
        elsif ($type eq 'debug') {
            $me->{fdebug}->("$message->{server_id}\t$message->{msg}");
        }
    }
}
sub submit {
my $me = shift;
my $seed = shift; # [ "ipaddr:port", ... ]
lib/AC/MrGamoo/Default/MySelf.pm view on Meta::CPAN
}
# Report the identifier held in $SERVERID.
sub my_server_id {
    my $id = $SERVERID;
    return $id;
}
# Describe this host's addresses: a one-element arrayref holding a
# hashref whose 'ipa' entry is $MYIP.
sub my_network_info {
    my %addr = ( ipa => $MYIP );
    return [ \%addr ];
}
# Name of the datacenter this server belongs to; this default
# implementation always answers 'default'.
sub my_datacenter {
    my $datacenter = 'default';
    return $datacenter;
}
1;
lib/AC/MrGamoo/FileList.pm view on Meta::CPAN
perform some limited tests without this file.
But you must provide this file in order to actually run map/reduce jobs.
=head1 DESCRIPTION
MrGamoo only runs map/reduce jobs.
It is up to you to get the files on to the servers,
keep track of where they are, and tell MrGamoo where to find them.
Some people keep the file meta-information in a sql database.
Some people keep the file meta-information in a yenta map.
Some people keep the file meta-information in the filesystem.
When a new job starts, your C<get_file_list> function will be
called with the job config, and should return an arrayref
of matching files along with meta-info.
Each element of the returned arrayref should be a hashref
containing at least the following fields:
lib/AC/MrGamoo/Iter.pm view on Meta::CPAN
$me->_putback($r);
$me->{key} = $r->[0];
return $r->[0];
}
return; # eof
}
# Fetch the next item via _next (all arguments are passed through).
# In list context returns the pair (data, end-flag) as produced by
# _next; in any other context returns only the data.
sub next {
    my ($value, $eof) = _next(@_);
    return wantarray ? ($value, $eof) : $value;
}
sub _next {
my $me = shift;
my $r = $me->_nextrow();
return (undef, 1) unless $r; # eof
return $r->[1] if $me->{key} eq $r->[0];
lib/AC/MrGamoo/Iter/Array.pm view on Meta::CPAN
#
# $Id: Array.pm,v 1.1 2010/11/01 18:41:55 jaw Exp $
package AC::MrGamoo::Iter::Array;
use AC::MrGamoo::Iter;
our @ISA = 'AC::MrGamoo::Iter';
use strict;
# Wrap an in-memory array as an iterator object.
#   $array - arrayref of [key, data] pairs; stored (not copied) under 'src'
# Returns the new object blessed into $class.
sub new {
    my ($class, $array) = @_;

    my %self = ( src => $array );
    return bless \%self, $class;
}
sub _nextrow {
my $me = shift;
if( $me->{buf} ){
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
return \@out;
}
# Decide which servers take part in the map phase.
#   $job     - job hashref; $job->{options}{maps}, when non-zero, caps
#              the number of servers used
#   $servers - arrayref of hashrefs with 'id' and 'metric' entries
# Servers are visited in ascending metric order and the first ones get
# use => 1 until the budget reaches zero; the rest get use => 0.
# Returns a hashref: server id => { metric => ..., use => 0|1 }.
sub _plan_map_these_servers {
    my ($job, $servers) = @_;

    # no (or zero) limit configured: fall back to the number of servers
    my $budget = ($job->{options}{maps} + 0) || scalar(@$servers);

    my @by_metric = sort { $a->{metric} <=> $b->{metric} } @$servers;

    my %plan;
    foreach my $srv (@by_metric) {
        my $use = 0;
        if ($budget) {
            $use = 1;
            $budget--;
        }
        $plan{ $srv->{id} } = { metric => $srv->{metric}, use => $use };
    }

    return \%plan;
}
sub _plan_divy_files {
my $job = shift;
my $files = shift;
my $servers = shift;
my %filemap;
my %bytes;
my @copies;
lib/AC/MrGamoo/Kibitz.pm view on Meta::CPAN
my $natinfo = my_network_info();
for my $i ( @$natinfo ){
push @$ipinfo, { ipv4 => inet_atoi($i->{ipa}), port => my_port(), natdom => $i->{natdom} };
}
}
my $status = ($^T > $STARTTIME + $STARTDELAY) ? 200 : 102;
return {
hostname => $HOSTNAME,
datacenter => my_datacenter(),
subsystem => 'mrgamoo',
environment => conf_value('environment'),
via => my_server_id(),
server_id => my_server_id(),
path => '.',
status => $status,
timestamp => $^T,
lastup => $^T,
ip => $ipinfo,
sort_metric => loadave() * 1000,
lib/AC/MrGamoo/Kibitz/Client.pm view on Meta::CPAN
AC::MrGamoo::Kibitz::Peers->maybe_down( $me->{status_peer}, 'timeout' );
}
}
# Read-event handler for a status reply from a peer.
# Once a complete message is available it marks the exchange as ok,
# decodes the reply, feeds every entry of its 'status' list to
# AC::MrGamoo::Kibitz::Peers->update, and shuts the connection.
# Errors raised while decoding/updating are logged, not propagated.
sub read {
    my ($me, $evt) = @_;

    debug("recvd reply");

    my ($proto, $data) = read_protocol_no_content( $me, $evt );
    return unless $proto;       # nothing usable yet

    $me->{status_ok} = 1;

    eval {
        my $reply = AC::MrGamoo::Protocol->decode_reply( $proto, $data );
        foreach my $peer_status ( @{ $reply->{status} } ) {
            AC::MrGamoo::Kibitz::Peers->update( $peer_status );
        }
    };
    my $err = $@;
    if ($err) {
        verbose("error: $err");
    }

    $me->shut();
}
lib/AC/MrGamoo/Kibitz/Peers.pm view on Meta::CPAN
use AC::MrGamoo::MySelf;
use AC::MrGamoo::Config;
use AC::DC::Sched;
use AC::Misc;
use AC::Import;
use JSON;
use strict;
our @EXPORT = qw(pick_best_addr_for_peer peer_list_all get_peer_by_id);
my $KEEPDOWN = 300; # keep data about down servers for how long?
my $KEEPLOST = 600; # keep data about servers we have not heard about for how long?
my %SCEPTICAL;
my %ALLPEER;
my %MAYBEDOWN;
my $natdom;
my $natinit;
AC::DC::Sched->new(
info => 'kibitz status',
freq => (conf_value('time_status_kibitz') || 5),
lib/AC/MrGamoo/Server.pm view on Meta::CPAN
my $me = shift;
debug("connection timed out");
$me->shut();
}
# Read-event handler for an incoming server connection.
# Once a complete message is available it looks up the handler for the
# message type in %HANDLER, decodes the request body (skipped for 'http'
# and for empty bodies), and dispatches. A handler that is a reference
# is called as a code ref; otherwise its ->handler method is invoked.
# Unknown types and undecodable requests are logged and the connection
# is shut.
sub read {
    my ($me, $evt) = @_;

    my ($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    return unless $proto;       # nothing usable yet

    # dispatch request
    my $handler = $HANDLER{ $proto->{type} };
    if (!$handler) {
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    eval {
        if ($data && $proto->{type} ne 'http') {
            $data = AC::MrGamoo::Protocol->decode_request($proto, $data);
        }
    };
    my $err = $@;
    if ($err) {
        problem("cannot decode request: $err");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

    return $handler->( $me, $proto, $data, $content ) if ref $handler;
    return $handler->handler( $me, $proto, $data, $content );
}
sub http {
my $me = shift;
my $proto = shift;
my $url = shift;
$url =~ s|^/||;
$url =~ s/%(..)/chr(hex($1))/eg;
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
my $me = shift;
$me->{_status_underway} --;
}
sub _read {
my $io = shift;
my $evt = shift;
my $me = shift;
debug("read child $me->{request}{taskid}: $evt->{data}.");
# read status msg from child
$io->{rbuffer} .= $evt->{data};
my @l = split /^/m, $io->{rbuffer};
$io->{rbuffer} = '';
for my $l (@l){
unless( $l =~ /\n/ ){
$io->{rbuffer} = $l;
last;
}
debug("got status $l");
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
# Close every output handle listed in $me->{outfd}, then remove the
# 'outfd' entry from the object.
sub _close_outfiles {
    my ($me) = @_;

    foreach my $out ( @{ $me->{outfd} } ) {
        $out->close();
    }

    delete $me->{outfd};
}
# Write one key/data pair to the output partition selected by the key.
#   $n    - number of partitions
#   $key  - record key; its md5 digest picks the partition
#   $data - record value
# The record is emitted as a JSON array [key, data] followed by a
# newline, via the output() method of $me->{outfd}[partition].
sub _output_partition {
    my $me   = shift;
    my $n    = shift;
    my $key  = shift;
    my $data = shift;

    # md5 is twice as fast as sha1.
    # anything written in perl is 10 times slower
    # (the first 32 bits of the digest, big-endian, are used as the hash)
    my ($digest_word) = unpack('N', md5( $key ));

    my $out  = $me->{outfd}[ $digest_word % $n ];
    my $line = encode_json( [ $key, $data ] );

    $out->output( $line, "\n" );
}
# the end-user's 'print' ends up here: the arguments are interpolated
# into one string (joined the way "@array" joins) and sent on as a
# 'stdout' message through _send_eumsg
sub eu_print_stdout {
    my ($me, @args) = @_;

    my $text = "@args";
    _send_eumsg($me, 'stdout', $text);
}
lib/AC/MrGamoo/Xfer.pm view on Meta::CPAN
my $p;
eval {
# connect
my $s = AC::MrGamoo::Protocol->connect_to_server( inet_aton($addr), $port );
return unless $s;
# send req
AC::MrGamoo::Protocol->write_request($s, $req);
# get response
my $buf = AC::MrGamoo::Protocol->read_data($s, AC::MrGamoo::Protocol->header_size(), 30);
$p = AC::MrGamoo::Protocol->decode_header($buf);
$p->{data} = AC::MrGamoo::Protocol->read_data($s, $p->{data_length}, 1);
$p->{data} = AC::MrGamoo::Protocol->decode_reply($p);
debug("recvd response $p->{data}{status_code}");
return unless $p->{data}{status_code} == 200;
# stream file to disk
my $size = $p->{content_length};
debug("recving file ($size B)");
my $fd;
unless( open( $fd, "> $tmpfile" ) ){
verbose("cannot open output file '$tmpfile': $!");
return;
}
my $chk = _sendfile($oreq, $fd, $s, $size);
my $sha1 = $p->{data}{hash_sha1};
die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
};
if(my $e=$@){
debug("error: $e");
return;
}
return $p;
}
lib/AC/protobuf/mrgamoo_status.pl view on Meta::CPAN
'ACPMRMStatus',
[
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'hostname', 1, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'datacenter', 2, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'subsystem', 3, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'environment', 4, undef
view all matches for this distributionview release on metacpan - search on metacpan