AC-MrGamoo

 view release on metacpan or  search on metacpan

META.yml  view on Meta::CPAN

--- #YAML:1.0
name:               AC-MrGamoo
version:            1
abstract:           Map/Reduce Framework
author:
    - AdCopy <http://www.adcopy.com>
license:            perl
distribution_type:  module
configure_requires:
    ExtUtils::MakeMaker:  0
requires:
    AC::DC:               0
    Digest::SHA1:         0
    Google::ProtocolBuffers:  0
    JSON:                 0
    POSIX:                0
    Sys::Hostname:        0
    Time::HiRes:          0
no_index:
    directory:
        - t

eg/example.mrjob  view on Meta::CPAN

    my $n = 0;
    $itr->foreach( sub { $n ++ } );

    # return a key + a value
    return ($key, $n);
</%reduce>
%#
%# additional reduce blocks can go here
%#
%################################################################
%# final block runs once with the results of the previous reduce.
%# used to generate report or insert to db
<%final>
<%attr>
%# override various parameters
    use_strict  => 0
    in_package  => My::Private::Space
</%attr>
<%init>
    # init sub-block runs at start of final block
    my $report;

eg/mrgamoo.conf  view on Meta::CPAN

# example config
#
# file will be reloaded automagically if it changes. no need to hup or restart.

port            3504
syslog          local4
environment	prod

allow           10.200.2.0/23

basedir         /home/aclogs

seedpeer        10.200.2.3:3504

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN


# timeout handler: all it does is shut the connection down
sub _timeout {
    my ($me) = @_;

    return $me->shut();
}

# shutdown handler: report the outcome of the request to the caller.
# runs 'on_success' (with the stored result) if a good reply was recorded,
# otherwise runs 'on_failure' with no arguments.
sub _shutdown {
    my ($me) = @_;

    return $me->run_callback('on_failure') unless $me->{status_ok};

    return $me->run_callback('on_success', { result => $me->{result} } );
}

# read handler: process a reply arriving on this connection.
#   $evt - the read event, handed on to read_protocol_no_content()
# returns early (doing nothing more) if no protocol message was obtained.
# an error response, or a decoded reply whose status_code is not 200,
# is reported through _uh_oh(). otherwise the reply is stored in
# $me->{result}, status_ok is set, and the connection is shut.
sub _read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply to $me->{info}");

    # presumably yields nothing until a complete message is buffered - TODO confirm
    my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    return unless $proto;

    # check response
    if( $proto->{is_error} ){
        # this used to call the non-existent _uhoh(); the error handler
        # defined in this package (and used below) is _uh_oh()
        return $me->_uh_oh("rcvd error response");
    }

    $proto->{data} = AC::MrGamoo::Protocol->decode_reply($proto, $data);
    debug("recvd reply to $me->{info} - $proto->{data}{status_code} $proto->{data}{status_message}");

    if( $proto->{data}{status_code} != 200 ){
        return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
    }

    # success: keep the reply for the shutdown handler, then close
    $me->{result}    = $proto;
    $me->{status_ok} = 1;
    $me->shut();
}

sub _uh_oh {
    my $me  = shift;
    my $msg = shift;

    debug("error $msg");
    $me->run_callback('error', { error => $msg } );

lib/AC/MrGamoo/API/HB.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Dec-21 17:08 (EST)
# Function: respond to heartbeat requests
#
# $Id: HB.pm,v 1.1 2010/11/01 18:41:51 jaw Exp $

package AC::MrGamoo::API::HB;
use AC::MrGamoo::Debug 'hb';
use AC::MrGamoo::Config;
use AC::MrGamoo::Stats;
use AC::MrGamoo::About;
use AC::MrGamoo::MySelf;

lib/AC/MrGamoo/API/HB.pm  view on Meta::CPAN

    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    my $response = AC::MrGamoo::Protocol->encode_reply( {
        type            => 'heartbeat_request',
        msgid           => $proto->{msgid},
        is_reply        => 1,
    }, {
        status_code	=> 200,
        status_message	=> 'Honky Dory',
        hostname	=> $HOSTNAME,
        subsystem	=> 'mrgamoo',
        environment	=> conf_value('environment'),
        port		=> my_port(),
        timestamp	=> time(),
        sort_metric	=> loadave(),
        server_id	=> my_server_id(),
        process_id	=> $$,
    } );

    debug("sending hb reply");
    $io->write_and_shut( $response );

}



1;

lib/AC/MrGamoo/API/Simple.pm  view on Meta::CPAN

    my $msg     = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    my $response = AC::MrGamoo::Protocol->encode_reply( {
        type            => $proto->{type},
        msgid           => $proto->{msgid},
        is_reply        => 1,
    }, {
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("sending $code reply for $proto->{type} on $io->{info}");
    $io->write_and_shut( $response );
}

# send a status reply by writing it straight to a file descriptor
# with syswrite (the debug message tags this path as "from bkg").
#   $code, $msg - status_code + status_message to put in the reply
#   $fd         - handle the encoded reply is written to
#   $proto      - header of the request being answered
#   $req        - the request itself (not used here)
# does nothing if the request did not ask for a reply.
sub nbfd_reply {
    my( $code, $msg, $fd, $proto, $req ) = @_;

    $proto->{want_reply} or return;

    my %header = (
        type            => $proto->{type},
        msgid           => $proto->{msgid},
        is_reply        => 1,
    );
    my %status = (
        status_code     => $code,
        status_message  => $msg,
    );
    my $response = AC::MrGamoo::Protocol->encode_reply( \%header, \%status );

    debug("sending $code reply for $proto->{type} (from bkg)");
    return syswrite( $fd, $response );
}

sub on_success {
    my $x  = shift;
    my $e  = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

    reply( 200, 'OK', $io, $proto, $req );

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

    my $mr = $me->{program};
    my $r = AC::MrGamoo::Submit::Request->new( $mr );
    $r->{eu_print_stderr} = sub { print STDERR "@_\n" };
    $r->{eu_print_stdout} = sub { print STDERR "@_\n" };

    # run init section
    my $h_init   = $mr->get_code( 'init' );
    my $initres  = ($h_init ? $h_init->{code}() : undef) || {};

    $me->{id} = unique();
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobcreate',
        msgidno		=> $^T,
        want_reply	=> 1,
    },{
        jobid		=> $me->{id},
        options		=> to_json( $r->{config} ),
        initres		=> to_json( $initres, {allow_nonref => 1} ),
        jobsrc		=> $mr->src(),
        console		=> ($me->{console_port} ? ":$me->{console_port}" : ''),
        traceinfo	=> $me->{traceinfo},
    } );

    my $ok;
    if( my $master = $me->get_config_param('master') ){
        # use specified master (for debugging)
        my($addr, $port) = split /:/, $master;
        $me->_submit_to( $addr, $port, $req );

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

        $ok = $me->_pick_master_and_send( $req, $seed );
    }

    return $ok ? $me->{id} : undef;
}

# ask the master that accepted this job to abort it.
# does nothing unless a master has been recorded in $me->{master}.
# returns the decoded reply from _submit_to().
sub abort {
    my( $me ) = @_;

    my $master = $me->{master} or return;

    my $res = $me->_submit_to(
        $master->{addr},
        $master->{port},
        AC::MrGamoo::Protocol->encode_request(
            {
                type       => 'mrgamoo_jobabort',
                msgidno    => $^T,
                want_reply => 1,
            },
            {
                jobid      => $me->{id},
            },
        ),
    );

    return $res;
}

################################################################

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    }, {});

    # get the full list of servers
    # contact each seed passed in above, until we get a reply
    for my $s ( @$seed ){
        my($addr, $port) = split /:/, $s;
        $me->{fdebug}->("attempting to fetch server list from $addr:$port");
        eval {
            alarm(1);
            my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $listreq, $me->{fdebug} );
            my $res   = AC::MrGamoo::Protocol->decode_reply($reply);
            alarm(0);
            my $list = $res->{status};
            @serverlist = @$list if $list && @$list;
        };
        last if @serverlist;
    }

    # sort+filter list
    @serverlist = sort { ($a->{sort_metric} <=> $b->{sort_metric}) || int(rand(3)) - 1 }
      grep { $_->{status} == 200 } @serverlist;

    # try all addresses
    # RSN - sort addresslist in a Peers::pick_best_addr_for_peer() like manner?

    my @addrlist = map { @{$_->{ip}} } @serverlist;

    for my $ip (@addrlist){
        my $addr = inet_itoa($ip->{ipv4});
        my $res;
        eval {
            alarm(30);
            $res = $me->_submit_to( $addr, $ip->{port}, $req );
            alarm(0);
        };
        next unless $res && $res->{status_code} == 200;
        $me->{master} = { addr => $addr, port => $ip->{port} };
        return 1;
    }
    return ;
}

# send an encoded request to one server and return the decoded reply.
#   $addr, $port - server to contact ($addr is converted with inet_aton)
#   $req         - the already encoded request
# progress is logged through the $me->{fdebug} callback; 120 is passed
# as the final send_request argument (presumably a timeout - TODO confirm).
sub _submit_to {
    my( $me, $addr, $port, $req ) = @_;

    my $log = $me->{fdebug};
    $log->("sending job to $addr:$port");

    my $raw     = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $req, $log, 120 );
    my $decoded = AC::MrGamoo::Protocol->decode_reply($raw);

    return $decoded;
}

################################################################

sub check_code {
    my $me = shift;

    my $mr = $me->{program};
    my $nr = @{ $mr->{content}{reduce} };

lib/AC/MrGamoo/Config.pm  view on Meta::CPAN

    gzprog	=> \&AC::ConfigFile::Simple::parse_keyvalue,
);



################################################################

# dispatch one config entry to its parser from %CONFIG.
#   $key  - config keyword
#   $rest - remainder of the entry, passed to the parser untouched
# returns 1 if a parser is registered for $key (after calling it),
# nothing otherwise.
sub handle_config {
    my( $me, $key, $rest ) = @_;

    my $handler = $CONFIG{$key} or return;

    $handler->($me, $key, $rest);
    return 1;
}

################################################################

# look up one value in the currently loaded configuration
# ($AC::MrGamoo::CONF->{config}); undef if the key is not set.
sub conf_value {
    my( $key ) = @_;

    return $AC::MrGamoo::CONF->{config}->{$key};
}

lib/AC/MrGamoo/FileList.pm  view on Meta::CPAN


    filename    => 'www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi'

=head2 location

an arrayref of servers where this file is located. the locations
should be the persistent-ids of the servers (see MySelf).

if the same file is replicated on multiple servers, mrgamoo will
be able to both intelligently determine which servers will process
which files, as well as recover from failures.

    location	=> [ 'mrm@athena.example.com', 'mrm@zeus.example.com' ]

=head2 size

this should be the size of the file, in bytes. mrgamoo will consider
the sizes of files in determining which servers will process which files.

    size	=> 10843

lib/AC/MrGamoo/Iter/File.pm  view on Meta::CPAN

our @ISA = 'AC::MrGamoo::Iter';
use strict;

# constructor
#   $fd - filehandle the rows are read from
#   $pf - progress callback, invoked by _nextrow for each line read
sub new {
    my( $class, $fd, $pf ) = @_;

    my %self = (
        fd       => $fd,
        progress => $pf,
    );

    return bless \%self, $class;
}

# return the next row.
# a row parked in $me->{buf} is handed back first (and removed);
# otherwise one line is read from the file, the progress callback
# is invoked, and the JSON-decoded line is returned.
# returns nothing once the file has no more lines.
sub _nextrow {
    my( $me ) = @_;

    return delete $me->{buf} if $me->{buf};

    my $line = readline( $me->{fd} );
    return unless $line;

    $me->{progress}->();
    return decode_json($line);
}


1;

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

# record status rcvd from task
# record status rcvd from task
# called with a class plus named params (taskid, phase, progress);
# the job is located with _find(), which is given the same params.
# returns 1 once the running task has been updated, nothing if either
# the job or the running task cannot be found.
sub task_status {
    my $class = shift;
    my $job   = _find($class, @_);
    my %arg   = @_;

    $job or return;

    my $task = $job->{task_running}{ $arg{taskid} } or return;

    $task->update_status( $job, $arg{phase}, $arg{progress} );

    return 1;
}

# record status rcvd from file xfer
sub xfer_status {
    my $me = _find(shift, @_);
    my %p  = @_;
    my $copyid = $p{copyid};

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN


    my $x = $job->_send_request( $server, "task $me->{id}", {
        type		=> 'mrgamoo_taskcreate',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {
        jobid		=> $job->{request}{jobid},
        taskid		=> $me->{id},
        jobsrc		=> $job->{request}{jobsrc},
        options		=> $job->{request}{options},
        initres		=> $job->{request}{initres},
        console		=> ($job->{request}{console} || ''),
        phase		=> $ti->{phase},
        infile		=> $ti->{infile},
        outfile		=> [ map { $_->{filename} } @{$ti->{outfile}} ],
        master		=> my_server_id(),
    } );

    unless( $x ){
        verbose("cannot start task");
        $me->failed($job);

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN

    my $me  = shift;
    my $job = shift;

    $me->failed($job, 'network');
}

sub update_status {
    my $me  = shift;
    my $job = shift;
    my $phase = shift;
    my $progress = shift;

    $me->{status_time}  = $^T;
    $me->{status_phase} = $phase;
    $me->{status_amt}   = $progress;
    $me->{status_fail}  = 1 if $phase eq 'FAILED';

    debug("task is $phase $progress");

    if( $phase eq 'FINISHED' ){
        if( $me->{status_fail} ){
            $me->failed( $job, "status fail" );
        }else{
            $me->finished( $job );
        }
    }

    return 1;

lib/AC/MrGamoo/Job/Util.pm  view on Meta::CPAN

        verbose("cannot locate server $server");
        return;
    }

    my $x = AC::MrGamoo::API::Client->new( $addr, $port, @_ );
    return $x;
}

################################################################

# do we have sufficient resources to take on more work?
sub _ok_to_do_more_p {
    my $me = shift;

    my $io = AC::DC::IO->underway();
    return if $io >= $MAXFILE / 2;

    if( loadave() >= $MAXLOAD ){
        return if rand() >= ( 1 - loadave() ) / 5;
    }

lib/AC/MrGamoo/Kibitz/Client.pm  view on Meta::CPAN

    my $evt = shift;

    debug("recvd reply");

    my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    return unless $proto;

    $me->{status_ok} = 1;

    eval {
        my $resp = AC::MrGamoo::Protocol->decode_reply( $proto, $data );
        for my $update ( @{$resp->{status}} ){
            AC::MrGamoo::Kibitz::Peers->update( $update );
        }
    };
    if(my $e = $@){
        verbose("error: $e");
    }
    $me->shut();
}

lib/AC/MrGamoo/Kibitz/Peers.pm  view on Meta::CPAN

    my $class = shift;
    my $up    = shift;

    return unless _update_ok($up);
    my $id = $up->{server_id};


    my $previnfo = $ALLPEER{$id};
    # only keep it if it is newer than what we have
    return if $previnfo && $up->{timestamp} <= $previnfo->{timestamp};
    # only keep it if it is relatively fresh
    return unless $up->{timestamp} > $^T - $KEEPDOWN;

    $up->{path} .= ' ' . my_server_id();

    if( $previnfo ){
        verbose("marking peer $id as up") if $up->{status} == 200 && $previnfo->{status} != 200;
    }else{
        verbose("discovered new peer: $id ($up->{hostname})");
    }

lib/AC/MrGamoo/Kibitz/Peers.pm  view on Meta::CPAN

    }
}

# forget any pending "maybe down" mark for this peer
sub seems_ok {
    my( $class, $id ) = @_;

    return delete $MAYBEDOWN{$id};
}

# require 2 failures before declaring it down
sub maybe_down {
    my $class = shift;
    my $id    = shift;
    my $why   = shift;

    if( $MAYBEDOWN{$id} ){
        delete $MAYBEDOWN{$id};
        $class->isdown($id, $why);
        return;
    }

lib/AC/MrGamoo/Kibitz/Peers.pm  view on Meta::CPAN

    $ALLPEER{$id}{status}    = 0;

    _maybe_remove( $id );
}

# arrayref of our own info (from about_myself) followed by every
# entry in %ALLPEER
sub peer_list_all {

    my @peers = ( AC::MrGamoo::Kibitz->about_myself(), values %ALLPEER );
    return \@peers;
}

# same result as peer_list_all(); any arguments are ignored
sub response {
    return peer_list_all();
}

# look up the info for a server id.
# a known peer in %ALLPEER wins; if the id is our own, our own info
# (about_myself) is returned; otherwise nothing.
sub get_peer_by_id {
    my( $id ) = @_;

    if( my $known = $ALLPEER{$id} ){
        return $known;
    }
    if( $id eq my_server_id() ){
        return AC::MrGamoo::Kibitz->about_myself();
    }
    return ;
}

lib/AC/MrGamoo/Kibitz/Server.pm  view on Meta::CPAN

    debug("recvd request");
    if( $req ){
        AC::MrGamoo::Kibitz::Peers->update_sceptical( $req->{myself} );
    }

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # respond with all known peers
    my $all  = AC::MrGamoo::Kibitz::Peers->peer_list_all();
    my $resp = AC::MrGamoo::Protocol->encode_reply( $proto, {
        status => $all,
    } );

    debug("sending status reply");
    $io->write_and_shut( $resp );
}

1;

lib/AC/MrGamoo/MySelf.pm  view on Meta::CPAN

        return [
            # use this IP for communication with servers in this datacenter (same natdom)
            { ip => $privat_ip, natdom => my_datacenter() },
            # otherwise use this IP
            { ip => $public_ip },
        ]
    }

=head2 init

initialization function called at startup. typically used to look up hostnames, IP addresses,
and such and store them in variables to make the above functions faster.

    my $HOSTNAME;
    my $DOMAIN;
    sub init {
        $HOSTNAME = hostname();
        ($DOMAIN) = $HOSTNAME =~ /^[^\.]+\.(.*)/;
    }

=head1 BUGS

lib/AC/MrGamoo/OutFile.pm  view on Meta::CPAN


# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-27 11:34 (EST)
# Function: buffer output + open/close files as needed
#
# $Id: OutFile.pm,v 1.2 2011/01/06 17:58:13 jaw Exp $

package AC::MrGamoo::OutFile;
use AC::MrGamoo::Debug 'outfile';
use IO::Compress::Gzip;
use strict;

# file-scoped settings + bookkeeping; the code that uses them is
# outside this excerpt
my $BUFMAX         = 200000;
my $max_open;
my $currently_open = 0;
my %all;


# take the open-file limit from the shell, and never go above 255 on solaris.
# NOTE(review): the backtick output keeps its trailing newline, and a reply
# of "unlimited" (or a failed command) would be 0 in the numeric comparison
# below - confirm how $max_open is consumed.
$max_open = `sh -c "ulimit -n"`;
$max_open = 255 if $^O eq 'solaris' && $max_open > 255;

lib/AC/MrGamoo/OutFile.pm  view on Meta::CPAN

    close($me->{fd});
    $currently_open --;
    delete $me->{fd};
    debug("closed file $me->{file}");
}

sub _open {
    my $me = shift;

    if( $me->{gz} ){
        my $fd = IO::Compress::Gzip->new( $me->{file},
                                          Append	=> $me->{been_opened},
                                          Merge		=> $me->{been_opened},
                                         );
        $me->{fd} = $fd;
        debug("opened file (compressed) $me->{file}");
    }else{
        my $mode = $me->{been_opened} ? '>>' : '>';

        open(my $fd, $mode, $me->{file}) || die "cannot open '$me->{file}': $!\n";
        $me->{fd} = $fd;
        debug("opened file $me->{file}");
    }

    $me->{been_opened} = 1;
    $currently_open ++;

lib/AC/MrGamoo/Protocol.pm  view on Meta::CPAN

use AC::DC::Protocol;
use AC::Import;
use strict;

our @ISA    = 'AC::DC::Protocol';
our @EXPORT = qw(read_protocol read_protocol_no_content);
my $HDRSIZE = __PACKAGE__->header_size();

my %MSGTYPE =
 (
  status		=> { num => 0, reqc => '', 			resc => 'ACPStdReply' },
  heartbeat		=> { num => 1, reqc => '', 			resc => '' },
  heartbeat_request	=> { num => 2, reqc => '', 			resc => 'ACPHeartBeat' },

  scribl_put		=> { num => 11, reqc => 'ACPScriblRequest',     resc => 'ACPScriblReply' },
  scribl_get		=> { num => 12, reqc => 'ACPScriblRequest',     resc => 'ACPScriblReply' },
  scribl_del		=> { num => 13, reqc => 'ACPScriblRequest',     resc => 'ACPScriblReply' },
  scribl_stat		=> { num => 14, reqc => 'ACPScriblRequest',     resc => 'ACPScriblReply' },

  mrgamoo_jobcreate	=> { num => 15, reqc => 'ACPMRMJobCreate',      resc => 'ACPStdReply' },
  mrgamoo_taskcreate	=> { num => 16, reqc => 'ACPMRMTaskCreate',     resc => 'ACPStdReply' },
  mrgamoo_jobabort	=> { num => 17, reqc => 'ACPMRMJobAbort',       resc => 'ACPStdReply' },
  mrgamoo_taskabort	=> { num => 18, reqc => 'ACPMRMTaskAbort',      resc => 'ACPStdReply' },
  mrgamoo_taskstatus	=> { num => 19, reqc => 'ACPMRMTaskStatus',     resc => 'ACPStdReply' },
  mrgamoo_filexfer	=> { num => 20, reqc => 'ACPMRMFileXfer',       resc => 'ACPStdReply' },
  mrgamoo_filedel	=> { num => 21, reqc => 'ACPMRMFileDel',        resc => 'ACPStdReply' },
  mrgamoo_diagmsg	=> { num => 22, reqc => 'ACPMRMDiagMsg',        resc => 'ACPStdReply' },
  mrgamoo_xferstatus	=> { num => 23, reqc => 'ACPMRMXferStatus',     resc => 'ACPStdReply' },
  mrgamoo_status	=> { num => 24, reqc => 'ACPMRMStatusRequest',  resc => 'ACPMRMStatusReply' },

 );


# register every entry of %MSGTYPE with add_msg:
#   name, number, request class, reply class
__PACKAGE__->add_msg( $_, @{ $MSGTYPE{$_} }{ qw(num reqc resc) } )
    for keys %MSGTYPE;



sub read_protocol {
    my $io  = shift;
    my $evt = shift;

    $io->{rbuffer} .= $evt->{data};

lib/AC/MrGamoo/ReadInput.pm  view on Meta::CPAN

    copy. paste. edit.

    use lib '/myperldir';
    my $m = AC::MrGamoo::D->new(
        class_readinput    => 'Local::MrGamoo::ReadInput',
    );

=head1 DESCRIPTION

In your map/reduce job, your C<map> function is called once per record.
The C<readinput> function is responsible for reading the actual files
and returning records.

The default C<readinput> returns one line at a time (just like <FILE>).

If you want different behavior, you can provide a C<ReadInput> class,
or specify a C<readinput> block in your map/reduce job.

Your function should return an array of 2 values

=head2 record

lib/AC/MrGamoo/Server.pm  view on Meta::CPAN

    $url =~ s/%(..)/chr(hex($1))/eg;
    my($base) = split m|/|, $url;

    debug("http get $base");
    my $f = $HTTP{$base};
    $f ||= \&http_notfound;
    my( $content, $code, $text ) = $f->($url);
    $code ||= 200;
    $text ||= 'OK';

    my $res = "HTTP/1.0 $code $text\r\n"
      . "Server: AC/MrGamoo\r\n"
      . "Connection: close\r\n"
      . "Content-Type: text/plain; charset=UTF-8\r\n"
      . "Content-Length: " . length($content) . "\r\n"
      . "\r\n"
      . $content ;

    $me->write_and_shut($res);
}

################################################################

# fallback page for urls without a handler
# returns ( content, status code, status text )
sub http_notfound {
    my( $url ) = @_;

    my $body = "404 NOT FOUND\nThe requested url /$url was not found on this server.\nSo sorry.\n\n";

    return ( $body, 404, "Not Found" );
}

lib/AC/MrGamoo/Stats.pm  view on Meta::CPAN


################################################################

# report the current load average, 4 decimal places
sub http_load {

    my $fmt = "loadave:    %0.4f\n\n";
    return sprintf($fmt, loadave());
}

# report every counter in %STATS, one "name: value" line each,
# sorted by name, followed by a blank line
sub http_stats {

    my @lines = map { sprintf("%-24s%s\n", "$_:", $STATS{$_}) } sort keys %STATS;

    return join('', @lines) . "\n";
}

# fixed "all is well" page
sub http_status {
    my $page = "status: OK\n\n";
    return $page;
}


1;

lib/AC/MrGamoo/Submit/Compile.pm  view on Meta::CPAN


    if( $d->{multi} ){
        # merge
        @{ $me->{content}{$tag} }{ keys %$cfg } = values %$cfg;
    }else{
        $me->_die("redefinition of '$tag' section") if $me->{content}{$tag};
        $me->{content}{$tag} = $cfg;
    }
}

# store the result of the init block; returns the stored value
sub set_initres {
    my( $me, $ir ) = @_;

    return $me->{initres} = $ir;
}

# replace the config section of the content; returns the stored value
sub set_config {
    my( $me, $cfg ) = @_;

    return $me->{content}{config} = $cfg;
}

sub get_config_param {

lib/AC/MrGamoo/Submit/Request.pm  view on Meta::CPAN

use strict;


# constructor
#   $c - object supplying file, content->config and initres
#   any further key => value pairs are added to (and can override)
#   the fields of the new object
# the new object is also published in $AC::MrGamoo::User::R
sub new {
    my( $class, $c, @extra ) = @_;

    my %self = (
        file    => $c->{file},
        config  => $c->{content}{config},
        initres => $c->{initres},
        @extra,
    );

    my $me = bless \%self, $class;
    $AC::MrGamoo::User::R = $me;

    return $me;
}


# get config param
# fetch one config param by name
sub config {
    my( $me, $k ) = @_;

    return $me->{config}->{$k};
}

# get result of init block
# fetch the stored result of the init block
sub initvalue {
    my( $me ) = @_;

    return $me->{initres};
}

# let user output a key+value via $R->output(...)
# forward $R->output(...) from user code to the func_output hook,
# if one is installed
sub output {
    my $me   = shift;
    my $hook = $me->{func_output};

    return $hook && $hook->( @_ );
}
# and indicate progress via $R->progress
# forward $R->progress(...) from user code to the func_progress hook,
# if one is installed
sub progress {
    my $me   = shift;
    my $hook = $me->{func_progress};

    return $hook && $hook->( @_ );
}


# tie STDOUT + STDERR to AC::MrGamoo::Submit::TieIO, handing each
# one the matching eu_print_* callback of this object
sub redirect_io {
    my( $me ) = @_;

    my $tieclass = 'AC::MrGamoo::Submit::TieIO';

    tie( *STDOUT, $tieclass, $me->{eu_print_stdout} );
    return tie( *STDERR, $tieclass, $me->{eu_print_stderr} );
}

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN


    my $task = $me->{request}{taskid};
    return problem("cannot create task: no task id")   unless $task;
    if( $REGISTRY{$task} ){
        verbose("ignoring duplicate request task $task");
        # will cause a 200 OK, so the requestor will not retry
        return $REGISTRY{$task};
    }

    $me->{options} = decode_json( $me->{request}{options} ) if $me->{request}{options};
    $me->{initres} = from_json( $me->{request}{initres}, {allow_nonref => 1} ) if $me->{request}{initres};

    # compile
    eval {
        my $mr = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
        # merge job config + opts.
        $mr->set_config($me->{options});
        $mr->set_initres($me->{initres});
        $me->{R} = AC::MrGamoo::Submit::Request->new( $mr );
        $me->{R}{config}{jobid}  = $me->{request}{jobid};
        $me->{R}{config}{taskid} = $me->{request}{taskid};
        $me->{mr} = $mr;
    };
    if(my $e = $@){
        problem("cannot compile task: $e");
        return;
    }

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN


    debug("sending task status update $me->{request}{taskid} => $me->{status}{phase}");
    my $x = AC::MrGamoo::API::Client->new( $addr, $port, "task $me->{request}{taskid}", {
        type		=> 'mrgamoo_taskstatus',
        msgidno		=> $msgid++,
        want_reply	=> 0,
    }, {
        jobid		=> $me->{request}{jobid},
        taskid		=> $me->{request}{taskid},
        phase		=> $me->{status}{phase},
        progress	=> $me->{status}{amt},
    } );

    return unless $x;

    $me->{_status_underway} ++;
    $x->set_callback('shutdown', \&_send_status_done, $me);

    $x->start();

}

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

    _setup_console( $me );
    _update_status( 'STARTING', 0 );

    # send STDOUT + STDERR to end-user console session
    $me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ )  };
    $me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
    $me->{R}->redirect_io();

    my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
    $me->{R}{func_output}   = sub{ _output_partition($me, $n, @_) };
    $me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };

    eval {
        _setup_outfiles( $me );

        if( $me->{request}{phase}     eq 'map' ){
            _do_map( $me );
        }elsif( $me->{request}{phase} eq 'final' ){
            _do_final( $me );
        }elsif( $me->{request}{phase} =~ /^reduce/ ){
            _do_reduce( $me );

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN


    return if $^T < ($me->{status_time} + $STATUSTIME);
    $me->{status_time} = $^T;
    _update_status( @_ );
}

# create one OutFile object per requested output file and keep the
# list in $me->{outfd}.
# file names are taken relative to the configured basedir; the
# directory part is created first (best effort - errors are ignored).
sub _setup_outfiles {
    my( $me ) = @_;

    my $compress = $me->attr(undef, 'compress');
    my @outfd;

    for my $name ( @{$me->{request}{outfile}} ){
        my $path  = conf_value('basedir') . "/$name";
        my($dir)  = $path =~ m|^(.+)/[^/]+$|;

        eval{ mkpath($dir, undef, 0777) };
        push @outfd, AC::MrGamoo::OutFile->new( $path, $compress );
    }

    return $me->{outfd} = \@outfd;
}

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

        $h_final->{cleanup}() if $h_final->{cleanup};
    }
}

################################################################

sub _sort_cmd {
    my $me = shift;
    my $hc = shift;

    my $gz   = $me->attr(undef, 'compress');
    my $sort = $me->attr($hc,'sortprog') || conf_value('sortprog') || $SORTPROG;
    my @file = map { conf_value('basedir') . '/' . $_ } @{$me->{request}{infile}};

    if( $gz ){
        my $zcat = $me->attr($hc,'gzprog') || conf_value('gzprog') || $GZPROG;
        my $cmd  = $zcat . ' ' . join(' ', @file) . ' | ' . $sort;
        debug("running cmd: $cmd");
        return $cmd;
    }else{
        my @cmd = ($sort, @file);

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

    }
}

# wait for the sort to start producing output on $fd.
# polls the handle with a 5 second select; each time the select
# comes back without the handle being readable, a 'RUNNING' status
# update is offered to _maybe_update_status.
# returns (nothing) as soon as the handle is readable.
sub _sort_underway {
    my( $me, $fd ) = @_;

    my $fileno = fileno($fd);
    my $rmask  = "\0\0\0\0";

    # send progress updates to master while sort is sorting
    for(;;){
        # select overwrites the mask, so set our bit again every pass
        vec($rmask, $fileno, 1) = 1;

        select($rmask, undef, undef, 5);
        last if vec($rmask, $fileno, 1);
        _maybe_update_status( $me, 'RUNNING', 0);
    }

    return;
}

lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN


    my $p;
    eval {
        # connect
        my $s = AC::MrGamoo::Protocol->connect_to_server( inet_aton($addr), $port );
        return unless $s;

        # send req
        AC::MrGamoo::Protocol->write_request($s, $req);

        # get response
        my $buf = AC::MrGamoo::Protocol->read_data($s, AC::MrGamoo::Protocol->header_size(), 30);
        $p      = AC::MrGamoo::Protocol->decode_header($buf);
        $p->{data} = AC::MrGamoo::Protocol->read_data($s, $p->{data_length}, 1);
        $p->{data} = AC::MrGamoo::Protocol->decode_reply($p);

        debug("recvd response $p->{data}{status_code}");
        return unless $p->{data}{status_code} == 200;

        # stream file to disk
        my $size = $p->{content_length};
        debug("recving file ($size B)");

        my $fd;
        unless( open( $fd, "> $tmpfile" ) ){
            verbose("cannot open output file '$tmpfile': $!");
            return;

lib/AC/protobuf/mrgamoo.pl  view on Meta::CPAN

                    'jobsrc', 3, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'options', 4, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'initres', 5, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'phase', 6, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'outfile', 7, undef

lib/AC/protobuf/mrgamoo.pl  view on Meta::CPAN

                    'jobsrc', 2, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'options', 3, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'initres', 4, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'console', 5, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'traceinfo', 6, undef

lib/AC/protobuf/mrgamoo.pl  view on Meta::CPAN

                    'taskid', 2, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'phase', 3, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_FLOAT(), 
                    'progress', 4, undef
                ],

            ],
            { 'create_accessors' => 1, 'follow_best_practice' => 1,  }
        );
    }

}
1;



( run in 1.355 second using v1.01-cache-2.11-cpan-49f99fa48dc )