
 view release on metacpan or  search on metacpan

LICENSE  view on Meta::CPAN

    This software may be copied and distributed under the terms
    found in the Perl "Artistic License".

    A copy of the "Artistic License" may be found in the standard
    Perl distribution.

META.yml  view on Meta::CPAN

--- #YAML:1.0
name:               AC-MrGamoo
version:            1
abstract:           Map/Reduce Framework
    - AdCopy <>
license:            perl
distribution_type:  module
    ExtUtils::MakeMaker:  0
    AC::DC:               0
    Digest::SHA1:         0
    Google::ProtocolBuffers:  0
    JSON:                 0
    POSIX:                0
    Sys::Hostname:        0
    Time::HiRes:          0
        - t
        - inc
generated_by:       ExtUtils::MakeMaker version 6.48
    version:  1.4

Makefile.PL  view on Meta::CPAN

use ExtUtils::MakeMaker;
              NAME            => 'AC::MrGamoo',
              VERSION_FROM    => 'lib/AC/',
              ABSTRACT_FROM   => 'lib/AC/',
              AUTHOR          => 'AdCopy <>',
              LICENSE         => 'perl',
              PREREQ_PM       => {
                  'POSIX'                       => 0,
                  'Sys::Hostname'	        => 0,
                  'JSON'		        => 0,
                  'Digest::SHA1'	        => 0,
                  'Time::HiRes'			=> 0,
                  'Google::ProtocolBuffers'	=> 0,
		  'AC::DC'			=> 0,

eg/example.mrjob  view on Meta::CPAN

%# -*- mason -*-
# Copyright (c) 2009 by AdCopy
# Author: Jeff Weisberg
# Created: 2009-Oct-28 11:19 (EDT)
# Function: test
# $Id: example.mrjob,v 1.1 2010/11/01 19:04:21 jaw Exp $

    map reduce example
%# provide values for configurable parameters
%# these override the defaults
%# and params specified on the command line override these
    system      => blargh
    tasktimeout => 120
%# common block is prepended to all other blocks.
%# used to load modules
    use AC::Misc;
    use AC::Dumper;
%# init block runs once at startup.
%# the return value can be retrieved by other blocks.
%# used to calculate values or fetch things from a db.
    $R->print("starting map/reduce job");

    return {
        mood    => 'joyous',
%# override various parameters
    maxrun      => 300
    sortprog    => /bin/sort
    my $data = shift;   # one record from the input

    # return a key + a value
    return ( $data->{cmp}, 1 );
    my $key = shift;
    my $itr = shift;    # an iterator object

    # count
    my $n = 0;
    $itr->foreach( sub { $n ++ } );

    # return a key + a value
    return ($key, $n);
%# additional reduce blocks can go here
%# final block runs once with the results of the previous reduce.
%# used to generate report or insert to db
%# override various parameters
    use_strict  => 0
    in_package  => My::Private::Space
    # init sub-block runs at start of final block
    my $report;
    # cleanup sub-block runs at end of final block

    # get the values from the init block
    my $iv = $R->initvalue();

    # stdout is tied to $R. so plain print also works.

    print "report for mood: $iv->{mood}\n";
    print $report;

    my($key, $data) = @_;

    $report .= "key: $key, value: $data\n";


eg/  view on Meta::CPAN

package Local::MrMagoo::FileList;
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;

my $YDBFILE = "/data/files.ydb";

# Return the list of input files for a job.
#
# $config is the job config hashref; this reads its 'system', 'start' and
# 'end' (time_t) entries. Returns an arrayref of metadata hashrefs, one per
# file whose subsystem equals $config->{system} and whose
# [start_time, end_time] range overlaps [start, end]. Each record's
# 'location' is converted from a space separated string to an arrayref.
sub get_file_list {
    my $config = shift;

    # get files + metadata from yenta
    my $yenta = AC::Yenta::Direct->new( 'files', $YDBFILE );

    # the job config is asking for files that match:
    my $syst  = $config->{system};
    my $tmax  = $config->{end};		# time_t
    my $tmin  = $config->{start};	# time_t

    # the keys in yenta are of the form: 20100126150139_[...]
    my $start = isotime($tmin);		# 1286819830       => 20101011T175710Z
    $start =~ s/^(\d+)T(\d+).*/$1$2/;	# 20101011T175710Z => 20101011175710

    my @files = grep {
        # does this file match the request?
        ($_->{subsystem}   eq $syst) &&
        ($_->{end_time}    >= $tmin) &&
        ($_->{start_time}  <= $tmax)
    } map {
        # get meta-data on this file. data is json encoded
        my $d = $yenta->get($_);
        $d = $d ? decode_json($d) : {};
        # convert space separated locations to arrayref
        $d->{location} = [ (split /\s+/, $d->{location}) ];
        # the map block must yield the record itself; without this line its
        # value would be the location arrayref assigned just above
        $d;
    } $yenta->getrange($start, undef);	# get all files from $start to now

    return \@files;
}


eg/mrgamoo  view on Meta::CPAN

# Function: mrmagoo job submit client
# $Id: mrgamoo,v 1.1 2010/11/01 19:02:57 jaw Exp $

my %opt;
use Getopt::Long qw(:config no_ignore_case);
use AC::MrMagoo::Client;
use strict;

          ) || exit;

my $mrm = AC::MrMagoo::Client->new( $ARGV[0],  \%opt );

if( $opt{check} ){

$mrm->open_console() if $opt{console};

# run job
my $id = $mrm->submit( seedlist() );
die "could not run job\n" unless $id;

print STDERR "job: $id\n" if $opt{verbose};
$SIG{INT} = $SIG{QUIT} = sub{ $mrm->abort(); exit; };
$mrm->run_console() if $opt{console};



sub seedlist {
    # determine list of servers to try
    return '';

eg/mrgamood  view on Meta::CPAN

my @saved_argv = @ARGV;

getopts('c:dDfp:', \%OPT) || die "usage...\n";
# -c config file
# -d    enable all debugging
# -f    foreground
# -p port

my $mrm = AC::MrMagoo::D->new(
    class_filelist	=> 'My::Code::FileList',
    class_readinput	=> 'My::Code::ReadInput',
    class_myself	=> 'My::Code::MySelf',

$mrm->daemon( $OPT{c}, {
    argv	=> \@saved_argv,
    foreground	=> $OPT{f},
    debugall	=> $OPT{d},
    port	=> $OPT{p},
} );

eg/  view on Meta::CPAN

# $Id:,v 1.1 2010/11/01 19:04:21 jaw Exp $

package Local::MrGamoo::MySelf;
use Sys::Hostname;
use strict;


# Establish this server's persistent id.
#
# Called as a class method with (port, id). When an id is supplied it is
# used as is; otherwise one is built as "mrm/<hostname without domain>".
# The result is stored in $SERVERID (declared outside this sub) and
# announced through verbose().
sub init {
    my $class = shift;
    my $port  = shift;	# our tcp port
    my $id    = shift;  # from cmd line

    $SERVERID = $id;
    unless( $SERVERID ){
        # strip everything from the first dot onward
        (my $h = hostname()) =~ s/\..*//;	# remove domain
        $SERVERID = "mrm/$h";
    }
    verbose("system persistent-id: $SERVERID");
}

sub my_server_id {
    return $SERVERID;


eg/  view on Meta::CPAN

# $Id:,v 1.1 2010/11/01 19:04:22 jaw Exp $

package Local::MrMagoo::ReadInput;
use AC::MrMagoo::User;
use JSON;
use strict;

our $R;		# exported by AC::MrMagoo::User

# Read one record from the input file handle.
#
# Returns a two element list ($record, $eof):
#   (undef, 1)   - end of file
#   (undef, 0)   - record read but outside the job's [start, end) range
#   ($d,    0)   - decoded record
sub readinput {
    my $fd = shift;	# file handle

    # our file is newline delimited json data

    # read next line
    my $line = scalar <$fd>;
    # end of file?
    return (undef, 1) unless defined $line;

    # JSON exports decode_json(); it has no json_decode()
    my $d = decode_json($line);

    # filter input on date range. we could just as easily filter
    # in 'map', but doing it here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.

    return (undef, 0) if $d->{tstart} <  $R->config('start');
    return (undef, 0) if $d->{tstart} >= $R->config('end');

    return ($d, 0);
}


lib/AC/  view on Meta::CPAN

use strict;

our $VERSION = 1.0;

=head1 NAME

AC::MrGamoo - Map/Reduce Framework


    use AC::MrGamoo::D;
    use strict;

    my $m = AC::MrGamoo::D->new( );

    $m->daemon( $configfile, {
      argv		=> \@ARGV,
      foreground	=> $OPT{f},
      debugall		=> $OPT{d},
      port		=> $OPT{p},
    } );



various parameters need to be specified in a config file.
if you modify the file, it will be reloaded automagically.

=over 4

=item port

specify the TCP port to use

    port 3504

=item environment

specify the environment or realm to run in, so you can run multiple
independent map/reduce networks, such as production, staging, and dev.

    environment prod

=item allow

specify networks allowed to connect.


=item seedpeer

specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.


=item secret

specify a secret key used to encrypt data transferred between
systems in different datacenters.

    secret squeamish-ossifrage

=item syslog

specify a syslog facility for log messages.

    syslog local5

=item basedir

local directory to store files

    basedir         /home/data

=item debug

enable debugging for a particular section

    debug job


=head1 BUGS

Too many to list here.

=head1 SEE ALSO


=head1 AUTHOR

    Jeff Weisberg -



lib/AC/MrGamoo/AC/  view on Meta::CPAN

# return an array of:
#   {
#     filename    => www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi
#     location    => [ scrib@a2be021bd31c, scrib@a2be021ad31c ]
#     size        => 10863
#     [anything else]
#   }

# convert legacy scriblr ids
my %CONVERT = (
    'scrib@a2be021ad31c' => 'mrm@gefiltefish1-r3.ccsphl',
    'scrib@a2be021bd31c' => 'mrm@gefiltefish1-r4.ccsphl',
    'scrib@a2be021cd31c' => 'mrm@gefiltefish2-r3.ccsphl',
    'scrib@a2be021dd31c' => 'mrm@gefiltefish2-r4.ccsphl',
    'scrib@a2be021ed31c' => 'mrm@gefiltefish3-r3.ccsphl',
    'scrib@a2be021fd31c' => 'mrm@gefiltefish3-r4.ccsphl',
    'scrib@a2be0220d31c' => 'mrm@gefiltefish4-r3.ccsphl',
    'scrib@a2be0221d31c' => 'mrm@gefiltefish4-r4.ccsphl',

# Return the list of input files for a job.
#
# $config is the job config hashref; this reads its 'datamode', 'system',
# 'start' and 'end' entries. A datamode or system of '*' disables that
# filter; a system containing spaces or commas is treated as a list of
# alternatives. Returns an arrayref of metadata hashrefs whose
# [start_time, end_time] range overlaps [start, end]. Each record's
# 'location' becomes an arrayref, with ids found in %CONVERT replaced.
sub get_file_list {
    my $config = shift;

    my $yenta = AC::Yenta::Direct->new( 'logfile', $YDBFILE );

    my $mode  = $config->{datamode};
    my $syst  = $config->{system};
    my $tmax  = $config->{end};
    my $tmin  = $config->{start};
    my $start = isotime($tmin);
    $start =~ s/^(\d+)T(\d+).*/$1$2/;	# 20091109T123456... => 20091109123456

    # NB: keys in the yenta logfile map are of the form: 20100126150139_eqaB5uSerdeddsOw

    $syst = undef if $syst eq '*';
    $mode = undef if $mode eq '*';
    # "a,b c" => "a|b|c", then anchor it as an alternation
    $syst =~ s/[ ,]/\|/g;
    if( $syst ){
        $syst = qr/^($syst)$/;
    }

    debug("mode=$mode, syst=$syst, tmin=$tmin, tmax=$tmax, start=$start");
    my @files = grep {
        (!$mode || ($_->{environment} eq $mode)) &&
        (!$syst || ($_->{subsystem}   =~ $syst)) &&
        ($_->{end_time}    >= $tmin) &&
        ($_->{start_time}  <= $tmax)
    } map {
        #debug("file: $_");
        my $d = $yenta->get($_);
        $d = $d ? decode_json($d) : {};
        $d->{location} = [ map { $CONVERT{$_} || $_ } (split /\s+/, $d->{location}) ];
        # the map block must yield the record itself; without this line its
        # value would be the location arrayref assigned just above
        $d;
    } $yenta->getrange($start, undef);

    debug("found " .scalar(@files)." files");
    return \@files;
}


lib/AC/MrGamoo/AC/  view on Meta::CPAN

package AC::MrGamoo::AC::MySelf;
use AC::MrGamoo::Config;
use AC::MrGamoo::Debug;
use AC::DataCenter;	# provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;


sub init {
    my $class = shift;
    my $port  = shift;	# not used
    my $id    = shift;

    $SERVERID = $id;
    unless( $SERVERID ){
        (my $h = hostname()) =~ s/\.adcopy.*//;
        my $v = conf_value('environment');
        $SERVERID = 'mrm';
        $SERVERID .= '/' . $v unless $v eq 'prod';
        $SERVERID .= '@' . $h;
    verbose("system persistent-id: $SERVERID");

sub my_server_id {
    return $SERVERID;


lib/AC/MrGamoo/AC/  view on Meta::CPAN

package AC::MrGamoo::AC::ReadInput;
use AC::Logfile;
use AC::Daemon;
use AC::MrGamoo::User;
use strict;

our $R;		# exported by AC::MrGamoo::User

# Read and parse one log line from the file handle $fd.
#
# Returns:
#   (undef, 1)   - end of file
#   ($d, 0)      - parsed record whose tstart is inside [start, end)
#   empty list   - the line could not be parsed, or the record falls
#                  outside the configured date range
sub readinput {
    my $fd = shift;

    my $line = scalar <$fd>;
    return (undef, 1) unless defined $line;

    my $d;
    # parse_dancr_log() may die on bad input; trap it so one bad line is
    # reported rather than propagated
    eval { $d = parse_dancr_log($line); };
    if( $@ ){
        problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
        return ;

    # filter input on date range. we could just as easily filter
    # in 'map', but doing it here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.
    return if $d->{tstart} <  $R->config('start');
    return if $d->{tstart} >= $R->config('end');

    return ($d, 0);


lib/AC/MrGamoo/API/  view on Meta::CPAN

# $Id:,v 1.1 2010/11/01 18:41:50 jaw Exp $

package AC::MrGamoo::API::Chk;
use AC::MrGamoo::Debug 'api_del';
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Scriblr;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});

    if( $file && -f $file ){
        reply( 500, 'Error', $io, $proto, $req );
        reply( 200, 'OK', $io, $proto, $req );


lib/AC/MrGamoo/API/  view on Meta::CPAN

use AC::MrGamoo::Protocol;
use AC::DC::IO::TCP::Client;
use Socket;
our @ISA = 'AC::DC::IO::TCP::Client';

use strict;

my $TIMEOUT  = 15;

sub new {
    my $class = shift;
    my $addr  = shift;
    my $port  = shift;
    my $info  = shift;
    my $req   = shift;
    my $data  = shift;

    debug("new client type: $req->{type} to $addr:$port");
    my $send = AC::MrGamoo::Protocol->encode_request( $req, $data );
    my $me   = $class->SUPER::new( $addr, $port,
                                 info	 => "client $req->{type} to $addr:$port; $info",
                                 request => $send,

    return $me;

sub start {
    my $me = shift;

    $me->set_callback('timeout',  \&_timeout);
    $me->set_callback('read',     \&_read);
    $me->set_callback('shutdown', \&_shutdown);

    $me->write( $me->{request} );

    return $me;

sub _timeout {
    my $me = shift;

sub _shutdown {
    my $me = shift;

    if( $me->{status_ok} ){
        $me->run_callback('on_success', { result => $me->{result} } );

# 'read' callback: handle the reply to the request this client sent.
#
# Does nothing until read_protocol_no_content() yields a header. A reply
# flagged is_error, or one whose decoded status_code is not 200, is routed
# to _uh_oh(). Otherwise the reply is saved in $me->{result} and
# $me->{status_ok} is set, which _shutdown checks before reporting success.
sub _read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply to $me->{info}");

    my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    return unless $proto;

    # check response
    if( $proto->{is_error} ){
        # the error handler is named _uh_oh; _uhoh does not exist
        return $me->_uh_oh("rcvd error response");
    }

    $proto->{data} = AC::MrGamoo::Protocol->decode_reply($proto, $data);
    debug("recvd reply to $me->{info} - $proto->{data}{status_code} $proto->{data}{status_message}");

    if( $proto->{data}{status_code} != 200 ){
        return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
    }

    $me->{result}    = $proto;
    $me->{status_ok} = 1;
}

# Report a failed exchange: log $msg through debug() and fire the 'error'
# callback with { error => $msg }.
sub _uh_oh {
    my( $me, $msg ) = @_;

    debug("error $msg");

    my %detail = ( error => $msg );
    return $me->run_callback('error', \%detail );
}


lib/AC/MrGamoo/API/  view on Meta::CPAN

# $Id:,v 1.1 2010/11/01 18:41:51 jaw Exp $

package AC::MrGamoo::API::Del;
use AC::MrGamoo::Debug 'api_del';
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Scriblr;
use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    # validate filename
    my $file = filename($req->{filename});
    debug("deleting file $file");

    if( $file && -f $file ){
        unlink $file;

    if( -f $file ){
        reply( 500, 'Error', $io, $proto, $req );
        reply( 200, 'OK', $io, $proto, $req );


lib/AC/MrGamoo/API/  view on Meta::CPAN

use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Scriblr;
use AC::SHA1File;
use Fcntl;
use POSIX;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    return unless $proto->{want_reply};
    in_background( \&_get_file, $io, $proto, $req, $content );

# Send the requested file back over the connection in $io->{fd}.
#
# Started through in_background() by handler(). Replies 404 when the
# requested name does not resolve to a plain file and 500 when it cannot
# be opened. Otherwise it writes a reply header carrying the file's SHA1
# and size, then streams the content with sendfile().
sub _get_file {
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    # NOTE(review): filename() presumably validates and maps the requested name - confirm
    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    return nbfd_reply(404, "not found", $fd, $proto, $req) unless defined($file) && -f $file;

    # three-arg open with a lexical handle: the name originates from a
    # network request, so it must never be parsed for an open mode
    open(my $fh, '<', $file) || return nbfd_reply(500, 'error', $fd, $proto, $req);
    my $size = (stat($fh))[7];
    my $sha1 = sha1_file($file);

    debug("get file '$file' size $size");

    # send header
    my $gb  = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
    my $hdr = AC::MrGamoo::Protocol->encode_header(
        type		=> $proto->{type},
        msgidno		=> $proto->{msgidno},
        is_reply	=> 1,
        data_length	=> length($gb),
        content_length	=> $size,
    );
    my $buf = $hdr . $gb;

    syswrite( $fd, $buf );

    # stream
    AC::MrGamoo::Protocol->sendfile($fd, $fh, $size);
}


lib/AC/MrGamoo/API/  view on Meta::CPAN

use AC::MrGamoo::MySelf;

use Sys::Hostname;

require 'AC/protobuf/';
use strict;

my $HOSTNAME = hostname();

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    unless( $proto->{want_reply} ){

    my $response = AC::MrGamoo::Protocol->encode_reply( {
        type            => 'heartbeat_request',
        msgid           => $proto->{msgid},
        is_reply        => 1,
    }, {
        status_code	=> 200,
        status_message	=> 'Honky Dory',
        hostname	=> $HOSTNAME,
        subsystem	=> 'mrgamoo',
        environment	=> conf_value('environment'),
        port		=> my_port(),
        timestamp	=> time(),
        sort_metric	=> loadave(),
        server_id	=> my_server_id(),
        process_id	=> $$,
    } );

    debug("sending hb reply");
    $io->write_and_shut( $response );



lib/AC/MrGamoo/API/  view on Meta::CPAN

package AC::MrGamoo::API::JobAbort;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    debug("abort job $req->{jobid}");

    my $r = AC::MrGamoo::Job->abort( jobid => $req->{jobid} );

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
        reply( 500, 'Error', $io, $proto, $req );


lib/AC/MrGamoo/API/  view on Meta::CPAN

package AC::MrGamoo::API::JobCreate;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    debug("new job $req->{jobid}");

    if( $req->{console} =~ /^:/ ){
        # fill in ip addr
        $req->{console} = $io->{from_ip} . $req->{console};

    my $x = AC::MrGamoo::Job->new( %$req );
    my $r = $x ? $x->start() : undef;

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
        reply( 500, 'Error', $io, $proto, $req );


lib/AC/MrGamoo/API/  view on Meta::CPAN

use AC::MrGamoo::API::Simple;
use AC::SHA1File;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Scriblr;
use File::Path;
use POSIX;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    if( conf_value('scriblr') =~ /no|off/i ){
        reply( 500, 'Error', $io, $proto, $req );

    in_background( \&_put_file, $io, $proto, $req, $content );

sub _put_file {
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    my($dir) = $file =~ m|^(.+)/[^/]+$|;

    # mkpath
    eval{ mkpath($dir, undef, 0755) };

    # open tmp
    my $tmp = "$file.tmp";
    unless( open(F, "> $tmp") ){
        problem("open file failed: $!");
        return nbfd_reply(500, 'error', $fd, $proto, $req);

    # read + write
    my $size = $proto->{content_length};
    my $sha1 = $req->{hash_sha1};

    verbose("put file '$file' size $size");

    if( $content ){
        syswrite( F, $content );
        $size -= length($content);

    eval {
        my $chk = AC::MrGamoo::Protocol->sendfile(\*F, $fd, $size, 10);
        close F;
        die "file size mismatch\n" unless (stat($tmp))[7] == $proto->{content_length};
        die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
    if(my $e = $@){
        unlink $tmp;
        verbose("error: $e");
        nbfd_reply(500, 'error', $fd, $proto, $req);

    rename $tmp, $file;

    nbfd_reply(200, 'OK', $fd, $proto, $req);


lib/AC/MrGamoo/API/  view on Meta::CPAN

use AC::MrGamoo::Debug 'api';
use AC::Import;
use POSIX;

require 'AC/protobuf/';
use strict;

our @EXPORT = qw(reply on_success on_failure in_background nbfd_reply);

sub reply {
    my $code    = shift;
    my $msg     = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

    unless( $proto->{want_reply} ){

    my $response = AC::MrGamoo::Protocol->encode_reply( {
        type            => $proto->{type},
        msgid           => $proto->{msgid},
        is_reply        => 1,
    }, {
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("sending $code reply for $proto->{type} on $io->{info}");
    $io->write_and_shut( $response );

# Write a status reply straight to the file handle $fd with syswrite().
# The debug message marks this as the variant used from a background
# process. Nothing is sent when the request did not ask for a reply.
sub nbfd_reply {
    my( $code, $msg, $fd, $proto, $req ) = @_;

    return unless $proto->{want_reply};

    my %header = (
        type            => $proto->{type},
        msgid           => $proto->{msgid},
        is_reply        => 1,
    );
    my %status = (
        status_code	=> $code,
        status_message	=> $msg,
    );
    my $response = AC::MrGamoo::Protocol->encode_reply( \%header, \%status );

    debug("sending $code reply for $proto->{type} (from bkg)");
    return syswrite( $fd, $response );
}

# Success callback: answer the original request with "200 OK".
# The first two arguments are accepted but not used here.
sub on_success {
    my( $x, $e, $io, $proto, $req ) = @_;

    return reply( 200, 'OK', $io, $proto, $req );
}

# Failure callback: answer the original request with "500 Error".
# The first two arguments are accepted but not used here.
sub on_failure {
    my( $x, $e, $io, $proto, $req ) = @_;

    return reply( 500, 'Error', $io, $proto, $req );
}

sub in_background {
    my $func    = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

    my $pid = fork();

    if( !defined($pid) ){
        problem("cannot fork: $!");
        reply( 500, 'Error', $io, $proto, $req );
        return ;
    }elsif( $pid ){
        # parent
        waitpid $pid, 0;
        # child
        my $gpid = fork();

        if( $gpid ){
            # parent
            # orphaned child
            eval {
                $func->($io, $proto, $req, @_);
            if(my $e = $@){
                chomp $e;
                verbose("child error: $e");


lib/AC/MrGamoo/API/  view on Meta::CPAN

package AC::MrGamoo::API::TaskAbort;
use AC::MrGamoo::Debug 'api_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    debug("abort task $req->{jobid}/$req->{taskid}");

    my $r = AC::MrGamoo::Task->abort( jobid => $req->{jobid}, taskid => $req->{taskid} );

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
        reply( 500, 'Error', $io, $proto, $req );


lib/AC/MrGamoo/API/  view on Meta::CPAN

package AC::MrGamoo::API::TaskCreate;
use AC::MrGamoo::Debug 'api_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    debug("new task $req->{jobid}/$req->{taskid}");
    my $x = AC::MrGamoo::Task->new( %$req );
    my $r = $x ? $x->start() : undef;

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
        reply( 500, 'Error', $io, $proto, $req );


lib/AC/MrGamoo/API/  view on Meta::CPAN

package AC::MrGamoo::API::TaskStatus;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    debug("updating task status $req->{jobid}/$req->{taskid}");

    my $r = AC::MrGamoo::Job->task_status( %$req );

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
        reply( 500, 'Error', $io, $proto, $req );


lib/AC/MrGamoo/API/  view on Meta::CPAN

use AC::MrGamoo::Protocol;
use AC::MrGamoo::PeerList;

use AC::MrGamoo::API::Simple;

use strict;

my $MSGID = $$;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    # validate filename
    if( $req->{filename} =~ m%/\.|^\.% ){
        reply( 500, 'Error', $io, $proto, $req );

    # new retry
    debug("starting file xfer $req->{copyid} => $req->{filename}");

    # start working on the copy
    my $x = AC::MrGamoo::Retry->new(
        newobj	=> \&_mk_xfer,
        newargs => [ $req ],
        tryeach	=> $req->{location},

    # reply now
    if( $x ){
        reply( 200, 'OK', $io, $proto, $req );
        debug("sending error, xfer/retrier failed, $io->{info}");
        reply( 501, 'Error', $io, $proto, $req );

    # send status when finished
    $x->set_callback('on_success', \&_yippee, $proto, $req);
    $x->set_callback('on_failure', \&_boohoo, $proto, $req);

    # start

sub _mk_xfer {
    my $loc  = shift;
    my $req  = shift;

    my $x = AC::MrGamoo::Xfer->new(
        $req->{filename}, ($req->{dstname} || $req->{filename}), $loc, $req,

    return $x;


sub _yippee {
    my $x  = shift;
    my $e  = shift;
    my $proto = shift;
    my $req   = shift;

    tell_master( $req, 200, 'OK' );

sub _boohoo {
    my $x  = shift;
    my $e  = shift;
    my $proto = shift;
    my $req   = shift;

    debug("boohoo - xfer failed $req->{copyid}");
    tell_master( $req, 500, 'Failed' );

sub tell_master {
    my $req   = shift;
    my $code  = shift;
    my $msg   = shift;

    my($addr, $port) = get_peer_addr_from_id( $req->{master} );
    debug("sending xfer status update for $req->{copyid} => $code => $req->{master}");
    debug("cannot find addr") unless $addr;
    return unless $addr;

    my $x = AC::MrGamoo::API::Client->new( $addr, $port, "xfer $req->{copyid}", {
        type		=> 'mrgamoo_xferstatus',
        msgidno		=> $MSGID++,
        want_reply	=> 1,
    }, {
        jobid		=> $req->{jobid},
        copyid		=> $req->{copyid},
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("cannot create client") unless $x;
    return unless $x;

    # we don't need any reply or reply callbacks. just send + forget



lib/AC/MrGamoo/API/  view on Meta::CPAN

package AC::MrGamoo::API::XferStatus;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    debug("updating xfer status $req->{jobid}/$req->{copyid}");

    my $r = AC::MrGamoo::Job->xfer_status( %$req );

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
        reply( 500, 'Error', $io, $proto, $req );


lib/AC/MrGamoo/  view on Meta::CPAN

# $Id:,v 1.1 2010/11/01 18:41:39 jaw Exp $

package AC::MrGamoo::About;
use AC::Import;
use strict;

our @EXPORT = 'my_port';
my $port;

sub init {
    my $class = shift;
    $port = shift;

sub my_port { $port }


lib/AC/MrGamoo/  view on Meta::CPAN

use Sys::Hostname;
use Socket;

require 'AC/protobuf/';
require 'AC/protobuf/';
require 'AC/protobuf/';
use strict;

sub new {
    my $class = shift;
    my $from = shift;	# file | text
    my $src  = shift;
    my $cfg  = shift;

    my $host = hostname();
    my $user = getpwuid($<);
    my $trace = "$user/$$\@$host:" . ($from eq 'file' ? $src : 'text');

    my $me   = bless {
        traceinfo	=> $trace,
    }, $class;
    $me->{fdebug} = $cfg->{debug} ? sub{ print STDERR "@_\n" } : sub {};

    # compile job
    my $mr = AC::MrGamoo::Submit::Compile->new( $from => $src );
    $me->{program} = $mr;

    # merge job %config section with passed in config

    return $me;

sub get_config_param {
    my $me = shift;


sub set_config_param {
    my $me = shift;


sub open_console {
    my $me = shift;

    my $fd;
    socket($fd, PF_INET, SOCK_DGRAM, 0);
    bind($fd, sockaddr_in(0, INADDR_ANY));
    my $s = getsockname($fd);
    my($port, $addr) = sockaddr_in($s);

    $me->{console_fd}   = $fd;
    $me->{console_port} = $port;

sub run_console {
    my $me = shift;
    my $fd = $me->{console_fd};

        my $buf;
        recv $fd, $buf, 65535, 0;
        my $proto = AC::MrGamoo::Protocol->decode_header($buf);
        my $data  = substr($buf, AC::MrGamoo::Protocol->header_size());
        my $req   = AC::MrGamoo::Protocol->decode_request($proto, $data);
        last if $req->{type} eq 'finish';
        print STDERR "$req->{msg}"                        if $req->{type} eq 'stderr';
        print "$req->{msg}"                               if $req->{type} eq 'stdout';
        $me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';

sub submit {
    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

    my $mr = $me->{program};
    my $r = AC::MrGamoo::Submit::Request->new( $mr );
    $r->{eu_print_stderr} = sub { print STDERR "@_\n" };
    $r->{eu_print_stdout} = sub { print STDERR "@_\n" };

    # run init section
    my $h_init   = $mr->get_code( 'init' );
    my $initres  = ($h_init ? $h_init->{code}() : undef) || {};

    $me->{id} = unique();
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobcreate',
        msgidno		=> $^T,
        want_reply	=> 1,
        jobid		=> $me->{id},
        options		=> to_json( $r->{config} ),
        initres		=> to_json( $initres, {allow_nonref => 1} ),
        jobsrc		=> $mr->src(),
        console		=> ($me->{console_port} ? ":$me->{console_port}" : ''),
        traceinfo	=> $me->{traceinfo},
    } );

    my $ok;
    if( my $master = $me->get_config_param('master') ){
        # use specified master (for debugging)
        my($addr, $port) = split /:/, $master;
        $me->_submit_to( $addr, $port, $req );
        $me->{master} = { addr => $addr, port => $port };
        $ok = 1;
        # pick server
        $ok = $me->_pick_master_and_send( $req, $seed );

    return $ok ? $me->{id} : undef;

sub abort {
    my $me = shift;

    return unless $me->{master};
    my $res = $me->_submit_to( $me->{master}{addr}, $me->{master}{port}, AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobabort',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {
        jobid		=> $me->{id},



sub _pick_master_and_send {
    my $me   = shift;
    my $req  = shift;
    my $seed = shift;

    my @serverlist;

    my $listreq = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_status',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {});

    # get the full list of servers
    # contact each seed passed in above, until we get a reply
    for my $s ( @$seed ){
        my($addr, $port) = split /:/, $s;
        $me->{fdebug}->("attempting to fetch server list from $addr:$port");
        eval {
            my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $listreq, $me->{fdebug} );
            my $res   = AC::MrGamoo::Protocol->decode_reply($reply);
            my $list = $res->{status};
            @serverlist = @$list if $list && @$list;
        last if @serverlist;

    # sort+filter list
    @serverlist = sort { ($a->{sort_metric} <=> $b->{sort_metric}) || int(rand(3)) - 1 }
      grep { $_->{status} == 200 } @serverlist;

    # try all addresses
    # RSN - sort addresslist in a Peers::pick_best_addr_for_peer() like manner?

    my @addrlist = map { @{$_->{ip}} } @serverlist;

    for my $ip (@addrlist){
        my $addr = inet_itoa($ip->{ipv4});
        my $res;
        eval {
            $res = $me->_submit_to( $addr, $ip->{port}, $req );
        next unless $res && $res->{status_code} == 200;
        $me->{master} = { addr => $addr, port => $ip->{port} };
        return 1;
    return ;

# Send an already-encoded request to the server at $addr:$port and return
# the decoded reply. The attempt is announced through the client's fdebug
# hook, and 120 is passed to send_request() as its final argument
# (presumably a timeout - confirm against AC::MrGamoo::Protocol).
sub _submit_to {
    my( $me, $addr, $port, $req ) = @_;

    my $trace = $me->{fdebug};
    $trace->("sending job to $addr:$port");

    my $raw = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $req, $me->{fdebug}, 120 );

    return scalar AC::MrGamoo::Protocol->decode_reply($raw);
}


sub check_code {
    my $me = shift;

    my $mr = $me->{program};
    my $nr = @{ $mr->{content}{reduce} };

    $me->_check('reduce', $_) for (0 .. $nr - 1);

    return 1;

sub _check {
    my $me = shift;
    my $mr = $me->{program};

    my $prog = $mr->compile(@_);
    eval "sub $prog";
    die $@ if $@;


lib/AC/MrGamoo/  view on Meta::CPAN

use AC::ConfigFile::Simple;
use Socket;
use strict;

our @ISA = 'AC::ConfigFile::Simple';
our @EXPORT = qw(conf_value);

my %CONFIG = (

    include	=> \&AC::ConfigFile::Simple::include_file,
    debug	=> \&AC::ConfigFile::Simple::parse_debug,
    allow	=> \&AC::ConfigFile::Simple::parse_allow,
    port	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    environment => \&AC::ConfigFile::Simple::parse_keyvalue,
    basedir	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    syslog	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    seedpeer    => \&AC::ConfigFile::Simple::parse_keyarray,
    scriblr	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    sortprog	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    gzprog	=> \&AC::ConfigFile::Simple::parse_keyvalue,


# Dispatch one config-file directive to its parser from %CONFIG.
# Returns 1 when the keyword was handled, and nothing when %CONFIG has
# no parser for it.
sub handle_config {
    my( $me, $key, $rest ) = @_;

    my $parser = $CONFIG{$key}
        or return;

    $parser->($me, $key, $rest);
    return 1;
}


# Look up one setting by name in the loaded configuration
# ($AC::MrGamoo::CONF->{config}). Returns undef for an unknown key.
sub conf_value {
    my( $key ) = @_;

    # kept as a single chained lookup so autovivification is unchanged
    return $AC::MrGamoo::CONF->{config}->{$key};
}


 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 1.032 second using v1.00-cache-2.02-grep-82fe00e-cpan-2c419f77a38b )