AC-MrGamoo

 view release on metacpan or  search on metacpan

eg/example.mrjob  view on Meta::CPAN

    # return a key + a value
    return ( $data->{cmp}, 1 );
</%map>
%################################################################
<%reduce>
    my $key = shift;
    my $itr = shift;    # an iterator object

    # count
    my $n = 0;
    $itr->foreach( sub { $n ++ } );

    # return a key + a value
    return ($key, $n);
</%reduce>
%#
%# additional reduce blocks can go here
%#
%################################################################
%# final block runs once with the results of the previous reduce.
%# used to generate report or insert to db

eg/filelist.pm  view on Meta::CPAN

# $Id: filelist.pm,v 1.1 2010/11/01 19:04:21 jaw Exp $

package Local::MrMagoo::FileList;
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;

my $YDBFILE = "/data/files.ydb";

sub get_file_list {
    my $config = shift;

    # get files + metadata from yenta
    my $yenta = AC::Yenta::Direct->new( 'files', $YDBFILE );

    # the job config is asking for files that match:
    my $syst  = $config->{system};
    my $tmax  = $config->{end};		# time_t
    my $tmin  = $config->{start};	# time_t

eg/mrgamoo  view on Meta::CPAN

die "could not run job\n" unless $id;

print STDERR "job: $id\n" if $opt{verbose};
$SIG{INT} = $SIG{QUIT} = sub{ $mrm->abort(); exit; };
$mrm->run_console() if $opt{console};

exit;

################################################################

sub seedlist {
    # determine list of servers to try
    return '10.200.2.3:3504';
}

eg/myself.pm  view on Meta::CPAN

# example myself

# $Id: myself.pm,v 1.1 2010/11/01 19:04:21 jaw Exp $

package Local::MrGamoo::MySelf;
use Sys::Hostname;
use strict;

my $SERVERID;

sub init {
    my $class = shift;
    my $port  = shift;	# our tcp port
    my $id    = shift;  # from cmd line

    $SERVERID = $id;
    unless( $SERVERID ){
        (my $h = hostname()) =~ s/\.example.com//;	# remove domain
        $SERVERID = "mrm/$h";
    }
    verbose("system persistent-id: $SERVERID");
}

sub my_server_id {
    return $SERVERID;
}

1;

eg/readinput.pm  view on Meta::CPAN


# $Id: readinput.pm,v 1.1 2010/11/01 19:04:22 jaw Exp $

package Local::MrMagoo::ReadInput;
use AC::MrMagoo::User;
use JSON;
use strict;

our $R;		# exported by AC::MrMagoo::User

sub readinput {
    my $fd = shift;	# file handle

    # our file is newline delimted json data

    # read next line
    my $line = scalar <$fd>;
    # end of file?
    return (undef, 1) unless defined $line;

    my $d = json_decode($line);

lib/AC/MrGamoo/AC/FileList.pm  view on Meta::CPAN

    'scrib@a2be021ad31c' => 'mrm@gefiltefish1-r3.ccsphl',
    'scrib@a2be021bd31c' => 'mrm@gefiltefish1-r4.ccsphl',
    'scrib@a2be021cd31c' => 'mrm@gefiltefish2-r3.ccsphl',
    'scrib@a2be021dd31c' => 'mrm@gefiltefish2-r4.ccsphl',
    'scrib@a2be021ed31c' => 'mrm@gefiltefish3-r3.ccsphl',
    'scrib@a2be021fd31c' => 'mrm@gefiltefish3-r4.ccsphl',
    'scrib@a2be0220d31c' => 'mrm@gefiltefish4-r3.ccsphl',
    'scrib@a2be0221d31c' => 'mrm@gefiltefish4-r4.ccsphl',
);

sub get_file_list {
    my $config = shift;

    my $yenta = AC::Yenta::Direct->new( 'logfile', $YDBFILE );

    my $mode  = $config->{datamode};
    my $syst  = $config->{system};
    my $tmax  = $config->{end};
    my $tmin  = $config->{start};
    my $start = isotime($tmin);
    $start =~ s/^(\d+)T(\d+).*/$1$2/;	# 20091109T123456... => 20091109123456

lib/AC/MrGamoo/AC/MySelf.pm  view on Meta::CPAN


package AC::MrGamoo::AC::MySelf;
use AC::MrGamoo::Config;
use AC::MrGamoo::Debug;
use AC::DataCenter;	# provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;

my $SERVERID;

sub init {
    my $class = shift;
    my $port  = shift;	# not used
    my $id    = shift;

    $SERVERID = $id;
    unless( $SERVERID ){
        (my $h = hostname()) =~ s/\.adcopy.*//;
        my $v = conf_value('environment');
        $SERVERID = 'mrm';
        $SERVERID .= '/' . $v unless $v eq 'prod';
        $SERVERID .= '@' . $h;
    }
    verbose("system persistent-id: $SERVERID");
}

sub my_server_id {
    return $SERVERID;
}

1;

lib/AC/MrGamoo/AC/ReadInput.pm  view on Meta::CPAN

# $Id: ReadInput.pm,v 1.1 2010/11/01 18:41:49 jaw Exp $

package AC::MrGamoo::AC::ReadInput;
use AC::Logfile;
use AC::Daemon;
use AC::MrGamoo::User;
use strict;

our $R;		# exported by AC::MrGamoo::User

sub readinput {
    my $fd = shift;

    my $line = scalar <$fd>;
    return (undef, 1) unless defined $line;

    my $d;
    eval { $d = parse_dancr_log($line); };
    if( $@ ){
        problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
        return ;

lib/AC/MrGamoo/API/Chk.pm  view on Meta::CPAN

#
# $Id: Chk.pm,v 1.1 2010/11/01 18:41:50 jaw Exp $

package AC::MrGamoo::API::Chk;
use AC::MrGamoo::Debug 'api_del';
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Scriblr;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});

    if( $file && -f $file ){
        reply( 500, 'Error', $io, $proto, $req );

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

use AC::MrGamoo::Debug 'client';
use AC::MrGamoo::Protocol;
use AC::DC::IO::TCP::Client;
use Socket;
our @ISA = 'AC::DC::IO::TCP::Client';

use strict;

my $TIMEOUT  = 15;

sub new {
    my $class = shift;
    my $addr  = shift;
    my $port  = shift;
    my $info  = shift;
    my $req   = shift;
    my $data  = shift;

    debug("new client type: $req->{type} to $addr:$port");
    my $send = AC::MrGamoo::Protocol->encode_request( $req, $data );
    my $me   = $class->SUPER::new( $addr, $port,
                                 info	 => "client $req->{type} to $addr:$port; $info",
                                 request => $send,
                                );

    return $me;
}

sub start {
    my $me = shift;

    $me->set_callback('timeout',  \&_timeout);
    $me->set_callback('read',     \&_read);
    $me->set_callback('shutdown', \&_shutdown);

    $me->SUPER::start();
    $me->timeout_rel($TIMEOUT);
    $me->write( $me->{request} );

    return $me;
}

sub _timeout {
    my $me = shift;
    $me->shut();
}

sub _shutdown {
    my $me = shift;

    if( $me->{status_ok} ){
        $me->run_callback('on_success', { result => $me->{result} } );
    }else{
        $me->run_callback('on_failure');
    }
}

sub _read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply to $me->{info}");

    my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    return unless $proto;

    # check response
    if( $proto->{is_error} ){

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN


    if( $proto->{data}{status_code} != 200 ){
        return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
    }

    $me->{result}    = $proto;
    $me->{status_ok} = 1;
    $me->shut();
}

sub _uh_oh {
    my $me  = shift;
    my $msg = shift;

    debug("error $msg");
    $me->run_callback('error', { error => $msg } );
    $me->shut();
}


1;

lib/AC/MrGamoo/API/Del.pm  view on Meta::CPAN

# Function: 
#
# $Id: Del.pm,v 1.1 2010/11/01 18:41:51 jaw Exp $

package AC::MrGamoo::API::Del;
use AC::MrGamoo::Debug 'api_del';
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Scriblr;
use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    # validate filename
    my $file = filename($req->{filename});
    debug("deleting file $file");

lib/AC/MrGamoo/API/Get.pm  view on Meta::CPAN

use AC::MrGamoo::Config;
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Scriblr;
use AC::SHA1File;
use Fcntl;
use POSIX;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    return unless $proto->{want_reply};
    in_background( \&_get_file, $io, $proto, $req, $content );
}

sub _get_file {
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;

lib/AC/MrGamoo/API/HB.pm  view on Meta::CPAN

use AC::MrGamoo::About;
use AC::MrGamoo::MySelf;

use Sys::Hostname;

require 'AC/protobuf/heartbeat.pl';
use strict;

my $HOSTNAME = hostname();

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

lib/AC/MrGamoo/API/JobAbort.pm  view on Meta::CPAN


package AC::MrGamoo::API::JobAbort;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;


    debug("abort job $req->{jobid}");

    my $r = AC::MrGamoo::Job->abort( jobid => $req->{jobid} );

lib/AC/MrGamoo/API/JobCreate.pm  view on Meta::CPAN


package AC::MrGamoo::API::JobCreate;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    debug("new job $req->{jobid}");

    if( $req->{console} =~ /^:/ ){
        # fill in ip addr

lib/AC/MrGamoo/API/Put.pm  view on Meta::CPAN

use AC::MrGamoo::Config;
use AC::MrGamoo::API::Simple;
use AC::SHA1File;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Scriblr;
use File::Path;
use POSIX;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    if( conf_value('scriblr') =~ /no|off/i ){
        reply( 500, 'Error', $io, $proto, $req );
        return;
    }

    in_background( \&_put_file, $io, $proto, $req, $content );
}

sub _put_file {
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    my($dir) = $file =~ m|^(.+)/[^/]+$|;

lib/AC/MrGamoo/API/Simple.pm  view on Meta::CPAN

use AC::MrGamoo::Protocol;
use AC::MrGamoo::Debug 'api';
use AC::Import;
use POSIX;

require 'AC/protobuf/std_reply.pl';
use strict;

our @EXPORT = qw(reply on_success on_failure in_background nbfd_reply);

sub reply {
    my $code    = shift;
    my $msg     = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

lib/AC/MrGamoo/API/Simple.pm  view on Meta::CPAN

        is_reply        => 1,
    }, {
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("sending $code reply for $proto->{type} on $io->{info}");
    $io->write_and_shut( $response );
}

sub nbfd_reply {
    my $code    = shift;
    my $msg     = shift;
    my $fd      = shift;
    my $proto   = shift;
    my $req     = shift;

    return unless $proto->{want_reply};

    my $response = AC::MrGamoo::Protocol->encode_reply( {
        type            => $proto->{type},

lib/AC/MrGamoo/API/Simple.pm  view on Meta::CPAN

        is_reply        => 1,
    }, {
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("sending $code reply for $proto->{type} (from bkg)");
    syswrite( $fd, $response );
}

sub on_success {
    my $x  = shift;
    my $e  = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

    reply( 200, 'OK', $io, $proto, $req );
}

sub on_failure {
    my $x  = shift;
    my $e  = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

    reply( 500, 'Error', $io, $proto, $req );
}


sub in_background {
    my $func    = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

    my $pid = fork();

    if( !defined($pid) ){
        problem("cannot fork: $!");
        reply( 500, 'Error', $io, $proto, $req );

lib/AC/MrGamoo/API/TaskAbort.pm  view on Meta::CPAN


package AC::MrGamoo::API::TaskAbort;
use AC::MrGamoo::Debug 'api_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;


    debug("abort task $req->{jobid}/$req->{taskid}");

    my $r = AC::MrGamoo::Task->abort( jobid => $req->{jobid}, taskid => $req->{taskid} );

lib/AC/MrGamoo/API/TaskCreate.pm  view on Meta::CPAN


package AC::MrGamoo::API::TaskCreate;
use AC::MrGamoo::Debug 'api_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    debug("new task $req->{jobid}/$req->{taskid}");
    my $x = AC::MrGamoo::Task->new( %$req );
    my $r = $x ? $x->start() : undef;

lib/AC/MrGamoo/API/TaskStatus.pm  view on Meta::CPAN


package AC::MrGamoo::API::TaskStatus;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;


    debug("updating task status $req->{jobid}/$req->{taskid}");

    my $r = AC::MrGamoo::Job->task_status( %$req );

lib/AC/MrGamoo/API/Xfer.pm  view on Meta::CPAN

use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::PeerList;

use AC::MrGamoo::API::Simple;

use strict;

my $MSGID = $$;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    # validate filename
    if( $req->{filename} =~ m%/\.|^\.% ){
        reply( 500, 'Error', $io, $proto, $req );
        return;

lib/AC/MrGamoo/API/Xfer.pm  view on Meta::CPAN

    }

    # send status when finished
    $x->set_callback('on_success', \&_yippee, $proto, $req);
    $x->set_callback('on_failure', \&_boohoo, $proto, $req);

    # start
    $x->start();
}

sub _mk_xfer {
    my $loc  = shift;
    my $req  = shift;

    my $x = AC::MrGamoo::Xfer->new(
        $req->{filename}, ($req->{dstname} || $req->{filename}), $loc, $req,
       );

    return $x;
}

################################################################

sub _yippee {
    my $x  = shift;
    my $e  = shift;
    my $proto = shift;
    my $req   = shift;

    tell_master( $req, 200, 'OK' );
}

sub _boohoo {
    my $x  = shift;
    my $e  = shift;
    my $proto = shift;
    my $req   = shift;

    debug("boohoo - xfer failed $req->{copyid}");
    tell_master( $req, 500, 'Failed' );
}

sub tell_master {
    my $req   = shift;
    my $code  = shift;
    my $msg   = shift;

    my($addr, $port) = get_peer_addr_from_id( $req->{master} );
    debug("sending xfer status update for $req->{copyid} => $code => $req->{master}");
    debug("cannot find addr") unless $addr;
    return unless $addr;

    my $x = AC::MrGamoo::API::Client->new( $addr, $port, "xfer $req->{copyid}", {

lib/AC/MrGamoo/API/XferStatus.pm  view on Meta::CPAN


package AC::MrGamoo::API::XferStatus;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;


    debug("updating xfer status $req->{jobid}/$req->{copyid}");

    my $r = AC::MrGamoo::Job->xfer_status( %$req );

lib/AC/MrGamoo/About.pm  view on Meta::CPAN

#
# $Id: About.pm,v 1.1 2010/11/01 18:41:39 jaw Exp $

package AC::MrGamoo::About;
use AC::Import;
use strict;

our @EXPORT = 'my_port';
my $port;

sub init {
    my $class = shift;
    $port = shift;
}

sub my_port { $port }

1;

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

use JSON;
use Sys::Hostname;
use Socket;

require 'AC/protobuf/mrgamoo.pl';
require 'AC/protobuf/mrgamoo_status.pl';
require 'AC/protobuf/std_reply.pl';
use strict;


sub new {
    my $class = shift;
    my $from = shift;	# file | text
    my $src  = shift;
    my $cfg  = shift;

    my $host = hostname();
    my $user = getpwuid($<);
    my $trace = "$user/$$\@$host:" . ($from eq 'file' ? $src : 'text');

    my $me   = bless {
        traceinfo	=> $trace,
    }, $class;
    $me->{fdebug} = $cfg->{debug} ? sub{ print STDERR "@_\n" } : sub {};

    # compile job
    my $mr = AC::MrGamoo::Submit::Compile->new( $from => $src );
    $me->{program} = $mr;

    # merge job %config section with passed in config
    $mr->add_config($cfg);

    return $me;
}

sub get_config_param {
    my $me = shift;

    $me->{program}->get_config_param(@_);
}

sub set_config_param {
    my $me = shift;

    $me->{program}->set_config_param(@_);
}

sub open_console {
    my $me = shift;

    my $fd;
    socket($fd, PF_INET, SOCK_DGRAM, 0);
    bind($fd, sockaddr_in(0, INADDR_ANY));
    my $s = getsockname($fd);
    my($port, $addr) = sockaddr_in($s);

    $me->{console_fd}   = $fd;
    $me->{console_port} = $port;
}

sub run_console {
    my $me = shift;
    my $fd = $me->{console_fd};

    while(1){
        my $buf;
        recv $fd, $buf, 65535, 0;
        my $proto = AC::MrGamoo::Protocol->decode_header($buf);
        my $data  = substr($buf, AC::MrGamoo::Protocol->header_size());
        my $req   = AC::MrGamoo::Protocol->decode_request($proto, $data);
        last if $req->{type} eq 'finish';
        print STDERR "$req->{msg}"                        if $req->{type} eq 'stderr';
        print "$req->{msg}"                               if $req->{type} eq 'stdout';
        $me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
    }
}

sub submit {
    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

    my $mr = $me->{program};
    my $r = AC::MrGamoo::Submit::Request->new( $mr );
    $r->{eu_print_stderr} = sub { print STDERR "@_\n" };
    $r->{eu_print_stdout} = sub { print STDERR "@_\n" };

    # run init section
    my $h_init   = $mr->get_code( 'init' );
    my $initres  = ($h_init ? $h_init->{code}() : undef) || {};

    $me->{id} = unique();
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobcreate',
        msgidno		=> $^T,
        want_reply	=> 1,

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

        $me->{master} = { addr => $addr, port => $port };
        $ok = 1;
    }else{
        # pick server
        $ok = $me->_pick_master_and_send( $req, $seed );
    }

    return $ok ? $me->{id} : undef;
}

sub abort {
    my $me = shift;

    return unless $me->{master};
    my $res = $me->_submit_to( $me->{master}{addr}, $me->{master}{port}, AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobabort',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {
        jobid		=> $me->{id},
    }));

}

################################################################

sub _pick_master_and_send {
    my $me   = shift;
    my $req  = shift;
    my $seed = shift;

    my @serverlist;

    my $listreq = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_status',
        msgidno		=> $^T,
        want_reply	=> 1,

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

            $res = $me->_submit_to( $addr, $ip->{port}, $req );
            alarm(0);
        };
        next unless $res && $res->{status_code} == 200;
        $me->{master} = { addr => $addr, port => $ip->{port} };
        return 1;
    }
    return ;
}

sub _submit_to {
    my $me   = shift;
    my $addr = shift;
    my $port = shift;
    my $req  = shift;

    $me->{fdebug}->("sending job to $addr:$port");
    my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $req, $me->{fdebug}, 120 );
    my $res   = AC::MrGamoo::Protocol->decode_reply($reply);

    return $res;
}

################################################################

sub check_code {
    my $me = shift;

    my $mr = $me->{program};
    my $nr = @{ $mr->{content}{reduce} };

    $me->_check('map');
    $me->_check('reduce', $_) for (0 .. $nr - 1);
    $me->_check('final');

    return 1;
}

sub _check {
    my $me = shift;
    my $mr = $me->{program};

    my $prog = $mr->compile(@_);
    eval "sub $prog";
    die $@ if $@;
}


1;

lib/AC/MrGamoo/Config.pm  view on Meta::CPAN

    seedpeer    => \&AC::ConfigFile::Simple::parse_keyarray,
    scriblr	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    sortprog	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    gzprog	=> \&AC::ConfigFile::Simple::parse_keyvalue,
);



################################################################

sub handle_config {
    my $me   = shift;
    my $key  = shift;
    my $rest = shift;

    my $fnc = $CONFIG{$key};
    return unless $fnc;
    $fnc->($me, $key, $rest);
    return 1;
}

################################################################

sub conf_value {
    my $key = shift;

    return $AC::MrGamoo::CONF->{config}{$key};
}


1;

lib/AC/MrGamoo/Customize.pm  view on Meta::CPAN

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 10:37 (EST)
# Function: connect user provided implementation
#
# $Id: Customize.pm,v 1.1 2010/11/01 18:41:40 jaw Exp $

package AC::MrGamoo::Customize;
use strict;

sub customize {
    my $class  = shift;
    my $implby = shift;

    (my $default = $class) =~ s/(.*)::([^:]+)$/$1::Default::$2/;

    # load user's implemantation + default
    for my $p ($implby, $default){
        eval "require $p" if $p;
        die $@ if $@;
    }

lib/AC/MrGamoo/D.pm  view on Meta::CPAN

use AC::MrGamoo::Submit::Request;

use AC::Daemon;
use AC::Misc;

require 'AC/protobuf/mrgamoo.pl';
require 'AC/protobuf/std_reply.pl';
use strict;


sub new {
    my $class = shift;
    my %p = @_;

    AC::MrGamoo::MySelf->customize(    $p{class_myself} );
    AC::MrGamoo::FileList->customize(  $p{class_filelist} );
    AC::MrGamoo::ReadInput->customize( $p{class_readinput} );
    # ...

    return bless \$class, $class;
}

sub daemon {
    my $me    = shift;
    my $cfile = shift;
    my $opt   = shift;	# foreground, debugall, persistent_id, argv

    die "no config file specified\n" unless $cfile;

    # configure

    $AC::MrGamoo::CONF = AC::MrGamoo::Config->new(
        $cfile, onreload => sub {},
       );

    initlog( 'mrgamoo', (conf_value('syslog') || 'local4'), $opt->{debugall} );
    AC::MrGamoo::Debug->init( $opt->{debugall}, $AC::MrGamoo::CONF );

    daemonize(5, 'mrgamood', $opt->{argv}) unless $opt->{foreground};
    verbose("starting.");

    $SIG{CHLD} = $SIG{PIPE} = sub {};        				# ignore
    $SIG{INT}  = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit;  # abort

    # initialize subsystems

    my $port = $opt->{port} || conf_value('port');

    AC::MrGamoo::About->init( $port );
    AC::MrGamoo::MySelf->init( $port, $opt->{persistent_id} );
    AC::DC::IO::TCP::Server->new( $port, 'AC::MrGamoo::Server' );

    # start "cronjobs"
    AC::DC::Sched->new(
        info	=> 'check config files',
        freq	=> 30,
        func	=> sub { $AC::MrGamoo::CONF->check() },
       );

    run_and_watch(
        ($opt->{foreground} || $opt->{debugall}),
        \&AC::DC::IO::mainloop,
       );
}


1;

lib/AC/MrGamoo/Default/FileList.pm  view on Meta::CPAN

use strict;


# return an array of:
#   {
#     filename    => www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi
#     location    => [ mrm@server1, mrm@server2 ]
#     size        => 10863
#   }

sub get_file_list {
    my $config = shift;

    die "get_file_list not implemented. you need to provide this.\nsee 'class_filelist' in the documentation\n";
}


1;

lib/AC/MrGamoo/Default/MySelf.pm  view on Meta::CPAN

use AC::MrGamoo::Debug;
use Sys::Hostname;
use Socket;
use strict;


my $SERVERID;
my $MYIP = inet_ntoa(scalar gethostbyname(hostname()));
die "cannot determine my IP addr.\nsee 'class_myself' in the documentation\n" unless $MYIP;

sub init {
    my $class = shift;
    my $port  = shift;	# not used
    my $id    = shift;

    $SERVERID = $id;
    unless( $SERVERID ){
        $SERVERID = 'mrm/' . conf_value('environment') . '@' . hostname();
    }
    verbose("system persistent-id: $SERVERID");
}

sub my_server_id {
    return $SERVERID;
}

sub my_network_info {
    return [ { ipa => $MYIP } ];
}

sub my_datacenter {
    return 'default';
}

1;

lib/AC/MrGamoo/Default/ReadInput.pm  view on Meta::CPAN

# Created: 2010-Jan-26 12:01 (EST)
# Function: read input - line by line
#
# $Id: ReadInput.pm,v 1.1 2010/11/01 18:41:54 jaw Exp $

package AC::MrGamoo::Default::ReadInput;
use strict;

# return ( record, eof );

sub readinput {
    my $fd = shift;

    my $line = scalar <$fd>;
    return (undef, 1) unless defined $line;	# eof
    return ($line, 0);
}

1;



( run in 0.458 second using v1.01-cache-2.11-cpan-4d50c553e7e )