AC-MrGamoo

 view release on metacpan or  search on metacpan

MANIFEST  view on Meta::CPAN

proto
MANIFEST
eg/mrgamood
eg/filelist.pm
eg/readinput.pm
eg/mrgamoo
eg/mrgamoo.conf
eg/example.mrjob
eg/myself.pm
Makefile.PL
META.yml                                 Module meta-data (added by MakeMaker)

eg/example.mrjob  view on Meta::CPAN

        mood    => 'joyous',
    };
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
    maxrun      => 300
    sortprog    => /bin/sort
</%attr>
    my $data = shift;   # one record from the input

    # return a key + a value
    return ( $data->{cmp}, 1 );
</%map>
%################################################################
<%reduce>
    my $key = shift;
    my $itr = shift;    # an iterator object

    # count
    my $n = 0;
    $itr->foreach( sub { $n ++ } );

eg/readinput.pm  view on Meta::CPAN

package Local::MrMagoo::ReadInput;
use AC::MrMagoo::User;
use JSON;
use strict;

our $R;		# exported by AC::MrMagoo::User

sub readinput {
    my $fd = shift;	# file handle

    # our file is newline delimted json data

    # read next line
    my $line = scalar <$fd>;
    # end of file?
    return (undef, 1) unless defined $line;

    my $d = json_decode($line);

    # filter input on date range. we could just as easily filter
    # in 'map', but doing it here, behind the scenes, keeps things

lib/AC/MrGamoo.pm  view on Meta::CPAN

=item seedpeer

specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.

    seedpeer 192.168.10.11:3503
    seedpeer 192.168.10.12:3503

=item secret

specify a secret key used to encrypt data transferred between
systems in different datacenters.

    secret squeamish-ossifrage

=item syslog

specify a syslog facility for log messages.

    syslog local5

=item basedir

local directory in which to store files

    basedir         /home/data

=item debug

enable debugging for a particular section

    debug job

=back

=head1 BUGS

lib/AC/MrGamoo/AC/FileList.pm  view on Meta::CPAN

#
# $Id: FileList.pm,v 1.3 2010/11/10 16:24:38 jaw Exp $

package AC::MrGamoo::AC::FileList;
use AC::MrGamoo::Debug 'files';
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;

# hard-coded path to a yenta database file; not configurable from here.
# NOTE(review): presumably opened via AC::Yenta::Direct (imported above) - confirm.
my $YDBFILE = "/home/acdata/logfile.ydb";

# return an array of:
#   {
#     filename    => www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi
#     location    => [ scrib@a2be021bd31c, scrib@a2be021ad31c ]
#     size        => 10863
#     [anything else]
#   }

# convert legacy scriblr ids

lib/AC/MrGamoo/AC/ReadInput.pm  view on Meta::CPAN


sub readinput {
    my $fd = shift;

    my $line = scalar <$fd>;
    return (undef, 1) unless defined $line;

    my $d;
    eval { $d = parse_dancr_log($line); };
    if( $@ ){
        problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
        return ;
    }

    # filter input on date range. we could just as easily filter
    # in 'map', but doing here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.
    return if $d->{tstart} <  $R->config('start');
    return if $d->{tstart} >= $R->config('end');

    return ($d, 0);

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

use strict;

my $TIMEOUT  = 15;

# construct a client connection that will carry one encoded request.
#   $addr, $port - peer to talk to
#   $info        - free-form text appended to the connection's info string
#   $req, $data  - request header fields and payload, encoded here via
#                  AC::MrGamoo::Protocol->encode_request
# returns whatever the parent class constructor returns.
sub new {
    my ($class, $addr, $port, $info, $req, $data) = @_;

    debug("new client type: $req->{type} to $addr:$port");

    # encode once, then hand the wire-format request to the base class
    my $encoded = AC::MrGamoo::Protocol->encode_request( $req, $data );

    my $client = $class->SUPER::new(
        $addr, $port,
        info    => "client $req->{type} to $addr:$port; $info",
        request => $encoded,
    );

    return $client;
}

sub start {
    my $me = shift;

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

}

# relay console traffic for a submitted job until the job reports it is done.
# each message read from $me->{console_fd} is decoded as a protocol request:
#   stdout -> written to our STDOUT
#   stderr -> written to our STDERR
#   debug  -> passed to $me->{fdebug}, prefixed with the sending server id
#   finish -> stop relaying
# the loop also stops if the socket read fails or yields no data. previously
# the recv result was ignored, so a dead socket led to decoding an empty or
# undefined buffer over and over.
sub run_console {
    my $me = shift;
    my $fd = $me->{console_fd};

    while (1) {
        my $buf;

        # recv returns undef on error
        unless ( defined( recv($fd, $buf, 65535, 0) ) ) {
            print STDERR "console read failed: $!\n";
            last;
        }
        # an empty buffer (EOF on a stream socket, or an empty datagram)
        # cannot even hold a message header - nothing more to relay
        last unless defined $buf && length $buf;

        my $proto = AC::MrGamoo::Protocol->decode_header($buf);
        my $data  = substr($buf, AC::MrGamoo::Protocol->header_size());
        my $req   = AC::MrGamoo::Protocol->decode_request($proto, $data);
        my $type  = $req->{type};

        last if $type eq 'finish';

        if ( $type eq 'stderr' ) {
            print STDERR "$req->{msg}";
        }
        elsif ( $type eq 'stdout' ) {
            print "$req->{msg}";
        }
        elsif ( $type eq 'debug' ) {
            $me->{fdebug}->("$req->{server_id}\t$req->{msg}");
        }
    }
}

sub submit {
    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

lib/AC/MrGamoo/Default/MySelf.pm  view on Meta::CPAN

}

# identifier this server reports for itself (the file-level $SERVERID)
sub my_server_id { return $SERVERID }

# network addresses for this server: a fresh arrayref holding a single
# entry whose 'ipa' key is the file-level $MYIP
sub my_network_info {
    my %iface = ( ipa => $MYIP );
    return [ \%iface ];
}

# this default implementation has no notion of datacenters: every server
# reports the same fixed name
sub my_datacenter { return 'default' }

1;

lib/AC/MrGamoo/FileList.pm  view on Meta::CPAN

perform some limited tests without this file.

But you must provide this file in order to actually run map/reduce jobs.

=head1 DESCRIPTION

MrGamoo only runs map/reduce jobs.
It is up to you to get the files onto the servers,
keep track of where they are, and tell MrGamoo.

Some people keep the file meta-information in an SQL database.
Some people keep the file meta-information in a yenta map.
Some people keep the file meta-information in the filesystem.

When a new job starts, your C<get_file_list> function will be
called with the job config, and should return an arrayref
of matching files along with meta-info.

Each element of the returned arrayref should be a hashref
containing at least the following fields:

lib/AC/MrGamoo/Iter.pm  view on Meta::CPAN

        $me->_putback($r);
        $me->{key} = $r->[0];
        return $r->[0];
    }

    return;	# eof
}

sub next {
    # _next hands back (data, end-of-data flag)
    my ($item, $eof) = _next(@_);

    # list-context callers get both values; scalar callers get just the data
    return wantarray ? ( $item, $eof ) : $item;
}

sub _next {
    my $me = shift;

    my $r = $me->_nextrow();
    return (undef, 1) unless $r;	# eof

    return $r->[1] if $me->{key} eq $r->[0];

lib/AC/MrGamoo/Iter/Array.pm  view on Meta::CPAN

#
# $Id: Array.pm,v 1.1 2010/11/01 18:41:55 jaw Exp $

package AC::MrGamoo::Iter::Array;
use AC::MrGamoo::Iter;
our @ISA = 'AC::MrGamoo::Iter';
use strict;

# wrap an in-memory list of records as an iterator.
#   $array - arrayref of [key, data] pairs; stored as-is (not copied)
sub new {
    my ($class, $array) = @_;

    my $self = { src => $array };
    return bless $self, $class;
}

sub _nextrow {
    my $me = shift;

    if( $me->{buf} ){

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

    return \@out;
}

# decide which servers run map tasks.
# servers are considered in ascending 'metric' order. the first N are marked
# use => 1 and the rest use => 0, where N is $job->{options}{maps} (when it
# is unset or 0, every server is used).
# returns a hashref: server id => { metric => ..., use => 0|1 }
sub _plan_map_these_servers {
    my ($job, $servers) = @_;

    my $remaining = ($job->{options}{maps} + 0) || scalar(@$servers);

    my @by_metric = sort { $a->{metric} <=> $b->{metric} } @$servers;

    my %plan;
    for my $server (@by_metric) {
        my $selected = $remaining ? 1 : 0;
        $plan{ $server->{id} } = { metric => $server->{metric}, use => $selected };
        $remaining-- if $remaining;
    }

    return \%plan;
}

sub _plan_divy_files {
    my $job     = shift;
    my $files   = shift;
    my $servers = shift;

    my %filemap;
    my %bytes;
    my @copies;

lib/AC/MrGamoo/Kibitz.pm  view on Meta::CPAN

        my $natinfo = my_network_info();
        for my $i ( @$natinfo ){
            push @$ipinfo, { ipv4 => inet_atoi($i->{ipa}), port => my_port(), natdom => $i->{natdom} };
        }
    }

    my $status = ($^T > $STARTTIME + $STARTDELAY) ? 200 : 102;

    return {
        hostname        => $HOSTNAME,
        datacenter      => my_datacenter(),
        subsystem       => 'mrgamoo',
        environment     => conf_value('environment'),
        via             => my_server_id(),
        server_id       => my_server_id(),
        path            => '.',
        status          => $status,
        timestamp       => $^T,
        lastup          => $^T,
        ip              => $ipinfo,
        sort_metric     => loadave() * 1000,

lib/AC/MrGamoo/Kibitz/Client.pm  view on Meta::CPAN

        AC::MrGamoo::Kibitz::Peers->maybe_down( $me->{status_peer}, 'timeout' );
    }
}

# handle the reply to a status request sent to a peer.
# if a protocol message was read, it marks the request as answered, passes
# each entry of the reply's status list to
# AC::MrGamoo::Kibitz::Peers->update, and shuts the connection. decode or
# update failures are logged, not propagated.
sub read {
    my ($me, $evt) = @_;

    debug("recvd reply");

    my ($proto, $data) = read_protocol_no_content($me, $evt);
    return unless $proto;

    $me->{status_ok} = 1;

    eval {
        my $reply = AC::MrGamoo::Protocol->decode_reply( $proto, $data );
        for my $peer_status ( @{ $reply->{status} } ) {
            AC::MrGamoo::Kibitz::Peers->update( $peer_status );
        }
    };
    my $err = $@;
    verbose("error: $err") if $err;

    $me->shut();
}

lib/AC/MrGamoo/Kibitz/Peers.pm  view on Meta::CPAN

use AC::MrGamoo::MySelf;
use AC::MrGamoo::Config;
use AC::DC::Sched;
use AC::Misc;
use AC::Import;
use JSON;
use strict;

# functions offered to importers (presumably exported through AC::Import, used above)
our @EXPORT = qw(pick_best_addr_for_peer peer_list_all get_peer_by_id);

# retention periods for peer data. units are not visible here
# (NOTE(review): presumably seconds - confirm against the expiry code).
my $KEEPDOWN = 300;     # how long to keep data about servers marked down
my $KEEPLOST = 600;     # how long to keep data about servers we have stopped hearing about

# file-private peer state. the contents below are inferred from the names
# only and must be confirmed against the code that fills these in:
my %SCEPTICAL;          # NOTE(review): presumably peers whose reports are not yet trusted
my %ALLPEER;            # NOTE(review): presumably every known peer, keyed by server id
my %MAYBEDOWN;          # NOTE(review): presumably peers flagged via maybe_down(), awaiting confirmation
my $natdom;             # NOTE(review): looks like this server's NAT domain (cf. 'natdom' in network info)
my $natinit;            # NOTE(review): presumably set once $natdom has been initialised

AC::DC::Sched->new(
    info    => 'kibitz status',
    freq    => (conf_value('time_status_kibitz') || 5),

lib/AC/MrGamoo/Server.pm  view on Meta::CPAN

    my $me = shift;

    debug("connection timed out");
    $me->shut();
}

# handle one incoming message on a server connection.
# the message is read from the connection, and its payload is decoded,
# except for 'http' messages and messages with an empty payload, which are
# passed through untouched. it is then handed to the %HANDLER entry for its
# type. an entry is either a code ref, called as
# $h->($me, $proto, $data, $content), or a class name, called as
# Class->handler($me, $proto, $data, $content). unknown types and payloads
# that fail to decode are logged, and the connection is shut.
sub read {
    my ($me, $evt) = @_;

    my ($proto, $data, $content) = read_protocol_no_content($me, $evt);
    return unless $proto;

    my $handler = $HANDLER{ $proto->{type} };
    if ( !$handler ) {
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    eval {
        if ( $data && $proto->{type} ne 'http' ) {
            $data = AC::MrGamoo::Protocol->decode_request($proto, $data);
        }
    };
    my $err = $@;
    if ($err) {
        problem("cannot decode request: $err");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

    # code refs are called directly; anything else is treated as a class name
    return ref($handler)
        ? $handler->( $me, $proto, $data, $content )
        : $handler->handler( $me, $proto, $data, $content );
}

sub http {
    my $me    = shift;
    my $proto = shift;
    my $url   = shift;

    $url =~ s|^/||;
    $url =~ s/%(..)/chr(hex($1))/eg;

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

    my $me  = shift;

    $me->{_status_underway} --;
}

sub _read {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

    debug("read child $me->{request}{taskid}: $evt->{data}.");
    # read status msg from child
    $io->{rbuffer} .= $evt->{data};

    my @l = split /^/m, $io->{rbuffer};
    $io->{rbuffer} = '';
    for my $l (@l){
        unless( $l =~ /\n/ ){
            $io->{rbuffer} = $l;
            last;
        }

        debug("got status $l");

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

# close every per-partition output handle, then drop the list of handles
sub _close_outfiles {
    my ($me) = @_;

    $_->close() for @{ $me->{outfd} };
    delete $me->{outfd};
}

# write one map output record to its partition file.
# the partition depends only on the key and the partition count $n, so a
# given key always lands in the same partition.
#   $n    - number of partitions; $me->{outfd}[0 .. $n-1] are the output handles
#   $key  - record key (hashed to pick the partition)
#   $data - record value
# the record is written as a one-line JSON array [key, data].
sub _output_partition {
    my ($me, $n, $key, $data) = @_;

    # md5 operates on bytes and dies ("Wide character") on characters above
    # 0xFF, which json-decoded input readily contains. hash the UTF-8 encoding
    # of the key instead. this is identical for plain ASCII keys and does not
    # depend on perl's internal string representation. keys with characters
    # in 0x80-0xFF hash differently than before this change, so do not mix
    # old and new versions within one job.
    my $bytes = defined $key ? "$key" : '';
    utf8::encode($bytes);

    # md5 is twice as fast as sha1.
    # anything written in perl is 10 times slower
    my $hash = unpack('N', md5( $bytes ));
    my $p    = $hash % $n;
    my $io   = $me->{outfd}[$p];
    $io->output( encode_json( [ $key, $data ] ), "\n" );
}


# end-user's 'print' comes here.
# the arguments are joined the way print itself joins them: separated by the
# output field separator $, (empty unless set) and followed by the output
# record separator $\ (empty unless set; say() localizes it to "\n" when
# printing to a tied handle). the previous "@_" interpolation put a space
# between every argument, so print "a", "b" came out as "a b".
# NOTE(review): assumes this is reached from a print-like call (e.g. a tied
# handle's PRINT) - confirm with the caller.
sub eu_print_stdout {
    my $me = shift;

    my $msg = join( ( defined($,) ? $, : '' ), @_ );
    $msg .= $\ if defined $\;

    _send_eumsg($me, 'stdout', $msg);
}

lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN

    my $p;
    eval {
        # connect
        my $s = AC::MrGamoo::Protocol->connect_to_server( inet_aton($addr), $port );
        return unless $s;

        # send req
        AC::MrGamoo::Protocol->write_request($s, $req);

        # get response
        my $buf = AC::MrGamoo::Protocol->read_data($s, AC::MrGamoo::Protocol->header_size(), 30);
        $p      = AC::MrGamoo::Protocol->decode_header($buf);
        $p->{data} = AC::MrGamoo::Protocol->read_data($s, $p->{data_length}, 1);
        $p->{data} = AC::MrGamoo::Protocol->decode_reply($p);

        debug("recvd response $p->{data}{status_code}");
        return unless $p->{data}{status_code} == 200;

        # stream file to disk
        my $size = $p->{content_length};
        debug("recving file ($size B)");

        my $fd;
        unless( open( $fd, "> $tmpfile" ) ){
            verbose("cannot open output file '$tmpfile': $!");
            return;
        }

        my $chk  = _sendfile($oreq, $fd, $s, $size);
        my $sha1 = $p->{data}{hash_sha1};
        die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
    };
    if(my $e=$@){
        debug("error: $e");
        return;
    }

    return $p;
}

lib/AC/protobuf/mrgamoo_status.pl  view on Meta::CPAN

            'ACPMRMStatus',
            [
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'hostname', 1, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'datacenter', 2, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'subsystem', 3, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'environment', 4, undef

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 0.868 second using v1.00-cache-2.02-grep-82fe00e-cpan-4673cadbf75 )