AC-MrGamoo

 view release on metacpan or  search on metacpan

eg/example.mrjob  view on Meta::CPAN

# Created: 2009-Oct-28 11:19 (EDT)
# Function: test
#
# $Id: example.mrjob,v 1.1 2010/11/01 19:04:21 jaw Exp $

<%doc>
    map reduce example
</%doc>
%################################################################
%# provide values for configurable parameters.
%# these override the defaults,
%# and params specified on the command line override these.
<%config>
    system      => blargh
    tasktimeout => 120
</%config>
%################################################################
%# common block is prepended to all other blocks.
%# used to load modules
<%common>
    use AC::Misc;
    use AC::Dumper;

eg/example.mrjob  view on Meta::CPAN

<%init>
    $R->print("starting map/reduce job");

    return {
        mood    => 'joyous',
    };
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
    maxrun      => 300
    sortprog    => /bin/sort
</%attr>
    my $data = shift;   # one record from the input

    # return a key + a value
    return ( $data->{cmp}, 1 );
</%map>
%################################################################
<%reduce>

eg/example.mrjob  view on Meta::CPAN

    return ($key, $n);
</%reduce>
%#
%# additional reduce blocks can go here
%#
%################################################################
%# final block runs once with the results of the previous reduce.
%# used to generate report or insert to db
<%final>
<%attr>
%# override various parameters
    use_strict  => 0
    in_package  => My::Private::Space
</%attr>
<%init>
    # init sub-block runs at start of final block
    my $report;
</%init>
<%cleanup>
    # cleanup sub-block runs at end of final block

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

# Read callback: handle the reply to the request this client sent.
#
#   $me  - the client connection object
#   $evt - the read event, handed through to read_protocol_no_content()
#
# Returns early (doing nothing) while read_protocol_no_content() yields no
# protocol header. A reply flagged is_error, or one whose decoded
# status_code is not 200, is reported through _uh_oh(), which runs the
# 'error' callback and shuts the connection. Otherwise the header (with the
# decoded reply stored under {data}) is saved in $me->{result}, status_ok is
# set, and the connection is shut.
sub _read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply to $me->{info}");

    my($proto, $data) = read_protocol_no_content( $me, $evt );
    return unless $proto;

    # check response
    if( $proto->{is_error} ){
        # use the same error helper as the status check below (_uh_oh)
        return $me->_uh_oh("rcvd error response");
    }

    $proto->{data} = AC::MrGamoo::Protocol->decode_reply($proto, $data);
    debug("recvd reply to $me->{info} - $proto->{data}{status_code} $proto->{data}{status_message}");

    if( $proto->{data}{status_code} != 200 ){
        return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
    }

    $me->{result}    = $proto;
    $me->{status_ok} = 1;
    $me->shut();
}

# Report a failure: log $msg, run the 'error' callback with
# { error => $msg }, then shut the connection.
sub _uh_oh {
    my( $me, $msg ) = @_;

    debug("error $msg");

    my %info = ( error => $msg );
    $me->run_callback( 'error', \%info );

    $me->shut();
}


1;

lib/AC/MrGamoo/API/Get.pm  view on Meta::CPAN

    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;
    open(F, $file) || return nbfd_reply(500, 'error', $fd, $proto, $req);
    my $size = (stat($file))[7];
    my $sha1 = sha1_file($file);

    debug("get file '$file' size $size");

    # send header
    my $gb  = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
    my $hdr = AC::MrGamoo::Protocol->encode_header(
        type		=> $proto->{type},
        msgidno		=> $proto->{msgidno},

lib/AC/MrGamoo/API/Put.pm  view on Meta::CPAN


    my($dir) = $file =~ m|^(.+)/[^/]+$|;

    # mkpath
    eval{ mkpath($dir, undef, 0755) };

    # open tmp
    my $tmp = "$file.tmp";
    unless( open(F, "> $tmp") ){
        problem("open file failed: $!");
        return nbfd_reply(500, 'error', $fd, $proto, $req);
    }

    # read + write
    my $size = $proto->{content_length};
    my $sha1 = $req->{hash_sha1};

    verbose("put file '$file' size $size");

    if( $content ){
        syswrite( F, $content );

lib/AC/MrGamoo/API/Put.pm  view on Meta::CPAN

    }

    eval {
        my $chk = AC::MrGamoo::Protocol->sendfile(\*F, $fd, $size, 10);
        close F;
        die "file size mismatch\n" unless (stat($tmp))[7] == $proto->{content_length};
        die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
    };
    if(my $e = $@){
        unlink $tmp;
        verbose("error: $e");
        nbfd_reply(500, 'error', $fd, $proto, $req);
        return;
    }

    rename $tmp, $file;

    nbfd_reply(200, 'OK', $fd, $proto, $req);
}

1;

lib/AC/MrGamoo/API/Simple.pm  view on Meta::CPAN

        if( $gpid ){
            # parent
            _exit(0);
        }else{
            # orphaned child
            eval {
                $func->($io, $proto, $req, @_);
            };
            if(my $e = $@){
                chomp $e;
                verbose("child error: $e");
                _exit(1);
            }
            _exit(0);
        }
    }
}

1;

lib/AC/MrGamoo/API/Xfer.pm  view on Meta::CPAN

    my $x = AC::MrGamoo::Retry->new(
        newobj	=> \&_mk_xfer,
        newargs => [ $req ],
        tryeach	=> $req->{location},
       );

    # reply now
    if( $x ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        debug("sending error, xfer/retrier failed, $io->{info}");
        reply( 501, 'Error', $io, $proto, $req );
    }

    # send status when finished
    $x->set_callback('on_success', \&_yippee, $proto, $req);
    $x->set_callback('on_failure', \&_boohoo, $proto, $req);

    # start
    $x->start();
}

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    my $me = shift;
    my $fd = $me->{console_fd};

    while(1){
        my $buf;
        recv $fd, $buf, 65535, 0;
        my $proto = AC::MrGamoo::Protocol->decode_header($buf);
        my $data  = substr($buf, AC::MrGamoo::Protocol->header_size());
        my $req   = AC::MrGamoo::Protocol->decode_request($proto, $data);
        last if $req->{type} eq 'finish';
        print STDERR "$req->{msg}"                        if $req->{type} eq 'stderr';
        print "$req->{msg}"                               if $req->{type} eq 'stdout';
        $me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
    }
}

sub submit {
    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

    my $mr = $me->{program};
    my $r = AC::MrGamoo::Submit::Request->new( $mr );
    $r->{eu_print_stderr} = sub { print STDERR "@_\n" };
    $r->{eu_print_stdout} = sub { print STDERR "@_\n" };

    # run init section
    my $h_init   = $mr->get_code( 'init' );
    my $initres  = ($h_init ? $h_init->{code}() : undef) || {};

    $me->{id} = unique();
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobcreate',
        msgidno		=> $^T,

lib/AC/MrGamoo/Job/Done.pm  view on Meta::CPAN


    $me->_cleanup_tasks();
    $me->_cleanup_files();

    return if $me->{aborted};
    $me->{aborted}       = 1;

    # move to final state
    $me->{phase_no} = @{$me->{plan}{taskplan}};

    $me->{euconsole}->send_msg('stderr', 'aborted job' . ($p{reason} ? ": $p{reason}" : ''));
}

################################################################

sub _cleanup_files {
    my $me = shift;

    $me->{_cleanedup} = 1;
    $me->{statistics}{cleanup_start} = time();
    $me->{statistics}{job_time} = time() - $me->{statistics}{job_start};

lib/AC/MrGamoo/Kibitz/Client.pm  view on Meta::CPAN


    $me->{status_ok} = 1;

    eval {
        my $resp = AC::MrGamoo::Protocol->decode_reply( $proto, $data );
        for my $update ( @{$resp->{status}} ){
            AC::MrGamoo::Kibitz::Peers->update( $update );
        }
    };
    if(my $e = $@){
        verbose("error: $e");
    }
    $me->shut();
}

lib/AC/MrGamoo/MySelf.pm  view on Meta::CPAN

    emacs /myperldir/Local/MrGamoo/MySelf.pm
    copy. paste. edit.

    use lib '/myperldir';
    my $m = AC::MrGamoo::D->new(
        class_myself        => 'Local::MrGamoo::MySelf',
    );

=head1 DESCRIPTION

Provide functions to override default behavior. You may define
any or all of the following functions.

=head2 my_server_id

Return a unique identity for this mrgamoo instance. Typically,
this is something similar to the server hostname.

    sub my_server_id {
        return 'mrm@' . hostname();
    }

lib/AC/MrGamoo/Protocol.pm  view on Meta::CPAN

    my $io  = shift;
    my $evt = shift;

    if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
        # decode header
        eval {
            $io->{proto_header} = __PACKAGE__->decode_header( $io->{rbuffer} );
        };
        if(my $e=$@){
            verbose("cannot decode protocol header: $e");
            $io->run_callback('error', {
                cause	=> 'read',
                error	=> "cannot decode protocol: $e",
            });
            $io->shut();
            return;
        }
    }

    return $io->{proto_header};
}

# for simple status queries, argus, debugging

lib/AC/MrGamoo/Submit/Compile.pm  view on Meta::CPAN

    return unless $prog;

    my $c = eval $prog;
    die $@ if $@;

    return $c;
}

# Abort compilation with a formatted error message.
#
#   $me  - compiler object; reads {file}, {_lineno} and {_line}
#   $err - description of the problem
#
# Always dies. When a current line number is recorded, the message also
# carries that line number and the text of the offending line.
sub _die {
    my( $me, $err ) = @_;

    die $me->{_lineno}
        ? "ERROR: $err\nfile: $me->{file} line: $me->{_lineno}\n$me->{_line}\n"
        : "ERROR: $err\nfile: $me->{file}\n";
}

sub _next {
    my $me = shift;

    return unless @{ $me->{lines} };
    $me->{_line} = shift @{ $me->{lines} };
    $me->{_lineno} ++;
    $me->{_file_content} .= $me->{_line};
    return $me->{_line};

lib/AC/MrGamoo/Submit/Compile.pm  view on Meta::CPAN

        if( $d->{tag} eq 'block'){
            $me->_add_block($tag, $me->_compile_block($tag));
        }
        elsif( $d->{tag} eq 'simple' ){
            $me->_add_block($tag, $me->_compile_block_simple($tag));
        }
        elsif( $d->{tag} eq 'config' ){
            $me->_add_config($tag, $me->_compile_config($tag));
        }
        else{
            $me->_die("syntax error");
        }
    }

    delete $me->{_lineno};
    delete $me->{_line};
    delete $me->{_fd};

    1;
}

lib/AC/MrGamoo/Submit/Compile.pm  view on Meta::CPAN

        last if $line =~ m|^</%$tag>\s*$|;

        my($tag) = $line =~ m|^<%(.*)>\s*$|;

        if( $BLOCK{$tag} eq 'simple' ){
            $b->{$tag} .= $me->_compile_block_simple( $tag );
            $b->{code} .= $me->_lineno_info();
        }elsif( $BLOCK{$tag} eq 'config' ){
            $b->{$tag} = $me->_compile_config( $tag );
        }elsif( $tag ){
            $me->_die("syntax error");

        }else{
            $b->{code} .= $line;
        }
    }

    return $b;
}

sub _compile_block_simple {

lib/AC/MrGamoo/Submit/Request.pm  view on Meta::CPAN

# Forward the arguments to the func_progress hook, if one is installed.
# Without a hook this does nothing.
sub progress {
    my $me   = shift;
    my $hook = $me->{func_progress};

    $hook->( @_ ) if $hook;
}


# Tie STDOUT and STDERR to AC::MrGamoo::Submit::TieIO, handing each tie
# the matching eu_print_stdout / eu_print_stderr value from this object.
sub redirect_io {
    my( $me ) = @_;

    my $tieclass = 'AC::MrGamoo::Submit::TieIO';

    tie *STDOUT, $tieclass, $me->{eu_print_stdout};
    tie *STDERR, $tieclass, $me->{eu_print_stderr};
}

# Pass the arguments to the eu_print_stderr hook, if one is installed.
# Without a hook this does nothing.
sub print_stderr {
    my $me   = shift;
    my $hook = $me->{eu_print_stderr};

    $hook->( @_ ) if $hook;
}

# Pass the arguments to the eu_print_stdout hook, if one is installed.
# Without a hook this does nothing.
sub print {
    my $me   = shift;
    my $hook = $me->{eu_print_stdout};

    $hook->( @_ ) if $hook;
}

# print_stdout is a second name for the same sub
*print_stdout = \&print;

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

use AC::Daemon;
use Digest::SHA1 'sha1';
use Digest::MD5  'md5';
use File::Path;
use Sys::Syslog;
use Socket;
use JSON;
use strict;

my $STATUSTIME = 5;			# seconds
my $MAXRUN     = 3600;			# override with %attr maxrun
my $SORTPROG   = '/usr/bin/sort';	# override with %attr sortprog or config file
my $GZPROG     = '/usr/bin/gzcat';	# override with %attr gzprog or config file

# in child process
sub _start_task {
    my $me = shift;

    debug("start child task");
    $^T = time();
    _setup_stdio_etal();
    _setup_console( $me );
    _update_status( 'STARTING', 0 );

    # send STDOUT + STDERR to end-user console session
    $me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ )  };
    $me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
    $me->{R}->redirect_io();

    my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
    $me->{R}{func_output}   = sub{ _output_partition($me, $n, @_) };
    $me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };

    eval {
        _setup_outfiles( $me );

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

            _do_final( $me );
        }elsif( $me->{request}{phase} =~ /^reduce/ ){
            _do_reduce( $me );
        }else{
            die "unknown map/reduce phase '$me->{request}{phase}'\n";
        }
    };
    if( my $e = $@ ){
        my $myid = my_server_id();
        verbose( "ERROR: $myid - $e" );
        _send_eumsg($me, 'stderr', "ERROR: $myid - $e");
        _update_status( 'FAILED', 0 );
    }

    _close_outfiles( $me );
    _update_status( 'FINISHED', 0 );
    debug("finish child task");
    exit(0);
}

sub _setup_stdio_etal {

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

}


# end-user's 'print' come here
# Send the remaining arguments, joined into one string, to _send_eumsg()
# as a 'stdout' message.
sub eu_print_stdout {
    my( $me, @text ) = @_;

    _send_eumsg( $me, 'stdout', "@text" );
}

# Send the remaining arguments, joined into one string, to _send_eumsg()
# as a 'stderr' message.
sub eu_print_stderr {
    my( $me, @text ) = @_;

    _send_eumsg( $me, 'stderr', "@text" );
}

################################################################

sub _do_map {
    my $me = shift;
    my $mr = $me->{mr};

    debug("doing map");
    my $n        = @{$me->{request}{outfile}};

lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN

        unless( open( $fd, "> $tmpfile" ) ){
            verbose("cannot open output file '$tmpfile': $!");
            return;
        }

        my $chk  = _sendfile($oreq, $fd, $s, $size);
        my $sha1 = $p->{data}{hash_sha1};
        die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
    };
    if(my $e=$@){
        debug("error: $e");
        return;
    }

    return $p;
}

sub _sendfile {
    my $req   = shift;
    my $out   = shift;
    my $in    = shift;



( run in 1.076 second using v1.01-cache-2.11-cpan-49f99fa48dc )