view release on metacpan or search on metacpan
eg/example.mrjob view on Meta::CPAN
# return a key + a value
return ( $data->{cmp}, 1 );
</%map>
%################################################################
<%reduce>
my $key = shift;
my $itr = shift; # an iterator object
# count
my $n = 0;
$itr->foreach( sub { $n ++ } );
# return a key + a value
return ($key, $n);
</%reduce>
%#
%# additional reduce blocks can go here
%#
%################################################################
%# final block runs once with the results of the previous reduce.
%# used to generate report or insert to db
eg/filelist.pm view on Meta::CPAN
# $Id: filelist.pm,v 1.1 2010/11/01 19:04:21 jaw Exp $
package Local::MrMagoo::FileList;
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;
my $YDBFILE = "/data/files.ydb";
sub get_file_list {
my $config = shift;
# get files + metadata from yenta
my $yenta = AC::Yenta::Direct->new( 'files', $YDBFILE );
# the job config is asking for files that match:
my $syst = $config->{system};
my $tmax = $config->{end}; # time_t
my $tmin = $config->{start}; # time_t
die "could not run job\n" unless $id;
print STDERR "job: $id\n" if $opt{verbose};
$SIG{INT} = $SIG{QUIT} = sub{ $mrm->abort(); exit; };
$mrm->run_console() if $opt{console};
exit;
################################################################
sub seedlist {
# determine list of servers to try
return '10.200.2.3:3504';
}
eg/myself.pm view on Meta::CPAN
# example myself
# $Id: myself.pm,v 1.1 2010/11/01 19:04:21 jaw Exp $
package Local::MrGamoo::MySelf;
use Sys::Hostname;
use strict;
my $SERVERID;
sub init {
my $class = shift;
my $port = shift; # our tcp port
my $id = shift; # from cmd line
$SERVERID = $id;
unless( $SERVERID ){
(my $h = hostname()) =~ s/\.example.com//; # remove domain
$SERVERID = "mrm/$h";
}
verbose("system persistent-id: $SERVERID");
}
sub my_server_id {
return $SERVERID;
}
1;
eg/readinput.pm view on Meta::CPAN
# $Id: readinput.pm,v 1.1 2010/11/01 19:04:22 jaw Exp $
package Local::MrMagoo::ReadInput;
use AC::MrMagoo::User;
use JSON;
use strict;
our $R; # exported by AC::MrMagoo::User
sub readinput {
my $fd = shift; # file handle
# our file is newline delimted json data
# read next line
my $line = scalar <$fd>;
# end of file?
return (undef, 1) unless defined $line;
my $d = json_decode($line);
lib/AC/MrGamoo/AC/FileList.pm view on Meta::CPAN
'scrib@a2be021ad31c' => 'mrm@gefiltefish1-r3.ccsphl',
'scrib@a2be021bd31c' => 'mrm@gefiltefish1-r4.ccsphl',
'scrib@a2be021cd31c' => 'mrm@gefiltefish2-r3.ccsphl',
'scrib@a2be021dd31c' => 'mrm@gefiltefish2-r4.ccsphl',
'scrib@a2be021ed31c' => 'mrm@gefiltefish3-r3.ccsphl',
'scrib@a2be021fd31c' => 'mrm@gefiltefish3-r4.ccsphl',
'scrib@a2be0220d31c' => 'mrm@gefiltefish4-r3.ccsphl',
'scrib@a2be0221d31c' => 'mrm@gefiltefish4-r4.ccsphl',
);
sub get_file_list {
my $config = shift;
my $yenta = AC::Yenta::Direct->new( 'logfile', $YDBFILE );
my $mode = $config->{datamode};
my $syst = $config->{system};
my $tmax = $config->{end};
my $tmin = $config->{start};
my $start = isotime($tmin);
$start =~ s/^(\d+)T(\d+).*/$1$2/; # 20091109T123456... => 20091109123456
lib/AC/MrGamoo/AC/MySelf.pm view on Meta::CPAN
package AC::MrGamoo::AC::MySelf;
use AC::MrGamoo::Config;
use AC::MrGamoo::Debug;
use AC::DataCenter; # provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;
my $SERVERID;
sub init {
my $class = shift;
my $port = shift; # not used
my $id = shift;
$SERVERID = $id;
unless( $SERVERID ){
(my $h = hostname()) =~ s/\.adcopy.*//;
my $v = conf_value('environment');
$SERVERID = 'mrm';
$SERVERID .= '/' . $v unless $v eq 'prod';
$SERVERID .= '@' . $h;
}
verbose("system persistent-id: $SERVERID");
}
sub my_server_id {
return $SERVERID;
}
1;
lib/AC/MrGamoo/AC/ReadInput.pm view on Meta::CPAN
# $Id: ReadInput.pm,v 1.1 2010/11/01 18:41:49 jaw Exp $
package AC::MrGamoo::AC::ReadInput;
use AC::Logfile;
use AC::Daemon;
use AC::MrGamoo::User;
use strict;
our $R; # exported by AC::MrGamoo::User
sub readinput {
my $fd = shift;
my $line = scalar <$fd>;
return (undef, 1) unless defined $line;
my $d;
eval { $d = parse_dancr_log($line); };
if( $@ ){
problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
return ;
lib/AC/MrGamoo/API/Chk.pm view on Meta::CPAN
#
# $Id: Chk.pm,v 1.1 2010/11/01 18:41:50 jaw Exp $
package AC::MrGamoo::API::Chk;
use AC::MrGamoo::Debug 'api_del';
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Scriblr;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
my $file = filename($req->{filename});
if( $file && -f $file ){
reply( 500, 'Error', $io, $proto, $req );
lib/AC/MrGamoo/API/Client.pm view on Meta::CPAN
use AC::MrGamoo::Debug 'client';
use AC::MrGamoo::Protocol;
use AC::DC::IO::TCP::Client;
use Socket;
our @ISA = 'AC::DC::IO::TCP::Client';
use strict;
my $TIMEOUT = 15;
sub new {
my $class = shift;
my $addr = shift;
my $port = shift;
my $info = shift;
my $req = shift;
my $data = shift;
debug("new client type: $req->{type} to $addr:$port");
my $send = AC::MrGamoo::Protocol->encode_request( $req, $data );
my $me = $class->SUPER::new( $addr, $port,
info => "client $req->{type} to $addr:$port; $info",
request => $send,
);
return $me;
}
sub start {
my $me = shift;
$me->set_callback('timeout', \&_timeout);
$me->set_callback('read', \&_read);
$me->set_callback('shutdown', \&_shutdown);
$me->SUPER::start();
$me->timeout_rel($TIMEOUT);
$me->write( $me->{request} );
return $me;
}
sub _timeout {
my $me = shift;
$me->shut();
}
sub _shutdown {
my $me = shift;
if( $me->{status_ok} ){
$me->run_callback('on_success', { result => $me->{result} } );
}else{
$me->run_callback('on_failure');
}
}
sub _read {
my $me = shift;
my $evt = shift;
debug("recvd reply to $me->{info}");
my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
return unless $proto;
# check response
if( $proto->{is_error} ){
lib/AC/MrGamoo/API/Client.pm view on Meta::CPAN
if( $proto->{data}{status_code} != 200 ){
return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
}
$me->{result} = $proto;
$me->{status_ok} = 1;
$me->shut();
}
sub _uh_oh {
my $me = shift;
my $msg = shift;
debug("error $msg");
$me->run_callback('error', { error => $msg } );
$me->shut();
}
1;
lib/AC/MrGamoo/API/Del.pm view on Meta::CPAN
# Function:
#
# $Id: Del.pm,v 1.1 2010/11/01 18:41:51 jaw Exp $
package AC::MrGamoo::API::Del;
use AC::MrGamoo::Debug 'api_del';
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Scriblr;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
# validate filename
my $file = filename($req->{filename});
debug("deleting file $file");
lib/AC/MrGamoo/API/Get.pm view on Meta::CPAN
use AC::MrGamoo::Config;
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Scriblr;
use AC::SHA1File;
use Fcntl;
use POSIX;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
return unless $proto->{want_reply};
in_background( \&_get_file, $io, $proto, $req, $content );
}
sub _get_file {
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
my $file = filename($req->{filename});
my $fd = $io->{fd};
fcntl($fd, F_SETFL, 0); # unset nbio
return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;
lib/AC/MrGamoo/API/HB.pm view on Meta::CPAN
use AC::MrGamoo::About;
use AC::MrGamoo::MySelf;
use Sys::Hostname;
require 'AC/protobuf/heartbeat.pl';
use strict;
my $HOSTNAME = hostname();
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
unless( $proto->{want_reply} ){
$io->shut();
return;
}
lib/AC/MrGamoo/API/JobAbort.pm view on Meta::CPAN
package AC::MrGamoo::API::JobAbort;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("abort job $req->{jobid}");
my $r = AC::MrGamoo::Job->abort( jobid => $req->{jobid} );
lib/AC/MrGamoo/API/JobCreate.pm view on Meta::CPAN
package AC::MrGamoo::API::JobCreate;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("new job $req->{jobid}");
if( $req->{console} =~ /^:/ ){
# fill in ip addr
lib/AC/MrGamoo/API/Put.pm view on Meta::CPAN
use AC::MrGamoo::Config;
use AC::MrGamoo::API::Simple;
use AC::SHA1File;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Scriblr;
use File::Path;
use POSIX;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
if( conf_value('scriblr') =~ /no|off/i ){
reply( 500, 'Error', $io, $proto, $req );
return;
}
in_background( \&_put_file, $io, $proto, $req, $content );
}
sub _put_file {
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
my $file = filename($req->{filename});
my $fd = $io->{fd};
fcntl($fd, F_SETFL, 0); # unset nbio
my($dir) = $file =~ m|^(.+)/[^/]+$|;
lib/AC/MrGamoo/API/Simple.pm view on Meta::CPAN
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Debug 'api';
use AC::Import;
use POSIX;
require 'AC/protobuf/std_reply.pl';
use strict;
our @EXPORT = qw(reply on_success on_failure in_background nbfd_reply);
sub reply {
my $code = shift;
my $msg = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
unless( $proto->{want_reply} ){
$io->shut();
return;
}
lib/AC/MrGamoo/API/Simple.pm view on Meta::CPAN
is_reply => 1,
}, {
status_code => $code,
status_message => $msg,
} );
debug("sending $code reply for $proto->{type} on $io->{info}");
$io->write_and_shut( $response );
}
sub nbfd_reply {
my $code = shift;
my $msg = shift;
my $fd = shift;
my $proto = shift;
my $req = shift;
return unless $proto->{want_reply};
my $response = AC::MrGamoo::Protocol->encode_reply( {
type => $proto->{type},
lib/AC/MrGamoo/API/Simple.pm view on Meta::CPAN
is_reply => 1,
}, {
status_code => $code,
status_message => $msg,
} );
debug("sending $code reply for $proto->{type} (from bkg)");
syswrite( $fd, $response );
}
sub on_success {
my $x = shift;
my $e = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
reply( 200, 'OK', $io, $proto, $req );
}
sub on_failure {
my $x = shift;
my $e = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
reply( 500, 'Error', $io, $proto, $req );
}
sub in_background {
my $func = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $pid = fork();
if( !defined($pid) ){
problem("cannot fork: $!");
reply( 500, 'Error', $io, $proto, $req );
lib/AC/MrGamoo/API/TaskAbort.pm view on Meta::CPAN
package AC::MrGamoo::API::TaskAbort;
use AC::MrGamoo::Debug 'api_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("abort task $req->{jobid}/$req->{taskid}");
my $r = AC::MrGamoo::Task->abort( jobid => $req->{jobid}, taskid => $req->{taskid} );
lib/AC/MrGamoo/API/TaskCreate.pm view on Meta::CPAN
package AC::MrGamoo::API::TaskCreate;
use AC::MrGamoo::Debug 'api_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("new task $req->{jobid}/$req->{taskid}");
my $x = AC::MrGamoo::Task->new( %$req );
my $r = $x ? $x->start() : undef;
lib/AC/MrGamoo/API/TaskStatus.pm view on Meta::CPAN
package AC::MrGamoo::API::TaskStatus;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("updating task status $req->{jobid}/$req->{taskid}");
my $r = AC::MrGamoo::Job->task_status( %$req );
lib/AC/MrGamoo/API/Xfer.pm view on Meta::CPAN
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::PeerList;
use AC::MrGamoo::API::Simple;
use strict;
my $MSGID = $$;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
# validate filename
if( $req->{filename} =~ m%/\.|^\.% ){
reply( 500, 'Error', $io, $proto, $req );
return;
lib/AC/MrGamoo/API/Xfer.pm view on Meta::CPAN
}
# send status when finished
$x->set_callback('on_success', \&_yippee, $proto, $req);
$x->set_callback('on_failure', \&_boohoo, $proto, $req);
# start
$x->start();
}
sub _mk_xfer {
my $loc = shift;
my $req = shift;
my $x = AC::MrGamoo::Xfer->new(
$req->{filename}, ($req->{dstname} || $req->{filename}), $loc, $req,
);
return $x;
}
################################################################
sub _yippee {
my $x = shift;
my $e = shift;
my $proto = shift;
my $req = shift;
tell_master( $req, 200, 'OK' );
}
sub _boohoo {
my $x = shift;
my $e = shift;
my $proto = shift;
my $req = shift;
debug("boohoo - xfer failed $req->{copyid}");
tell_master( $req, 500, 'Failed' );
}
sub tell_master {
my $req = shift;
my $code = shift;
my $msg = shift;
my($addr, $port) = get_peer_addr_from_id( $req->{master} );
debug("sending xfer status update for $req->{copyid} => $code => $req->{master}");
debug("cannot find addr") unless $addr;
return unless $addr;
my $x = AC::MrGamoo::API::Client->new( $addr, $port, "xfer $req->{copyid}", {
lib/AC/MrGamoo/API/XferStatus.pm view on Meta::CPAN
package AC::MrGamoo::API::XferStatus;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("updating xfer status $req->{jobid}/$req->{copyid}");
my $r = AC::MrGamoo::Job->xfer_status( %$req );
lib/AC/MrGamoo/About.pm view on Meta::CPAN
#
# $Id: About.pm,v 1.1 2010/11/01 18:41:39 jaw Exp $
package AC::MrGamoo::About;
use AC::Import;
use strict;
our @EXPORT = 'my_port';
my $port;
sub init {
my $class = shift;
$port = shift;
}
sub my_port { $port }
1;
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
use JSON;
use Sys::Hostname;
use Socket;
require 'AC/protobuf/mrgamoo.pl';
require 'AC/protobuf/mrgamoo_status.pl';
require 'AC/protobuf/std_reply.pl';
use strict;
sub new {
my $class = shift;
my $from = shift; # file | text
my $src = shift;
my $cfg = shift;
my $host = hostname();
my $user = getpwuid($<);
my $trace = "$user/$$\@$host:" . ($from eq 'file' ? $src : 'text');
my $me = bless {
traceinfo => $trace,
}, $class;
$me->{fdebug} = $cfg->{debug} ? sub{ print STDERR "@_\n" } : sub {};
# compile job
my $mr = AC::MrGamoo::Submit::Compile->new( $from => $src );
$me->{program} = $mr;
# merge job %config section with passed in config
$mr->add_config($cfg);
return $me;
}
sub get_config_param {
my $me = shift;
$me->{program}->get_config_param(@_);
}
sub set_config_param {
my $me = shift;
$me->{program}->set_config_param(@_);
}
sub open_console {
my $me = shift;
my $fd;
socket($fd, PF_INET, SOCK_DGRAM, 0);
bind($fd, sockaddr_in(0, INADDR_ANY));
my $s = getsockname($fd);
my($port, $addr) = sockaddr_in($s);
$me->{console_fd} = $fd;
$me->{console_port} = $port;
}
sub run_console {
my $me = shift;
my $fd = $me->{console_fd};
while(1){
my $buf;
recv $fd, $buf, 65535, 0;
my $proto = AC::MrGamoo::Protocol->decode_header($buf);
my $data = substr($buf, AC::MrGamoo::Protocol->header_size());
my $req = AC::MrGamoo::Protocol->decode_request($proto, $data);
last if $req->{type} eq 'finish';
print STDERR "$req->{msg}" if $req->{type} eq 'stderr';
print "$req->{msg}" if $req->{type} eq 'stdout';
$me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
}
}
sub submit {
my $me = shift;
my $seed = shift; # [ "ipaddr:port", ... ]
my $mr = $me->{program};
my $r = AC::MrGamoo::Submit::Request->new( $mr );
$r->{eu_print_stderr} = sub { print STDERR "@_\n" };
$r->{eu_print_stdout} = sub { print STDERR "@_\n" };
# run init section
my $h_init = $mr->get_code( 'init' );
my $initres = ($h_init ? $h_init->{code}() : undef) || {};
$me->{id} = unique();
my $req = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_jobcreate',
msgidno => $^T,
want_reply => 1,
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
$me->{master} = { addr => $addr, port => $port };
$ok = 1;
}else{
# pick server
$ok = $me->_pick_master_and_send( $req, $seed );
}
return $ok ? $me->{id} : undef;
}
sub abort {
my $me = shift;
return unless $me->{master};
my $res = $me->_submit_to( $me->{master}{addr}, $me->{master}{port}, AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_jobabort',
msgidno => $^T,
want_reply => 1,
}, {
jobid => $me->{id},
}));
}
################################################################
sub _pick_master_and_send {
my $me = shift;
my $req = shift;
my $seed = shift;
my @serverlist;
my $listreq = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_status',
msgidno => $^T,
want_reply => 1,
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
$res = $me->_submit_to( $addr, $ip->{port}, $req );
alarm(0);
};
next unless $res && $res->{status_code} == 200;
$me->{master} = { addr => $addr, port => $ip->{port} };
return 1;
}
return ;
}
sub _submit_to {
my $me = shift;
my $addr = shift;
my $port = shift;
my $req = shift;
$me->{fdebug}->("sending job to $addr:$port");
my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $req, $me->{fdebug}, 120 );
my $res = AC::MrGamoo::Protocol->decode_reply($reply);
return $res;
}
################################################################
sub check_code {
my $me = shift;
my $mr = $me->{program};
my $nr = @{ $mr->{content}{reduce} };
$me->_check('map');
$me->_check('reduce', $_) for (0 .. $nr - 1);
$me->_check('final');
return 1;
}
sub _check {
my $me = shift;
my $mr = $me->{program};
my $prog = $mr->compile(@_);
eval "sub $prog";
die $@ if $@;
}
1;
lib/AC/MrGamoo/Config.pm view on Meta::CPAN
seedpeer => \&AC::ConfigFile::Simple::parse_keyarray,
scriblr => \&AC::ConfigFile::Simple::parse_keyvalue,
sortprog => \&AC::ConfigFile::Simple::parse_keyvalue,
gzprog => \&AC::ConfigFile::Simple::parse_keyvalue,
);
################################################################
sub handle_config {
my $me = shift;
my $key = shift;
my $rest = shift;
my $fnc = $CONFIG{$key};
return unless $fnc;
$fnc->($me, $key, $rest);
return 1;
}
################################################################
sub conf_value {
my $key = shift;
return $AC::MrGamoo::CONF->{config}{$key};
}
1;
lib/AC/MrGamoo/Customize.pm view on Meta::CPAN
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 10:37 (EST)
# Function: connect user provided implementation
#
# $Id: Customize.pm,v 1.1 2010/11/01 18:41:40 jaw Exp $
package AC::MrGamoo::Customize;
use strict;
sub customize {
my $class = shift;
my $implby = shift;
(my $default = $class) =~ s/(.*)::([^:]+)$/$1::Default::$2/;
# load user's implemantation + default
for my $p ($implby, $default){
eval "require $p" if $p;
die $@ if $@;
}
lib/AC/MrGamoo/D.pm view on Meta::CPAN
use AC::MrGamoo::Submit::Request;
use AC::Daemon;
use AC::Misc;
require 'AC/protobuf/mrgamoo.pl';
require 'AC/protobuf/std_reply.pl';
use strict;
sub new {
my $class = shift;
my %p = @_;
AC::MrGamoo::MySelf->customize( $p{class_myself} );
AC::MrGamoo::FileList->customize( $p{class_filelist} );
AC::MrGamoo::ReadInput->customize( $p{class_readinput} );
# ...
return bless \$class, $class;
}
sub daemon {
my $me = shift;
my $cfile = shift;
my $opt = shift; # foreground, debugall, persistent_id, argv
die "no config file specified\n" unless $cfile;
# configure
$AC::MrGamoo::CONF = AC::MrGamoo::Config->new(
$cfile, onreload => sub {},
);
initlog( 'mrgamoo', (conf_value('syslog') || 'local4'), $opt->{debugall} );
AC::MrGamoo::Debug->init( $opt->{debugall}, $AC::MrGamoo::CONF );
daemonize(5, 'mrgamood', $opt->{argv}) unless $opt->{foreground};
verbose("starting.");
$SIG{CHLD} = $SIG{PIPE} = sub {}; # ignore
$SIG{INT} = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit; # abort
# initialize subsystems
my $port = $opt->{port} || conf_value('port');
AC::MrGamoo::About->init( $port );
AC::MrGamoo::MySelf->init( $port, $opt->{persistent_id} );
AC::DC::IO::TCP::Server->new( $port, 'AC::MrGamoo::Server' );
# start "cronjobs"
AC::DC::Sched->new(
info => 'check config files',
freq => 30,
func => sub { $AC::MrGamoo::CONF->check() },
);
run_and_watch(
($opt->{foreground} || $opt->{debugall}),
\&AC::DC::IO::mainloop,
);
}
1;
lib/AC/MrGamoo/Default/FileList.pm view on Meta::CPAN
use strict;
# return an array of:
# {
# filename => www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi
# location => [ mrm@server1, mrm@server2 ]
# size => 10863
# }
sub get_file_list {
my $config = shift;
die "get_file_list not implemented. you need to provide this.\nsee 'class_filelist' in the documentation\n";
}
1;
lib/AC/MrGamoo/Default/MySelf.pm view on Meta::CPAN
use AC::MrGamoo::Debug;
use Sys::Hostname;
use Socket;
use strict;
my $SERVERID;
my $MYIP = inet_ntoa(scalar gethostbyname(hostname()));
die "cannot determine my IP addr.\nsee 'class_myself' in the documentation\n" unless $MYIP;
sub init {
my $class = shift;
my $port = shift; # not used
my $id = shift;
$SERVERID = $id;
unless( $SERVERID ){
$SERVERID = 'mrm/' . conf_value('environment') . '@' . hostname();
}
verbose("system persistent-id: $SERVERID");
}
sub my_server_id {
return $SERVERID;
}
sub my_network_info {
return [ { ipa => $MYIP } ];
}
sub my_datacenter {
return 'default';
}
1;
lib/AC/MrGamoo/Default/ReadInput.pm view on Meta::CPAN
# Created: 2010-Jan-26 12:01 (EST)
# Function: read input - line by line
#
# $Id: ReadInput.pm,v 1.1 2010/11/01 18:41:54 jaw Exp $
package AC::MrGamoo::Default::ReadInput;
use strict;
# return ( record, eof );
sub readinput {
my $fd = shift;
my $line = scalar <$fd>;
return (undef, 1) unless defined $line; # eof
return ($line, 0);
}
1;