view release on metacpan or search on metacpan
# $Id: mrgamoo,v 1.1 2010/11/01 19:02:57 jaw Exp $
my %opt;
use Getopt::Long qw(:config no_ignore_case);
use AC::MrMagoo::Client;
use strict;
GetOptions(\%opt,
"check",
"verbose|v",
"debug|d",
"console",
"start=s",
"end=s",
) || exit;
my $mrm = AC::MrMagoo::Client->new( $ARGV[0], \%opt );
if( $opt{check} ){
$mrm->check_code();
eg/mrgamood view on Meta::CPAN
use Getopt::Std;
use AC::MrMagoo::D;
use strict;
our %OPT;
my @saved_argv = @ARGV;
getopts('c:dDfp:', \%OPT) || die "usage...\n";
# -c config file
# -d enable all debugging
# -f foreground
# -p port
my $mrm = AC::MrMagoo::D->new(
class_filelist => 'My::Code::FileList',
class_readinput => 'My::Code::ReadInput',
class_myself => 'My::Code::MySelf',
);
$mrm->daemon( $OPT{c}, {
argv => \@saved_argv,
foreground => $OPT{f},
debugall => $OPT{d},
port => $OPT{p},
} );
lib/AC/MrGamoo.pm view on Meta::CPAN
=head1 SYNOPSIS
use AC::MrGamoo::D;
use strict;
my $m = AC::MrGamoo::D->new( );
$m->daemon( $configfile, {
argv => \@ARGV,
foreground => $OPT{f},
debugall => $OPT{d},
port => $OPT{p},
} );
exit;
=head1 CONFIG FILE
various parameters need to be specified in a config file.
if you modify the file, it will be reloaded automagically.
lib/AC/MrGamoo.pm view on Meta::CPAN
specify a syslog facility for log messages.
syslog local5
=item basedir
local directory to store files
basedir /home/data
=item debug
enable debugging for a particular section
debug job
=back
=head1 BUGS
Too many to list here.
=head1 SEE ALSO
AC::MrGamoo::Client
lib/AC/MrGamoo/AC/FileList.pm view on Meta::CPAN
# NB: keys in the yenta logfile map are of the form: 20100126150139_eqaB5uSerdeddsOw
$syst = undef if $syst eq '*';
$mode = undef if $mode eq '*';
$syst =~ s/[ ,]/\|/g;
if( $syst ){
$syst = qr/^($syst)$/;
}
debug("mode=$mode, syst=$syst, tmin=$tmin, tmax=$tmax, start=$start");
my @files = grep {
(!$mode || ($_->{environment} eq $mode)) &&
(!$syst || ($_->{subsystem} =~ $syst)) &&
($_->{end_time} >= $tmin) &&
($_->{start_time} <= $tmax)
} map {
#debug("file: $_");
my $d = $yenta->get($_);
$d = $d ? decode_json($d) : {};
$d->{location} = [ map { $CONVERT{$_} || $_ } (split /\s+/, $d->{location}) ];
$d;
} $yenta->getrange($start, undef);
debug("found " .scalar(@files)." files");
return \@files;
}
1;
lib/AC/MrGamoo/API/Client.pm view on Meta::CPAN
my $TIMEOUT = 15;
sub new {
my $class = shift;
my $addr = shift;
my $port = shift;
my $info = shift;
my $req = shift;
my $data = shift;
debug("new client type: $req->{type} to $addr:$port");
my $send = AC::MrGamoo::Protocol->encode_request( $req, $data );
my $me = $class->SUPER::new( $addr, $port,
info => "client $req->{type} to $addr:$port; $info",
request => $send,
);
return $me;
}
sub start {
lib/AC/MrGamoo/API/Client.pm view on Meta::CPAN
$me->run_callback('on_success', { result => $me->{result} } );
}else{
$me->run_callback('on_failure');
}
}
sub _read {
my $me = shift;
my $evt = shift;
debug("recvd reply to $me->{info}");
my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
return unless $proto;
# check response
if( $proto->{is_error} ){
return $me->_uhoh("rcvd error response");
}
$proto->{data} = AC::MrGamoo::Protocol->decode_reply($proto, $data);
debug("recvd reply to $me->{info} - $proto->{data}{status_code} $proto->{data}{status_message}");
if( $proto->{data}{status_code} != 200 ){
return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
}
$me->{result} = $proto;
$me->{status_ok} = 1;
$me->shut();
}
sub _uh_oh {
my $me = shift;
my $msg = shift;
debug("error $msg");
$me->run_callback('error', { error => $msg } );
$me->shut();
}
1;
lib/AC/MrGamoo/API/Del.pm view on Meta::CPAN
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
# validate filename
my $file = filename($req->{filename});
debug("deleting file $file");
if( $file && -f $file ){
unlink $file;
}
if( -f $file ){
reply( 500, 'Error', $io, $proto, $req );
}else{
reply( 200, 'OK', $io, $proto, $req );
}
lib/AC/MrGamoo/API/Get.pm view on Meta::CPAN
my $file = filename($req->{filename});
my $fd = $io->{fd};
fcntl($fd, F_SETFL, 0); # unset nbio
return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;
open(F, $file) || return nbfd_reply(500, 'error', $fd, $proto, $req);
my $size = (stat($file))[7];
my $sha1 = sha1_file($file);
debug("get file '$file' size $size");
# send header
my $gb = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
my $hdr = AC::MrGamoo::Protocol->encode_header(
type => $proto->{type},
msgidno => $proto->{msgidno},
is_reply => 1,
data_length => length($gb),
content_length => $size,
);
lib/AC/MrGamoo/API/HB.pm view on Meta::CPAN
hostname => $HOSTNAME,
subsystem => 'mrgamoo',
environment => conf_value('environment'),
port => my_port(),
timestamp => time(),
sort_metric => loadave(),
server_id => my_server_id(),
process_id => $$,
} );
debug("sending hb reply");
$io->write_and_shut( $response );
}
1;
lib/AC/MrGamoo/API/JobAbort.pm view on Meta::CPAN
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("abort job $req->{jobid}");
my $r = AC::MrGamoo::Job->abort( jobid => $req->{jobid} );
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
lib/AC/MrGamoo/API/JobCreate.pm view on Meta::CPAN
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("new job $req->{jobid}");
if( $req->{console} =~ /^:/ ){
# fill in ip addr
$req->{console} = $io->{from_ip} . $req->{console};
}
my $x = AC::MrGamoo::Job->new( %$req );
my $r = $x ? $x->start() : undef;
if( $r ){
lib/AC/MrGamoo/API/Simple.pm view on Meta::CPAN
my $response = AC::MrGamoo::Protocol->encode_reply( {
type => $proto->{type},
msgid => $proto->{msgid},
is_reply => 1,
}, {
status_code => $code,
status_message => $msg,
} );
debug("sending $code reply for $proto->{type} on $io->{info}");
$io->write_and_shut( $response );
}
sub nbfd_reply {
my $code = shift;
my $msg = shift;
my $fd = shift;
my $proto = shift;
my $req = shift;
lib/AC/MrGamoo/API/Simple.pm view on Meta::CPAN
my $response = AC::MrGamoo::Protocol->encode_reply( {
type => $proto->{type},
msgid => $proto->{msgid},
is_reply => 1,
}, {
status_code => $code,
status_message => $msg,
} );
debug("sending $code reply for $proto->{type} (from bkg)");
syswrite( $fd, $response );
}
sub on_success {
my $x = shift;
my $e = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
lib/AC/MrGamoo/API/TaskAbort.pm view on Meta::CPAN
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("abort task $req->{jobid}/$req->{taskid}");
my $r = AC::MrGamoo::Task->abort( jobid => $req->{jobid}, taskid => $req->{taskid} );
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
lib/AC/MrGamoo/API/TaskCreate.pm view on Meta::CPAN
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("new task $req->{jobid}/$req->{taskid}");
my $x = AC::MrGamoo::Task->new( %$req );
my $r = $x ? $x->start() : undef;
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
lib/AC/MrGamoo/API/TaskStatus.pm view on Meta::CPAN
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("updating task status $req->{jobid}/$req->{taskid}");
my $r = AC::MrGamoo::Job->task_status( %$req );
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
lib/AC/MrGamoo/API/Xfer.pm view on Meta::CPAN
my $req = shift;
my $content = shift;
# validate filename
if( $req->{filename} =~ m%/\.|^\.% ){
reply( 500, 'Error', $io, $proto, $req );
return;
}
# new retry
debug("starting file xfer $req->{copyid} => $req->{filename}");
# start working on the copy
my $x = AC::MrGamoo::Retry->new(
newobj => \&_mk_xfer,
newargs => [ $req ],
tryeach => $req->{location},
);
# reply now
if( $x ){
reply( 200, 'OK', $io, $proto, $req );
}else{
debug("sending error, xfer/retrier failed, $io->{info}");
reply( 501, 'Error', $io, $proto, $req );
}
# send status when finished
$x->set_callback('on_success', \&_yippee, $proto, $req);
$x->set_callback('on_failure', \&_boohoo, $proto, $req);
# start
$x->start();
}
lib/AC/MrGamoo/API/Xfer.pm view on Meta::CPAN
tell_master( $req, 200, 'OK' );
}
sub _boohoo {
my $x = shift;
my $e = shift;
my $proto = shift;
my $req = shift;
debug("boohoo - xfer failed $req->{copyid}");
tell_master( $req, 500, 'Failed' );
}
sub tell_master {
my $req = shift;
my $code = shift;
my $msg = shift;
my($addr, $port) = get_peer_addr_from_id( $req->{master} );
debug("sending xfer status update for $req->{copyid} => $code => $req->{master}");
debug("cannot find addr") unless $addr;
return unless $addr;
my $x = AC::MrGamoo::API::Client->new( $addr, $port, "xfer $req->{copyid}", {
type => 'mrgamoo_xferstatus',
msgidno => $MSGID++,
want_reply => 1,
}, {
jobid => $req->{jobid},
copyid => $req->{copyid},
status_code => $code,
status_message => $msg,
} );
debug("cannot create client") unless $x;
return unless $x;
$x->start();
# we don't need any reply or reply callbacks. just send + forget
}
################################################################
1;
lib/AC/MrGamoo/API/XferStatus.pm view on Meta::CPAN
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("updating xfer status $req->{jobid}/$req->{copyid}");
my $r = AC::MrGamoo::Job->xfer_status( %$req );
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
my $src = shift;
my $cfg = shift;
my $host = hostname();
my $user = getpwuid($<);
my $trace = "$user/$$\@$host:" . ($from eq 'file' ? $src : 'text');
my $me = bless {
traceinfo => $trace,
}, $class;
$me->{fdebug} = $cfg->{debug} ? sub{ print STDERR "@_\n" } : sub {};
# compile job
my $mr = AC::MrGamoo::Submit::Compile->new( $from => $src );
$me->{program} = $mr;
# merge job %config section with passed in config
$mr->add_config($cfg);
return $me;
}
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
while(1){
my $buf;
recv $fd, $buf, 65535, 0;
my $proto = AC::MrGamoo::Protocol->decode_header($buf);
my $data = substr($buf, AC::MrGamoo::Protocol->header_size());
my $req = AC::MrGamoo::Protocol->decode_request($proto, $data);
last if $req->{type} eq 'finish';
print STDERR "$req->{msg}" if $req->{type} eq 'stderr';
print "$req->{msg}" if $req->{type} eq 'stdout';
$me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
}
}
sub submit {
my $me = shift;
my $seed = shift; # [ "ipaddr:port", ... ]
my $mr = $me->{program};
my $r = AC::MrGamoo::Submit::Request->new( $mr );
$r->{eu_print_stderr} = sub { print STDERR "@_\n" };
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
jobid => $me->{id},
options => to_json( $r->{config} ),
initres => to_json( $initres, {allow_nonref => 1} ),
jobsrc => $mr->src(),
console => ($me->{console_port} ? ":$me->{console_port}" : ''),
traceinfo => $me->{traceinfo},
} );
my $ok;
if( my $master = $me->get_config_param('master') ){
# use specified master (for debugging)
my($addr, $port) = split /:/, $master;
$me->_submit_to( $addr, $port, $req );
$me->{master} = { addr => $addr, port => $port };
$ok = 1;
}else{
# pick server
$ok = $me->_pick_master_and_send( $req, $seed );
}
return $ok ? $me->{id} : undef;
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
my $listreq = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_status',
msgidno => $^T,
want_reply => 1,
}, {});
# get the full list of servers
# contact each seed passed in above, until we get a reply
for my $s ( @$seed ){
my($addr, $port) = split /:/, $s;
$me->{fdebug}->("attempting to fetch server list from $addr:$port");
eval {
alarm(1);
my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $listreq, $me->{fdebug} );
my $res = AC::MrGamoo::Protocol->decode_reply($reply);
alarm(0);
my $list = $res->{status};
@serverlist = @$list if $list && @$list;
};
last if @serverlist;
}
# sort+filter list
@serverlist = sort { ($a->{sort_metric} <=> $b->{sort_metric}) || int(rand(3)) - 1 }
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
}
return ;
}
sub _submit_to {
my $me = shift;
my $addr = shift;
my $port = shift;
my $req = shift;
$me->{fdebug}->("sending job to $addr:$port");
my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $req, $me->{fdebug}, 120 );
my $res = AC::MrGamoo::Protocol->decode_reply($reply);
return $res;
}
################################################################
sub check_code {
my $me = shift;
lib/AC/MrGamoo/Config.pm view on Meta::CPAN
use Socket;
use strict;
our @ISA = 'AC::ConfigFile::Simple';
our @EXPORT = qw(conf_value);
my %CONFIG = (
include => \&AC::ConfigFile::Simple::include_file,
debug => \&AC::ConfigFile::Simple::parse_debug,
allow => \&AC::ConfigFile::Simple::parse_allow,
port => \&AC::ConfigFile::Simple::parse_keyvalue,
environment => \&AC::ConfigFile::Simple::parse_keyvalue,
basedir => \&AC::ConfigFile::Simple::parse_keyvalue,
syslog => \&AC::ConfigFile::Simple::parse_keyvalue,
seedpeer => \&AC::ConfigFile::Simple::parse_keyarray,
scriblr => \&AC::ConfigFile::Simple::parse_keyvalue,
sortprog => \&AC::ConfigFile::Simple::parse_keyvalue,
gzprog => \&AC::ConfigFile::Simple::parse_keyvalue,
);
lib/AC/MrGamoo/D.pm view on Meta::CPAN
AC::MrGamoo::FileList->customize( $p{class_filelist} );
AC::MrGamoo::ReadInput->customize( $p{class_readinput} );
# ...
return bless \$class, $class;
}
sub daemon {
my $me = shift;
my $cfile = shift;
my $opt = shift; # foreground, debugall, persistent_id, argv
die "no config file specified\n" unless $cfile;
# configure
$AC::MrGamoo::CONF = AC::MrGamoo::Config->new(
$cfile, onreload => sub {},
);
initlog( 'mrgamoo', (conf_value('syslog') || 'local4'), $opt->{debugall} );
AC::MrGamoo::Debug->init( $opt->{debugall}, $AC::MrGamoo::CONF );
daemonize(5, 'mrgamood', $opt->{argv}) unless $opt->{foreground};
verbose("starting.");
$SIG{CHLD} = $SIG{PIPE} = sub {}; # ignore
$SIG{INT} = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit; # abort
# initialize subsystems
my $port = $opt->{port} || conf_value('port');
lib/AC/MrGamoo/D.pm view on Meta::CPAN
AC::DC::IO::TCP::Server->new( $port, 'AC::MrGamoo::Server' );
# start "cronjobs"
AC::DC::Sched->new(
info => 'check config files',
freq => 30,
func => sub { $AC::MrGamoo::CONF->check() },
);
run_and_watch(
($opt->{foreground} || $opt->{debugall}),
\&AC::DC::IO::mainloop,
);
}
1;
lib/AC/MrGamoo/Debug.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-22 13:21 (EST)
# Function: debugging
#
# $Id: Debug.pm,v 1.1 2010/11/01 18:41:41 jaw Exp $
package AC::MrGamoo::Debug;
use AC::DC::Debug;
our @ISA = 'AC::DC::Debug';
use strict;
1;
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
$me->{server_info}{$_->{id}} = {} for @$servers;
$me->_preload_file_copies();
$REGISTRY{ $me->{request}{jobid} } = $me;
return $me;
}
sub start {
my $me = shift;
debug("start job");
$me->{euconsole}->send_msg('debug', 'starting job');
$me->_try_to_do_something();
1;
}
################################################################
# record status rcvd from task
sub task_status {
my $me = _find(shift, @_);
my %p = @_;
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
return unless $c;
$c->update_status( $me, $p{status_code} );
1;
}
################################################################
sub periodic {
# debug("periodic check");
$_trying = 0;
for my $job (values %REGISTRY){
$job->_periodic();
}
}
sub _periodic {
my $me = shift;
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
my $tr = keys %{ $me->{task_running} };
my $tp = keys %{ $me->{task_pending} };
my $xr = keys %{ $me->{xfer_running} };
my $xp = keys %{ $me->{xfer_pending} };
my $rr = keys %{ $me->{request_running} };
my $rp = keys %{ $me->{request_pending} };
my $ph = $me->{plan}{phases}[ $me->{phase_no} ] || 'none';
debug("status: phase $ph, task $tr / $tp, xfer $xr / $xp, reqs $rr / $rp")
if $tr || $tp || $xr || $xp || $rr || $rp;
}
################################################################
sub _start_next_phase {
my $me = shift;
debug("next phase");
$me->{phase_no} ++;
my $tp = $me->{plan}{taskplan}[ $me->{phase_no} ];
# finished all phases ?
unless( $tp ){
if( $me->{_cleanedup} ){
$me->_finished();
return 1;
}else{
$me->_cleanup_files();
return;
}
}
debug("job $me->{request}{jobid} starting phase $tp->{phase}");
$me->{euconsole}->send_msg('debug', "starting phase $tp->{phase}");
# move tasks to pending
for my $t (@{$tp->{task}}){
$t->pend($me);
}
}
sub _maybe_start_task {
my $me = shift;
my $task = shift;
#debug("maybe start task");
my $server = $task->{server};
my $underway = keys %{$me->{server_info}{$server}{task_running}};
return if $underway >= $TASKSRVRMAX; # don't overload server
return if $task->{start_after} > $^T; # rate limit retries
# RSN - check that prerequisite xfers completed
$task->start( $me );
return 1;
}
sub _maybe_start_xfer {
my $me = shift;
my $copy = shift;
#debug("maybe start xfer");
my $server = $copy->{server};
my $underway = keys %{$me->{server_info}{$server}{xfer_running}};
return if $underway >= $XFERSRVRMAX; # don't overload server
return if $copy->{start_after} > $^T; # rate limit retries
$copy->start( $me );
return 1;
}
lib/AC/MrGamoo/Job.pm view on Meta::CPAN
$c->pend($me);
}
}
################################################################
sub _try_to_do_something {
my $me = shift;
# debug("try something");
return if $me->{_finished};
return if $_trying;
# is this phase done
if( !(keys %{$me->{task_running}})
&& !(keys %{$me->{task_pending}})
&& !(keys %{$me->{xfer_running}})
&& !(keys %{$me->{xfer_pending}})
&& !(keys %{$me->{request_running}})
lib/AC/MrGamoo/Job/Action.pm view on Meta::CPAN
my $x = shift;
my $server = $me->{server};
my $id = $me->{id};
$me->{start_time} = time();
# remove from _pending
# add to _running
# add to server_info._running
debug("$x started $id on $server");
delete $job->{"${x}_pending"}{$id};
$job->{"${x}_running"}{$id} = $me;
$job->{server_info}{$server}{"${x}_running"}{$id} = $me;
}
sub finished {
my $me = shift;
my $job = shift;
my $x = shift;
lib/AC/MrGamoo/Job/Action.pm view on Meta::CPAN
# remove from _running
# remove from server_info._running
# add to server_info._finished
# record timing info
my $started = $me->{start_time};
$job->{statistics}{"${x}_run_time"} += time() - $started;
$job->{statistics}{"${x}_run"} ++;
debug("$x finished $id on $server");
delete $job->{"${x}_running"}{$id};
delete $job->{server_info}{$server}{"${x}_running"}{$id};
$me->{server_info}{$server}{"${x}_finished"}{$id} = 1;
}
sub failed {
my $me = shift;
my $job = shift;
my $x = shift;
lib/AC/MrGamoo/Job/Done.pm view on Meta::CPAN
package AC::MrGamoo::Job;
use File::Path 'rmtree';
use strict;
our %REGISTRY;
our $MSGID;
sub _finished {
my $me = shift;
debug("finished");
$me->{statistics}{cleanup_time} = time() - $me->{statistics}{cleanup_start};
verbose("job stats: task $me->{statistics}{task_run} $me->{statistics}{task_run_time}, " .
"copy: $me->{statistics}{xfer_run} $me->{statistics}{xfer_run_time}, " .
"job: $me->{statistics}{job_time}, " .
"cleanup: $me->{statistics}{cleanup_files} $me->{statistics}{cleanup_time}");
$me->{euconsole}->send_msg('debug', 'finished job');
$me->{_finished} = 1;
delete $REGISTRY{ $me->{request}{jobid} };
# do something?
}
################################################################
sub abort {
my $me = _find(shift, @_);
my %p = @_;
return unless $me;
debug("abort job $me->{request}{jobid}");
$me->{task_pending} = {};
$me->{xfer_pending} = {};
$me->_cleanup_tasks();
$me->_cleanup_files();
return if $me->{aborted};
$me->{aborted} = 1;
lib/AC/MrGamoo/Job/Done.pm view on Meta::CPAN
################################################################
sub _cleanup_files {
my $me = shift;
$me->{_cleanedup} = 1;
$me->{statistics}{cleanup_start} = time();
$me->{statistics}{job_time} = time() - $me->{statistics}{job_start};
$me->{euconsole}->send_msg('debug', 'cleaning up');
$me->{euconsole}->send_msg('finish', 'finish');
# remove tmp files
for my $fi (@{$me->{tmp_file}}){
# debug("deleting $fi->{filename} from $fi->{server}");
$me->{statistics}{cleanup_files} ++;
AC::MrGamoo::Job::Request->new( $me,
id => unique(),
server => $fi->{server},
info => "delete $fi->{filename} from $fi->{server}",
proto => {
type => 'mrgamoo_filedel',
msgidno => $^T,
want_reply => 1,
lib/AC/MrGamoo/Job/Done.pm view on Meta::CPAN
if( -f $p ){
# there should not be files here. remove.
unlink $p;
next;
}
my $mtime = (stat($p))[9];
next unless $^T - $mtime > 24 * 3600;
debug("removing old dir: $d");
rmtree( $p, undef, undef );
}
}
AC::DC::Sched->new(
info => 'clean up old dirs',
func => \&_cleanup_old_tmp_dirs,
freq => 3600,
);
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
}
# summary
my %task;
for my $ts (@task){
for my $t ( @{$ts->{task}} ){
$task{ $t->{id} } = $t;
}
}
# debug("plan: " . dumper( \@task ));
debug("infiles: " . @$files . ", precopy: " . @$plancopy . ", maps: " . @$planmap . ", reduces: $nr x $nrp");
return bless {
nserver => scalar(@$servers),
nreduce => $nr,
copying => $plancopy,
phases => \@phase,
taskplan => \@task,
redbin => $redbin,
taskidx => \%task,
}, $class;
lib/AC/MrGamoo/Job/Plan.pm view on Meta::CPAN
map { $_->[1] }
sort{ $a->[0] <=> $b->[0] }
map { [(1 + $bytes{$_}) * (1 + $load->{$_}{metric}), $_] }
grep { $load->{$_}{use} }
(keys %$load);
# copy the file
my @loc = $sa;
push @loc, $sb if $sb;
my $newfile = "mrtmp/j_$job->{request}{jobid}/intmp_" . unique();
debug("no active servers have file: $f->{filename}, copying to @loc => $newfile");
my $ff = {
filename => $newfile,
location => \@loc,
size => $f->{size},
};
push @{$filemap{$sa}}, $ff;
$bytes{$sa} += $ff->{size};
# need to copy this file from its current location to the server(s) that will run map on it
lib/AC/MrGamoo/Job/RePlan.pm view on Meta::CPAN
package AC::MrGamoo::Job;
use strict;
sub _replan {
my $me = shift;
my $server = shift;
my $x = shift;
my $obj = shift;
debug("replan $server, $x $obj->{id}");
if( $x eq 'xfer' ){
$obj = $me->_replan_xfer_fails_task( $obj );
}
return unless $obj;
$obj->replan($me);
}
sub _replan_server {
my $me = shift;
my $server = shift;
my $x = shift;
my $obj = shift;
debug("replan down server $server, $x $obj->{id}");
my $cpn = $me->{phase_no};
$cpn ++ if $x eq 'xfer';
my @replan;
# replan all tasks on server
for my $pn ($cpn .. @{$me->{plan}{phases}} - 1){
for my $t ( @{$me->{plan}{taskplan}[$pn]{task}} ){
next unless $t->{server} eq $server;
push @replan, $t;
}
lib/AC/MrGamoo/Job/RePlan.pm view on Meta::CPAN
my $file = $obj->{dstname} || $obj->{filename};
for my $task (keys %{$me->{plan}{taskidx}}){
my $ti = $me->{plan}{taskidx}{$task};
next unless $obj->{server} eq $ti->{server};
for my $in ( @{$ti->{infile}} ){
next unless $in eq $file;
debug("failed xfer $obj->{id} => fails task $task on $ti->{server}");
return $ti;
}
}
}
1;
lib/AC/MrGamoo/Job/Request.pm view on Meta::CPAN
my $me = bless { @_ }, $class;
$job->{request_pending}{$me->{id}} = $me;
return $me;
}
sub start {
my $me = shift;
my $job = shift;
debug("starting request $me->{info}");
delete $job->{request_pending}{$me->{id}};
my $x = $job->_send_request( $me->{server}, $me->{info}, $me->{proto}, $me->{request});
unless( $x ){
verbose("cannot start request");
return;
}
$x->set_callback('on_success', \&_cb_start_req, $me, $job, 1);
lib/AC/MrGamoo/Job/Request.pm view on Meta::CPAN
$x->start();
}
sub _cb_start_req {
my $io = shift;
my $evt = shift;
my $me = shift;
my $job = shift;
my $ok = shift;
debug("request finished $me->{info}");
delete $job->{request_running}{$me->{id}};
$job->_try_to_do_something()
if $ok
&& (keys %{$job->{request_pending}})
&& (keys %{$job->{request_running}} < 5); # we go faster, if we can start a few at a time
}
1;
lib/AC/MrGamoo/Job/Task.pm view on Meta::CPAN
my $me = bless {
id => $id,
info => $info,
server => $server,
created => time(),
};
$job->{task_pending}{$id} = $me;
debug(" => pending task $info->{id} => $id on $server");
return $me;
}
sub start {
my $me = shift;
my $job = shift;
my $server = $me->{server};
debug("starting task $job->{request}{jobid}/$me->{info}{id}/$me->{id} on $server");
# send request to server
my $ti = $me->{info};
my $x = $job->_send_request( $server, "task $me->{id}", {
type => 'mrgamoo_taskcreate',
msgidno => $^T,
want_reply => 1,
}, {
jobid => $job->{request}{jobid},
lib/AC/MrGamoo/Job/Task.pm view on Meta::CPAN
my $me = shift;
my $job = shift;
my $phase = shift;
my $progress = shift;
$me->{status_time} = $^T;
$me->{status_phase} = $phase;
$me->{status_amt} = $progress;
$me->{status_fail} = 1 if $phase eq 'FAILED';
debug("task is $phase $progress");
if( $phase eq 'FINISHED' ){
if( $me->{status_fail} ){
$me->failed( $job, "status fail" );
}else{
$me->finished( $job );
}
}
return 1;
}
sub failed {
my $me = shift;
my $job = shift;
my $why = shift;
debug("task failed: $why $me->{status_time}");
return if $job->something_failed();
$me->SUPER::failed($job, 'task');
$me->{info}->failed( $me, $job );
if( $why eq 'timeout' ){
$me->abort($job)
}else{
# $job->_try_to_do_something();
}
}
sub finished {
my $me = shift;
my $job = shift;
my $why = shift;
debug('task finish');
$me->SUPER::finished($job, 'task');
$me->{info}->finished( $me, $job );
$job->_try_to_do_something();
}
sub abort {
my $me = shift;
my $job = shift;
debug("aborting task $me->{id}");
AC::MrGamoo::Job::Request->new( $job,
id => unique(),
server => $me->{server},
info => "abort task $me->{id}",
proto => {
type => 'mrgamoo_taskabort',
msgidno => $^T,
want_reply => 1,
},
lib/AC/MrGamoo/Job/TaskInfo.pm view on Meta::CPAN
my $me = shift;
my $t = shift;
my $job = shift;
delete $me->{instance}{ $t->{id} };
$me->{finished} = 1;
my $outfiles = $me->{outfile};
my $server = $t->{server};
debug("task finished $me->{id} on $server");
# copy files
for my $fi (@$outfiles){
# add to file_info - file is now on one server
debug(" outfile $fi->{filename}");
$job->{file_info}{ $fi->{filename} } = {
filename => $fi->{filename},
location => [ $server ],
};
$job->{server_info}{$server}{has_files}{$fi->{filename}} = 1;
# QQQ - optionally leave final files?
push @{$job->{tmp_file}}, { filename => $fi->{filename}, server => $server };
# add to copy_pending
foreach my $s ( @{$fi->{dst}} ){
next if $job->{server_info}{$s}{has_files}{$fi->{filename}};
my $c = AC::MrGamoo::Job::XferInfo->new( $job,
id => unique(),
filename => $fi->{filename},
dst => $s,
);
next unless $c;
$c->pend($job);
debug(" => pending copy for $s");
}
}
}
sub failed {
my $me = shift;
my $t = shift;
my $job = shift;
delete $me->{instance}{ $t->{id} };
lib/AC/MrGamoo/Job/TaskInfo.pm view on Meta::CPAN
return;
}
if( $me->{retries} ++ > $MAXRETRY ){
# replan tasks
$me->replan($job);
return;
}
# retry
debug("retry task");
$me->pend($job);
}
################################################################
sub replan {
my $me = shift;
my $job = shift;
return if $me->{replaced};
lib/AC/MrGamoo/Job/TaskInfo.pm view on Meta::CPAN
}
sub _replan_altserver {
my $me = shift;
my $job = shift;
$me->{server} = $me->{altserver};
delete $me->{retries};
delete $me->{altserver};
debug("replanning task to new server");
$me->pend($job);
}
sub _replan_map {
my $me = shift;
my $job = shift;
# remove task
# divy files among servers
# create new tasks
lib/AC/MrGamoo/Job/TaskInfo.pm view on Meta::CPAN
id => $newid,
phase => $me->{phase},
infile => $newplan{$as},
replaces => $oldid,
outfile => [ map {
(my $f = $_->{filename}) =~ s/$oldid/$newid/;
{ dst => $_->{dst}, filename => $f, }
} @{$me->{outfile}} ],
server => $as,
);
debug("replan map $oldid => $newid on $as");
# keep plan up to date
$job->{plan}{taskidx}{$newid} = $new;
push @{$job->{plan}{taskplan}[0]{task}}, $new;
# move to pending queue
$new->pend($job) if $job->{phase_no} == 0;
push @new, $new;
}
lib/AC/MrGamoo/Job/Xfer.pm view on Meta::CPAN
my $me = bless {
id => $id,
info => $info,
server => $server,
created => $^T,
};
$job->{xfer_pending}{$id} = $me;
debug("pending xfer $info->{id} => $id on $server");
return $me;
}
sub start {
my $me = shift;
my $job = shift;
# send request to server
my $server = $me->{server};
my $filename = $me->{info}{filename};
debug("starting xfer $me->{id} on $server of $filename");
my $x = $job->_send_request( $server, "xfer $me->{id}", {
type => 'mrgamoo_filexfer',
msgidno => $^T,
want_reply => 1,
}, {
jobid => $job->{request}{jobid},
copyid => $me->{id},
filename => $filename,
dstname => ($me->{info}{dstname} || $filename),
lib/AC/MrGamoo/Job/Xfer.pm view on Meta::CPAN
$me->failed($job, 'network');
}
# record status rcvd from file xfer
sub update_status {
my $me = shift;
my $job = shift;
my $code = shift;
debug("xfer is $code");
$me->{status_code} = $code;
$me->{status_time} = $^T;
if( $code == 100 ){
# nop
}elsif( $code == 200 ){
$me->finished( $job );
}else{
$me->failed( $job, "status $code" );
}
}
sub failed {
my $me = shift;
my $job = shift;
my $why = shift;
debug("xfer failed: $why");
return if $job->something_failed();
$me->SUPER::failed($job, 'xfer');
$me->{info}->failed( $me, $job );
# $job->_try_to_do_something() unless $why eq 'timeout';
}
sub finished {
my $me = shift;
my $job = shift;
debug('xfer finish');
my $server = $me->{server};
my $file = $me->{info}{dstname} || $me->{info}{filename};
$me->SUPER::finished($job, 'xfer');
$me->{info}->finished( $me, $job );
# add to server_info.has_files
# add to file_info, tmp_file
$job->{server_info}{$server}{has_files}{$file} = 1;
lib/AC/MrGamoo/Job/XferInfo.pm view on Meta::CPAN
return;
}
if( $me->{retries} ++ > $MAXRETRY ){
# replan tasks
$job->_replan($server, 'xfer', $me);
return;
}
# retry
debug("retry xfer");
$me->pend($job);
}
sub finished {
my $me = shift;
my $x = shift;
my $job = shift;
delete $me->{instance}{ $x->{id} };
$me->{finished} = 1;