view release on metacpan or search on metacpan
eg/example.mrjob view on Meta::CPAN
tasktimeout => 120
</%config>
%################################################################
%# common block is prepended to all other blocks.
%# used to load modules
<%common>
use AC::Misc;
use AC::Dumper;
</%common>
%################################################################
%# init block runs once at startup.
%# the return value can be retrieved by other blocks.
%# used to calculate values or fetch things from a db.
<%init>
$R->print("starting map/reduce job");
return {
mood => 'joyous',
};
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
maxrun => 300
eg/example.mrjob view on Meta::CPAN
%################################################################
%# final block runs once with the results of the previous reduce.
%# used to generate report or insert to db
<%final>
<%attr>
%# override various parameters
use_strict => 0
in_package => My::Private::Space
</%attr>
<%init>
# init sub-block runs at start of final block
my $report;
</%init>
<%cleanup>
# cleanup sub-block runs at end of final block
# get the values from the init block
my $iv = $R->initvalue();
# stdout is tied to $R. so plain print also works.
eg/filelist.pm view on Meta::CPAN
# Example get_file_list: given the job config, return an arrayref of
# hashrefs (one per file) describing the files the job should read.
sub get_file_list {
    my ($config) = @_;

    # file names + metadata are looked up in yenta
    my $yenta = AC::Yenta::Direct->new( 'files', $YDBFILE );

    # what the job config asked for
    my $syst = $config->{system};
    my $tmax = $config->{end};      # time_t
    my $tmin = $config->{start};    # time_t

    # the keys in yenta are of the form: 20100126150139_[...]
    # so reshape the start time to match
    my $start = isotime($tmin);             # 1286819830      => 20101011T175710Z
    $start =~ s/^(\d+)T(\d+).*/$1$2/;       # 20101011T175710Z => 20101011175710

    # fetch the metadata for every file from $start to now
    my @candidates;
    for my $key ( $yenta->getrange($start, undef) ){
        # metadata is json encoded; a missing record becomes an empty hash
        my $json = $yenta->get($key);
        my $meta = $json ? decode_json($json) : {};

        # convert space separated locations to arrayref
        $meta->{location} = [ (split /\s+/, $meta->{location}) ];
        push @candidates, $meta;
    }

    # keep only the files that match the request:
    # right subsystem, and a time span overlapping [start, end]
    my @files = grep {
        ($_->{subsystem}  eq $syst) &&
        ($_->{end_time}   >= $tmin) &&
        ($_->{start_time} <= $tmax)
    } @candidates;

    return \@files;
}
1;
my %opt;
use Getopt::Long qw(:config no_ignore_case);
use AC::MrMagoo::Client;
use strict;
GetOptions(\%opt,
"check",
"verbose|v",
"debug|d",
"console",
"start=s",
"end=s",
) || exit;
my $mrm = AC::MrMagoo::Client->new( $ARGV[0], \%opt );
if( $opt{check} ){
$mrm->check_code();
exit;
}
eg/mrgamoo.conf view on Meta::CPAN
# example config
#
# file will be reloaded automagically if it changes. no need to hup or restart.
port 3504
syslog local4
environment prod
allow 10.200.2.0/23
basedir /home/aclogs
seedpeer 10.200.2.3:3504
eg/readinput.pm view on Meta::CPAN
my $line = scalar <$fd>;
# end of file?
return (undef, 1) unless defined $line;
my $d = json_decode($line);
# filter input on date range. we could just as easily filter
# in 'map', but doing it here, behind the scenes, keeps things
# simpler for the jr. developers writing reports.
return (undef, 0) if $d->{tstart} < $R->config('start');
return (undef, 0) if $d->{tstart} >= $R->config('end');
return ($d, 0);
}
1;
lib/AC/MrGamoo.pm view on Meta::CPAN
=item allow
specify networks allowed to connect.
allow 127.0.0.1
allow 192.168.10.0/24
=item seedpeer
specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.
seedpeer 192.168.10.11:3503
seedpeer 192.168.10.12:3503
=item secret
specify a secret key used to encrypt data transferred between
systems in different datacenters.
lib/AC/MrGamoo/AC/FileList.pm view on Meta::CPAN
);
# Return an arrayref of file metadata hashrefs matching the job config.
#
# config fields used:
#   datamode - environment to match, or '*' for any
#   system   - one or more subsystems (space and/or comma separated),
#              or '*' for any
#   start    - start of the requested time range
#   end      - end of the requested time range
#
# A file is kept when its environment and subsystem match and its
# [start_time, end_time] span overlaps the requested range.
sub get_file_list {
    my $config = shift;

    my $yenta = AC::Yenta::Direct->new( 'logfile', $YDBFILE );

    my $mode  = $config->{datamode};
    my $syst  = $config->{system};
    my $tmax  = $config->{end};
    my $tmin  = $config->{start};
    my $start = isotime($tmin);
    $start =~ s/^(\d+)T(\d+).*/$1$2/;    # 20091109T123456... => 20091109123456

    # NB: keys in the yenta logfile map are of the form: 20100126150139_eqaB5uSerdeddsOw

    # '*' means "do not filter on this field"
    $syst = undef if defined($syst) && $syst eq '*';
    $mode = undef if defined($mode) && $mode eq '*';

    if( $syst ){
        # build an alternation from the list of systems.
        # runs of separators ("a, b") are collapsed so that no empty
        # alternative is produced (which would match an empty subsystem).
        my $alt = join '|', grep { length } split /[ ,]+/, $syst;
        $syst = qr/^($alt)$/;
    }

    debug("mode=" . (defined $mode ? $mode : '')
        . ", syst=" . (defined $syst ? $syst : '')
        . ", tmin=$tmin, tmax=$tmax, start=$start");

    my @files = grep {
        (!$mode || ($_->{environment} eq $mode)) &&
        (!$syst || ($_->{subsystem}   =~ $syst)) &&
        ($_->{end_time}   >= $tmin) &&
        ($_->{start_time} <= $tmax)
    } map {
        #debug("file: $_");
        # metadata is json encoded; a missing record becomes an empty hash
        my $d = $yenta->get($_);
        $d = $d ? decode_json($d) : {};
        # split the location list, translating names found in %CONVERT
        $d->{location} = [ map { $CONVERT{$_} || $_ } (split /\s+/, $d->{location}) ];
        $d;
    } $yenta->getrange($start, undef);    # every key from $start onwards

    debug("found " .scalar(@files)." files");

    return \@files;
}
1;
lib/AC/MrGamoo/AC/ReadInput.pm view on Meta::CPAN
my $d;
eval { $d = parse_dancr_log($line); };
if( $@ ){
problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
return ;
}
# filter input on date range. we could just as easily filter
# in 'map', but doing here, behind the scenes, keeps things
# simpler for the jr. developers writing reports.
return if $d->{tstart} < $R->config('start');
return if $d->{tstart} >= $R->config('end');
return ($d, 0);
}
1;
lib/AC/MrGamoo/API/Client.pm view on Meta::CPAN
debug("new client type: $req->{type} to $addr:$port");
my $send = AC::MrGamoo::Protocol->encode_request( $req, $data );
my $me = $class->SUPER::new( $addr, $port,
info => "client $req->{type} to $addr:$port; $info",
request => $send,
);
return $me;
}
# Register the event callbacks, hand off to the parent class's start,
# arm the timeout and write out the previously encoded request.
# returns the client object.
sub start {
    my ($me) = @_;

    my @handler = (
        [ 'timeout',  \&_timeout  ],
        [ 'read',     \&_read     ],
        [ 'shutdown', \&_shutdown ],
    );
    $me->set_callback( @$_ ) for @handler;

    $me->SUPER::start();
    $me->timeout_rel($TIMEOUT);
    $me->write( $me->{request} );

    return $me;
}
# 'timeout' callback: give up on this client and shut it.
sub _timeout {
    my ($me) = @_;

    $me->shut();
}
lib/AC/MrGamoo/API/JobCreate.pm view on Meta::CPAN
my $content = shift;
debug("new job $req->{jobid}");
if( $req->{console} =~ /^:/ ){
# fill in ip addr
$req->{console} = $io->{from_ip} . $req->{console};
}
my $x = AC::MrGamoo::Job->new( %$req );
my $r = $x ? $x->start() : undef;
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
1;
lib/AC/MrGamoo/API/TaskCreate.pm view on Meta::CPAN
# Handle a task-create request: build the task from the request fields,
# try to start it, and reply 200 if that returned true, 500 otherwise.
sub handler {
    my ($class, $io, $proto, $req, $content) = @_;

    debug("new task $req->{jobid}/$req->{taskid}");

    my $task    = AC::MrGamoo::Task->new( %$req );
    my $started = $task && $task->start();

    my @status = $started ? ( 200, 'OK' ) : ( 500, 'Error' );
    reply( @status, $io, $proto, $req );
}
1;
lib/AC/MrGamoo/API/Xfer.pm view on Meta::CPAN
my $req = shift;
my $content = shift;
# validate filename
if( $req->{filename} =~ m%/\.|^\.% ){
reply( 500, 'Error', $io, $proto, $req );
return;
}
# new retry
debug("starting file xfer $req->{copyid} => $req->{filename}");
# start working on the copy
my $x = AC::MrGamoo::Retry->new(
newobj => \&_mk_xfer,
newargs => [ $req ],
tryeach => $req->{location},
);
# reply now
if( $x ){
reply( 200, 'OK', $io, $proto, $req );
}else{
debug("sending error, xfer/retrier failed, $io->{info}");
reply( 501, 'Error', $io, $proto, $req );
}
# send status when finished
$x->set_callback('on_success', \&_yippee, $proto, $req);
$x->set_callback('on_failure', \&_boohoo, $proto, $req);
# start
$x->start();
}
sub _mk_xfer {
my $loc = shift;
my $req = shift;
my $x = AC::MrGamoo::Xfer->new(
$req->{filename}, ($req->{dstname} || $req->{filename}), $loc, $req,
);
lib/AC/MrGamoo/API/Xfer.pm view on Meta::CPAN
want_reply => 1,
}, {
jobid => $req->{jobid},
copyid => $req->{copyid},
status_code => $code,
status_message => $msg,
} );
debug("cannot create client") unless $x;
return unless $x;
$x->start();
# we don't need any reply or reply callbacks. just send + forget
}
################################################################
1;
lib/AC/MrGamoo/D.pm view on Meta::CPAN
# configure
$AC::MrGamoo::CONF = AC::MrGamoo::Config->new(
$cfile, onreload => sub {},
);
initlog( 'mrgamoo', (conf_value('syslog') || 'local4'), $opt->{debugall} );
AC::MrGamoo::Debug->init( $opt->{debugall}, $AC::MrGamoo::CONF );
daemonize(5, 'mrgamood', $opt->{argv}) unless $opt->{foreground};
verbose("starting.");
$SIG{CHLD} = $SIG{PIPE} = sub {}; # ignore
$SIG{INT} = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit; # abort
# initialize subsystems
my $port = $opt->{port} || conf_value('port');
AC::MrGamoo::About->init( $port );
AC::MrGamoo::MySelf->init( $port, $opt->{persistent_id} );
AC::DC::IO::TCP::Server->new( $port, 'AC::MrGamoo::Server' );
# start "cronjobs"
AC::DC::Sched->new(
info => 'check config files',
freq => 30,
func => sub { $AC::MrGamoo::CONF->check() },
);
run_and_watch(
($opt->{foreground} || $opt->{debugall}),
\&AC::DC::IO::mainloop,
);
lib/AC/MrGamoo/FileList.pm view on Meta::CPAN
=head1 DESCRIPTION
MrGamoo only runs map/reduce jobs.
It is up to you to get the files on to the servers
and keep track of where they are. And to tell MrGamoo.
Some people keep the file meta-information in a sql database.
Some people keep the file meta-information in a yenta map.
Some people keep the file meta-information in the filesystem.
When a new job starts, your C<get_file_list> function will be
called with the job config, and should return an arrayref
of matching files along with meta-info.
Each element of the returned arrayref should be a hashref
containing at least the following fields:
=head2 filename
the name of the file, relative to the C<basedir> in your config file.
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
phase_no => -1,
file_info => {},
tmp_file => [],
server_info => {},
task_running => {},
task_pending => {},
xfer_running => {},
xfer_pending => {},
request_running => {},
request_pending => {},
statistics => { job_start => time() },
}, $class;
if( $REGISTRY{ $me->{request}{jobid} } ){
verbose("ignoring duplicate request job $me->{request}{jobid}");
# will cause a 200 OK, so the requestor will not retry
return $REGISTRY{ $me->{request}{jobid} };
}
verbose("new job: $me->{request}{jobid} ($me->{request}{traceinfo})");
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
$me->{maxfail} = 5 * ( (keys %{$plan->{taskidx}}) + @{$plan->{copying}});
$me->{server_info}{$_->{id}} = {} for @$servers;
$me->_preload_file_copies();
$REGISTRY{ $me->{request}{jobid} } = $me;
return $me;
}
# Start the job: tell the end-user console, then kick the scheduler.
# always returns 1.
sub start {
    my ($me) = @_;

    debug("start job");
    $me->{euconsole}->send_msg( 'debug', 'starting job' );
    $me->_try_to_do_something();

    return 1;
}
################################################################
# record status rcvd from task
sub task_status {
my $me = _find(shift, @_);
my %p = @_;
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
for my $job (values %REGISTRY){
$job->_periodic();
}
}
sub _periodic {
my $me = shift;
my @t = values %{$me->{task_running}};
for my $t ( @t ){
my $lt = $t->{status_time} || $t->{start_time};
next if $^T - $lt < ($me->{options}{tasktimeout} || $TASKTIMEOUT);
$t->failed( $me, 'timeout' );
}
my @c = values %{$me->{xfer_running}};
for my $c ( @c ){
my $lt = $c->{status_time} || $c->{start_time};
next if $^T - $lt < ($me->{options}{xfertimeout} || $XFERTIMEOUT);
$c->failed( $me, 'timeout' );
}
$me->_try_to_do_something();
my $tr = keys %{ $me->{task_running} };
my $tp = keys %{ $me->{task_pending} };
my $xr = keys %{ $me->{xfer_running} };
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
my $ph = $me->{plan}{phases}[ $me->{phase_no} ] || 'none';
debug("status: phase $ph, task $tr / $tp, xfer $xr / $xp, reqs $rr / $rp")
if $tr || $tp || $xr || $xp || $rr || $rp;
}
################################################################
# Advance the job to its next phase.
#
# returns true (1) only once the job is completely finished, i.e. all
# phases have run and the cleanup pass has already been done.
# returns false when a new phase was started, or when cleanup was
# just kicked off.
sub _start_next_phase {
    my $me = shift;

    debug("next phase");
    $me->{phase_no} ++;
    my $tp = $me->{plan}{taskplan}[ $me->{phase_no} ];

    # finished all phases ?
    unless( $tp ){
        if( $me->{_cleanedup} ){
            # cleanup already ran => the job is done
            $me->_finished();
            return 1;
        }else{
            # first time past the last phase => clean up
            $me->_cleanup_files();
            return;
        }
    }

    debug("job $me->{request}{jobid} starting phase $tp->{phase}");
    $me->{euconsole}->send_msg('debug', "starting phase $tp->{phase}");

    # move tasks to pending
    for my $t (@{$tp->{task}}){
        $t->pend($me);
    }

    # the caller tests our return value; do not fall off the end of a
    # loop, whose value is unspecified.
    return;
}
# Start a pending task, unless its server is already running the
# maximum number of tasks or the task is still waiting out a retry delay.
# returns 1 if the task was started, nothing otherwise.
sub _maybe_start_task {
    my ($me, $task) = @_;

    #debug("maybe start task");

    my $server = $task->{server};
    my $busy   = keys %{ $me->{server_info}{$server}{task_running} };

    # don't overload server
    if( $busy >= $TASKSRVRMAX ){
        return;
    }
    # rate limit retries
    if( $task->{start_after} > $^T ){
        return;
    }

    # RSN - check that prerequisite xfers completed

    $task->start( $me );
    return 1;
}
# Start a pending file copy, unless its server is already running the
# maximum number of copies or the copy is still waiting out a retry delay.
# returns 1 if the copy was started, nothing otherwise.
sub _maybe_start_xfer {
    my ($me, $copy) = @_;

    #debug("maybe start xfer");

    my $server = $copy->{server};
    my $busy   = keys %{ $me->{server_info}{$server}{xfer_running} };

    # don't overload server
    if( $busy >= $XFERSRVRMAX ){
        return;
    }
    # rate limit retries
    if( $copy->{start_after} > $^T ){
        return;
    }

    $copy->start( $me );
    return 1;
}
# Start a pending request. requests are never held back here,
# so this always returns 1.
sub _maybe_start_request {
    my ($me, $req) = @_;

    $req->start( $me );

    return 1;
}
################################################################
# Queue up every file copy listed in the plan.
sub _preload_file_copies {
    my ($me) = @_;

    # start copying files
    $_->pend($me) for @{ $me->{plan}{copying} };
}
################################################################
sub _try_to_do_something {
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
# is this phase done
if( !(keys %{$me->{task_running}})
&& !(keys %{$me->{task_pending}})
&& !(keys %{$me->{xfer_running}})
&& !(keys %{$me->{xfer_pending}})
&& !(keys %{$me->{request_running}})
&& !(keys %{$me->{request_pending}})
){
# this phase is finished
return if $me->_start_next_phase();
}
if( $me->{aborted}
&& !(keys %{$me->{request_running}})
&& !(keys %{$me->{request_pending}})
){
# this phase is finished
return if $me->_start_next_phase();
}
# check load ave, etc
return unless $me->_ok_to_do_more_p();
$_trying ++;
# are there requests that can start
my @rp = values %{$me->{request_pending}};
my $startreqs = $REQMAX - keys %{$me->{request_running}};
for my $r (@rp){
last if $startreqs <= 0;
next unless $me->_maybe_start_request( $r );
$startreqs --;
}
unless( $me->{aborted} ){
# are there tasks that can start
my $started = 0;
my @tp = sort { $a->{created} <=> $b->{created} } values %{$me->{task_pending}};
for my $t (@tp){
$started += $me->_maybe_start_task( $t );
last if $started >= $me->{plan}{nserver} / 4; # keep them from getting in to lockstep
}
# are there copies that can start
my @cp = sort { $a->{created} <=> $b->{created} } values %{$me->{xfer_pending}};
for my $c (@cp){
$me->_maybe_start_xfer( $c );
}
}
# should we speculatively copy some files
# should we speculatively retry a task
$_trying --;
}
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
}
################################################################
sub report {
my $txt;
for my $j (values %REGISTRY){
my $ph;
$ph = 'start' if $j->{phase_no} < 0;
$ph ||= 'cleanup' if $j->{phase_no} >= @{$j->{plan}{phases}};
$ph ||= $j->{plan}{phases}[ $j->{phase_no} ];
my $tr = keys %{$j->{task_running}};
my $tp = keys %{$j->{task_pending}};
my $cr = keys %{$j->{copy_running}};
my $cp = keys %{$j->{copy_pending}};
my $rr = keys %{$j->{request_running}};
my $rp = keys %{$j->{request_pending}};
lib/AC/MrGamoo/Job/Action.pm view on Meta::CPAN
package AC::MrGamoo::Job::Action;
use AC::MrGamoo::Debug 'job_action';
use AC::MrGamoo::Job::Task;
use AC::MrGamoo::Job::Xfer;
use AC::MrGamoo::Job::Request;
use Time::HiRes 'time';
use strict;
# Record that this action has started.
#   $job - the job the action belongs to
#   $x   - the kind of action; used as the prefix of the job's
#          "<kind>_pending" / "<kind>_running" tables
sub started {
    my ($me, $job, $x) = @_;

    my $server = $me->{server};
    my $id     = $me->{id};

    $me->{start_time} = time();

    debug("$x started $id on $server");

    my $pending = "${x}_pending";
    my $running = "${x}_running";

    # move from pending to running, both job-wide and per-server
    delete $job->{$pending}{$id};
    $job->{$running}{$id} = $me;
    $job->{server_info}{$server}{$running}{$id} = $me;
}
# Record that this action has finished.
#   $job - the job the action belongs to
#   $x   - the kind of action; used as the prefix of the job's
#          "<kind>_running" table and of its statistics entries
sub finished {
    my $me  = shift;
    my $job = shift;
    my $x   = shift;

    my $server = $me->{server};
    my $id     = $me->{id};

    $me->{finished} = 1;

    # record timing info
    # (skip the run time if the action was never marked as started,
    # otherwise the whole epoch time would be added to the total)
    my $started = $me->{start_time};
    $job->{statistics}{"${x}_run_time"} += time() - $started if defined $started;
    $job->{statistics}{"${x}_run"} ++;

    debug("$x finished $id on $server");

    # remove from _running
    # remove from server_info._running
    delete $job->{"${x}_running"}{$id};
    delete $job->{server_info}{$server}{"${x}_running"}{$id};

    # add to server_info._finished
    # NB: server_info lives on the job, not on the action
    $job->{server_info}{$server}{"${x}_finished"}{$id} = 1;
}
sub failed {
my $me = shift;
lib/AC/MrGamoo/Job/Done.pm view on Meta::CPAN
use File::Path 'rmtree';
use strict;
our %REGISTRY;
our $MSGID;
sub _finished {
my $me = shift;
debug("finished");
$me->{statistics}{cleanup_time} = time() - $me->{statistics}{cleanup_start};
verbose("job stats: task $me->{statistics}{task_run} $me->{statistics}{task_run_time}, " .
"copy: $me->{statistics}{xfer_run} $me->{statistics}{xfer_run_time}, " .
"job: $me->{statistics}{job_time}, " .
"cleanup: $me->{statistics}{cleanup_files} $me->{statistics}{cleanup_time}");
$me->{euconsole}->send_msg('debug', 'finished job');
$me->{_finished} = 1;
delete $REGISTRY{ $me->{request}{jobid} };
lib/AC/MrGamoo/Job/Done.pm view on Meta::CPAN
$me->{euconsole}->send_msg('stderr', 'aborted job' . ($p{reason} ? ": $p{reason}" : ''));
}
################################################################
sub _cleanup_files {
my $me = shift;
$me->{_cleanedup} = 1;
$me->{statistics}{cleanup_start} = time();
$me->{statistics}{job_time} = time() - $me->{statistics}{job_start};
$me->{euconsole}->send_msg('debug', 'cleaning up');
$me->{euconsole}->send_msg('finish', 'finish');
# remove tmp files
for my $fi (@{$me->{tmp_file}}){
# debug("deleting $fi->{filename} from $fi->{server}");
$me->{statistics}{cleanup_files} ++;
AC::MrGamoo::Job::Request->new( $me,
lib/AC/MrGamoo/Job/Request.pm view on Meta::CPAN
# Create a request from the given key/value pairs and file it in the
# job's request_pending table under its id.
sub new {
    my ($class, $job, @param) = @_;

    my $me = bless { @param }, $class;
    $job->{request_pending}{ $me->{id} } = $me;

    return $me;
}
# Send this request to its server.
# the request leaves the job's pending table immediately, and joins the
# running table once it has been handed to the network layer.
# returns nothing if the request could not be sent.
sub start {
    my ($me, $job) = @_;

    debug("starting request $me->{info}");
    delete $job->{request_pending}{$me->{id}};

    my $x = $job->_send_request( $me->{server}, $me->{info}, $me->{proto}, $me->{request});
    if( !$x ){
        verbose("cannot start request");
        return;
    }

    # one callback handles both outcomes; the trailing flag says which
    for my $outcome ( [ 'on_success', 1 ], [ 'on_failure', 0 ] ){
        my ($event, $ok) = @$outcome;
        $x->set_callback( $event, \&_cb_start_req, $me, $job, $ok );
    }

    $job->{request_running}{$me->{id}} = $me;
    $x->start();
}
# Completion callback for a request (success or failure).
# drops the request from the running table and, after a success,
# pokes the job to start more requests when some are waiting
# and fewer than 5 are in flight.
sub _cb_start_req {
    my ($io, $evt, $me, $job, $ok) = @_;

    debug("request finished $me->{info}");
    delete $job->{request_running}{$me->{id}};

    # we go faster, if we can start a few at a time
    if( $ok
        && (keys %{$job->{request_pending}})
        && (keys %{$job->{request_running}} < 5) ){
        $job->_try_to_do_something();
    }
}
1;
lib/AC/MrGamoo/Job/Task.pm view on Meta::CPAN
created => time(),
};
$job->{task_pending}{$id} = $me;
debug(" => pending task $info->{id} => $id on $server");
return $me;
}
sub start {
my $me = shift;
my $job = shift;
my $server = $me->{server};
debug("starting task $job->{request}{jobid}/$me->{info}{id}/$me->{id} on $server");
# send request to server
my $ti = $me->{info};
my $x = $job->_send_request( $server, "task $me->{id}", {
type => 'mrgamoo_taskcreate',
msgidno => $^T,
want_reply => 1,
}, {
jobid => $job->{request}{jobid},
lib/AC/MrGamoo/Job/Task.pm view on Meta::CPAN
options => $job->{request}{options},
initres => $job->{request}{initres},
console => ($job->{request}{console} || ''),
phase => $ti->{phase},
infile => $ti->{infile},
outfile => [ map { $_->{filename} } @{$ti->{outfile}} ],
master => my_server_id(),
} );
unless( $x ){
verbose("cannot start task");
$me->failed($job);
return;
}
# no success cb here. we will either timeout, or get a TaskStatus msg.
$x->set_callback('on_failure', \&_cb_start_task_fail, $me, $job );
$me->started($job, 'task');
$x->start();
}
# 'on_failure' callback for the task-create request:
# mark the task as failed with the reason 'network'.
sub _cb_start_task_fail {
    my ($io, $evt, $me, $job) = @_;

    $me->failed($job, 'network');
}
sub update_status {
my $me = shift;
lib/AC/MrGamoo/Job/Xfer.pm view on Meta::CPAN
created => $^T,
};
$job->{xfer_pending}{$id} = $me;
debug("pending xfer $info->{id} => $id on $server");
return $me;
}
# Ask the destination server to copy the file.
# on a send error the xfer is marked failed and nothing is returned;
# otherwise the xfer is marked started and the request is sent.
sub start {
    my ($me, $job) = @_;

    my $server   = $me->{server};
    my $filename = $me->{info}{filename};

    debug("starting xfer $me->{id} on $server of $filename");

    # protocol header
    my %proto = (
        type       => 'mrgamoo_filexfer',
        msgidno    => $^T,
        want_reply => 1,
    );
    # request content
    # the locations recorded on the job take precedence over
    # the ones listed on the xfer itself
    my %content = (
        jobid    => $job->{request}{jobid},
        copyid   => $me->{id},
        filename => $filename,
        dstname  => ($me->{info}{dstname} || $filename),
        location => ($job->{file_info}{$filename}{location} || $me->{info}{location}),
        console  => $job->{request}{console},
        master   => my_server_id(),
    );

    # send request to server
    my $x = $job->_send_request( $server, "xfer $me->{id}", \%proto, \%content );

    if( !$x ){
        verbose("cannot start xfer");
        $me->failed( $job );
        return;
    }

    # no success cb here. we will either timeout, or get a XferStatus msg.
    $x->set_callback('on_failure', \&_cb_start_xfer_fail, $me, $job );

    $me->started($job, 'xfer');
    $x->start();
}
# 'on_failure' callback for the file-xfer request:
# mark the xfer as failed with the reason 'network'.
sub _cb_start_xfer_fail {
    my ($io, $evt, $me, $job) = @_;

    $me->failed($job, 'network');
}
# record status rcvd from file xfer
sub update_status {
lib/AC/MrGamoo/Job/Xfer.pm view on Meta::CPAN
# add to server_info.has_files
# add to file_info, tmp_file
$job->{server_info}{$server}{has_files}{$file} = 1;
push @{ $job->{file_info}{$file}{location} }, $server;
push @{$job->{tmp_file}}, { filename => $file, server => $server };
my $limit = $job->{plan}{nserver} * 1.5;
$job->_try_to_do_something()
if (keys %{$job->{xfer_pending}})
&& (keys %{$job->{xfer_running}} < $limit); # we go faster, if we can start a few at a time
}
1;
lib/AC/MrGamoo/Kibitz/Client.pm view on Meta::CPAN
our @ISA = 'AC::DC::IO::TCP::Client';
my $HDRSIZE = AC::MrGamoo::Protocol->header_size();
my $TIMEOUT = 3;
my $msgid = $$;
sub new {
my $class = shift;
# addr, port, ...
debug('starting kibitz status client');
my $me = $class->SUPER::new( @_ );
return unless $me;
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
$me->start();
# build request
my $req = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_status',
content_length => 0,
want_reply => 1,
msgid => $msgid++,
}, {
myself => AC::MrGamoo::Kibitz->about_myself(),
} );
lib/AC/MrGamoo/Kibitz/Peers.pm view on Meta::CPAN
}
debug("using seedpeer");
return("seed/$ip:$port", $ip, $port);
}
# Pick a peer at random and open a status client to it.
# if the client cannot be created, report the peer via maybe_down.
sub _kibitz_with_random_peer {

    my ($peer, $addr, $port) = _random_peer();
    $peer or return;

    debug("starting status kibitz client to $id") if 0;    # (see below)
    debug("starting status kibitz client to $peer");

    my %opt = (
        info        => "status client: $peer",
        status_peer => $peer,
    );
    my $ok = AC::MrGamoo::Kibitz::Client->new( $addr, $port, %opt );

    $ok or __PACKAGE__->maybe_down($peer, 'connect');
}
1;
lib/AC/MrGamoo/MySelf.pm view on Meta::CPAN
return [
# use this IP for communication with servers this datacenter (same natdom)
{ ip => $privat_ip, natdom => my_datacenter() },
# otherwise use this IP
{ ip => $public_ip },
]
}
=head2 init
initialization function called at startup. typically used to look up hostnames, IP addresses,
and such and store them in variables to make the above functions faster.
my $HOSTNAME;
my $DOMAIN;

# look up the hostname once at startup, and derive the domain from it.
# the domain is everything after the first label:
#   "host.example.com" => "example.com"
# (it stays undef if the hostname contains no dot)
sub init {
    $HOSTNAME = hostname();
    ($DOMAIN) = $HOSTNAME =~ /^[^.]+\.(.*)/;
}
=head1 BUGS
lib/AC/MrGamoo/Retry.pm view on Meta::CPAN
# Create a retrier from key/value parameters.
#   tryeach  - arrayref of things to try, one per attempt (default: empty)
#   maxtries - defaults to the number of elements in tryeach
# the attempt counter always starts at 0.
sub new {
    my ($class, @param) = @_;

    my $me = bless { @param, tries => 0 }, $class;

    $me->{tryeach} = [] unless $me->{tryeach};
    $me->{maxtries} = @{ $me->{tryeach} } unless $me->{maxtries};

    return $me;
}
# Kick off the first attempt.
sub start {
    my ($me) = @_;

    $me->_try();
}
################################################################
# Make one attempt: build a worker object for the next entry of
# tryeach, hook up our callbacks, and start it.
# if no object could be built, go straight to the failure handler.
sub _try {
    my ($me) = @_;

    my $target = $me->{tryeach}[ $me->{tries} ];
    my $obj    = $me->{newobj}->( $target, @{$me->{newargs}} );

    $me->{tries} ++;
    debug("try $me->{tries}");

    unless( $obj ){
        return _on_failure(undef, undef, $me);
    }

    my @hook = (
        [ 'on_success', \&_on_success ],
        [ 'on_failure', \&_on_failure ],
    );
    for my $h (@hook){
        $obj->set_callback( $h->[0], $h->[1], $me );
    }

    $obj->start();
}
# 'on_success' callback of the worker object:
# pass the good news on to whoever is watching the retrier.
sub _on_success {
    my ($x, $e, $me) = @_;

    debug("all done!");

    return $me->run_callback( 'on_success' );
}
lib/AC/MrGamoo/Server.pm view on Meta::CPAN
my $fd = shift;
my $ip = shift;
unless( $AC::MrGamoo::CONF->check_acl( $ip ) ){
verbose("rejecting connection from $ip");
return;
}
my $me = $class->SUPER::new( info => 'tcp mrgamoo server', from_ip => $ip );
$me->start($fd);
$me->timeout_rel($TIMEOUT);
$me->set_callback('read', \&read);
$me->set_callback('timeout', \&timeout);
}
sub timeout {
my $me = shift;
debug("connection timed out");
$me->shut();
lib/AC/MrGamoo/Submit/Compile.pm view on Meta::CPAN
}
sub _compile {
my $me = shift;
while(1){
my $line = $me->_next();
last unless defined $line;
chomp $line;
# white, comment, or start
$line =~ s/^%#.*//;
$line =~ s/#.*//;
next if $line =~ /^\s*$/;
my($tag) = $line =~ m|^<%(.*)>\s*$|;
my $d = $COMPILE{$tag};
if( $d->{tag} eq 'block'){
$me->_add_block($tag, $me->_compile_block($tag));
}
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
$me->{_inputsize} += $s
}
debug("input size: $me->{_inputsize}");
# print STDERR "Task: ", dumper($me), "\n";
$REGISTRY{$task} = $me;
return $me;
}
# Start the task now, or mark it as queued if the maximum number
# of tasks is already running.
# a queued task returns 1; otherwise returns whatever _start returns.
sub start {
    my ($me) = @_;

    # a task with an io object counts as running
    my $nrun = grep { $_->{io} } values %REGISTRY;

    if( $nrun < $MAXRUNNING ){
        return $me->_start();
    }

    # if too many tasks are running, queue
    # priority grows with the time since $TSTART and with the input size (in millions)
    $me->{_queueprio}    = $^T - $TSTART + $me->{_inputsize} / 1_000_000;
    $me->{status}{phase} = 'QUEUED';
    $me->{status}{amt}   = 0;
    debug("queue $me->{request}{phase} task $me->{request}{jobid}/$me->{request}{taskid} prio $me->{_queueprio}");
    return 1;
}
# Really start the task: create the forked io object that runs
# _start_task, remember it on the task, arm its timeout,
# attach the callbacks, and start it.
sub _start {
    my ($me) = @_;

    my $label = "$me->{request}{jobid}/$me->{request}{taskid}";
    debug("start $me->{request}{phase} task $label");

    my $io = AC::DC::IO::Forked->new(
        \&AC::MrGamoo::Task::Running::_start_task, [ $me ],
        info => "task $label",
    );
    $me->{io} = $io;

    $io->timeout_rel($TIMEOUT);

    # NB: the timeout callback gets no extra argument
    my @hook = (
        [ 'timeout',  \&_timeout ],
        [ 'read',     \&_read,     $me ],
        [ 'shutdown', \&_shutdown, $me ],
    );
    $io->set_callback( @$_ ) for @hook;

    $io->start();
}
# Abort a task: look it up, and shut its io object if it has one.
# returns 1 if the task was found, nothing otherwise.
sub abort {
    my $me = _find(shift, @_);
    $me or return;

    debug("abort task $me->{request}{taskid}");

    if( $me->{io} ){
        $me->{io}->shut();
    }

    return 1;
}
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
my $me = shift;
# send status to master
$me->_send_status();
my $task = $me->{request}{taskid};
delete $REGISTRY{$task};
delete $me->{io};
periodic(1); # try to start another task
}
# 'shutdown' callback of a status message:
# one fewer status update is in flight for this task.
sub _send_status_done {
    my ($io, $evt, $me) = @_;

    $me->{_status_underway} --;
}
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
taskid => $me->{request}{taskid},
phase => $me->{status}{phase},
progress => $me->{status}{amt},
} );
return unless $x;
$me->{_status_underway} ++;
$x->set_callback('shutdown', \&_send_status_done, $me);
$x->start();
}
sub attr {
my $me = shift;
my $bk = shift;
my $p = shift;
return $bk->{attr}{$p} if $bk && $bk->{attr}{$p};
return $me->{options}{$p};
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
my $quick = shift;
# how many tasks are running?
my $nrun = 0;
for my $t (values %REGISTRY){
$nrun ++ if $t->{io};
}
return if $quick && $nrun >= $MAXRUNNING;
# queued? send status, maybe start
for my $t (sort { $a->{_queueprio} <=> $b->{_queueprio} } values %REGISTRY){
next if $t->{io};
$t->_send_status() unless $quick;
if( $nrun < $MAXRUNNING ){
$t->_start();
$nrun ++;
}
}
}
1;
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
use Socket;
use JSON;
use strict;
my $STATUSTIME = 5; # seconds
my $MAXRUN = 3600; # override with %attr maxrun
my $SORTPROG = '/usr/bin/sort'; # override with %attr sortprog or config file
my $GZPROG = '/usr/bin/gzcat'; # override with %attr gzprog or config file
# in child process
sub _start_task {
my $me = shift;
debug("start child task");
$^T = time();
_setup_stdio_etal();
_setup_console( $me );
_update_status( 'STARTING', 0 );
# send STDOUT + STDERR to end-user console session
$me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ ) };
$me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
$me->{R}->redirect_io();
lib/AC/MrGamoo/Xfer.pm view on Meta::CPAN
rbufsize => 65536,
);
return unless $me;
$REGISTRY{$req->{copyid}} = $me;
debug("xfer requesting $loc:$srcname => $dstname, id $req->{copyid}");
return $me;
}
# Start the xfer now, or mark it as queued if the maximum number
# of xfers is already running.
# a queued xfer returns 1; otherwise returns whatever _start returns.
sub start {
    my ($me) = @_;

    # an xfer with an fd counts as running
    my $nrun = grep { $_->{fd} } values %REGISTRY;

    if( $nrun < $MAXRUNNING ){
        return $me->_start();
    }

    # too busy: note when it was queued, and leave it for later
    $me->{_queueprio} = $^T;
    debug("queue xfer $me->{request}{copyid}");
    return 1;
}
# Really start the xfer: arm the timeout, attach the callbacks,
# and hand off to the parent class's start.
sub _start {
    my ($me) = @_;

    debug("start xfer $me->{request}{copyid}");

    $me->timeout_rel($TIMEOUT);

    my @hook = (
        [ 'timeout',  \&timeout  ],
        [ 'shutdown', \&shutdown ],
    );
    $me->set_callback( @$_ ) for @hook;

    $me->SUPER::start();
}
sub _run_child {
my $srcname = shift; # on remote system
my $dstname = shift; # on local system
my $tmpfile = shift;
my $loc = shift;
my $req = shift;
lib/AC/MrGamoo/Xfer.pm view on Meta::CPAN
delete $REGISTRY{$me->{request}{copyid}};
debug("exitval = $me->{exitval}");
if( !$me->{exitval} ){
$me->run_callback('on_success');
}else{
$me->run_callback('on_failure');
}
periodic(1); # try to start another xfer
}
# Tell the master that this xfer is still in progress
# (status code 100, 'Working...').
sub _send_status_update {
    my ($req) = @_;

    debug('send xfer status');

    AC::MrGamoo::API::Xfer::tell_master( $req, 100, 'Working...' );
}
lib/AC/MrGamoo/Xfer.pm view on Meta::CPAN
my $quick = shift;
# how many xfers are running?
my $nrun = 0;
for my $t (values %REGISTRY){
$nrun ++ if $t->{fd};
}
return if $quick && $nrun >= $MAXRUNNING;
# queued? send status, maybe start
for my $t (sort { $a->{_queueprio} <=> $b->{_queueprio} } values %REGISTRY){
next if $t->{fd};
_send_status_update( $t->{request} ) unless $quick;
if( $nrun < $MAXRUNNING ){
$t->_start();
$nrun ++;
}
}
}
1;