AC-MrGamoo

 view release on metacpan or  search on metacpan

eg/mrgamoo  view on Meta::CPAN

# $Id: mrgamoo,v 1.1 2010/11/01 19:02:57 jaw Exp $

my %opt;
use Getopt::Long qw(:config no_ignore_case);
use AC::MrMagoo::Client;
use strict;

GetOptions(\%opt,
           "check",
           "verbose|v",
           "debug|d",
           "console",
           "start=s",
           "end=s",
          ) || exit;


my $mrm = AC::MrMagoo::Client->new( $ARGV[0],  \%opt );

if( $opt{check} ){
    $mrm->check_code();

eg/mrgamood  view on Meta::CPAN


use Getopt::Std;
use AC::MrMagoo::D;
use strict;

our %OPT;
my @saved_argv = @ARGV;

getopts('c:dDfp:', \%OPT) || die "usage...\n";
# -c config file
# -d    enable all debugging
# -f    foreground
# -p port


my $mrm = AC::MrMagoo::D->new(
    class_filelist	=> 'My::Code::FileList',
    class_readinput	=> 'My::Code::ReadInput',
    class_myself	=> 'My::Code::MySelf',
   );


$mrm->daemon( $OPT{c}, {
    argv	=> \@saved_argv,
    foreground	=> $OPT{f},
    debugall	=> $OPT{d},
    port	=> $OPT{p},
} );

lib/AC/MrGamoo.pm  view on Meta::CPAN

=head1 SYNOPSIS

    use AC::MrGamoo::D;
    use strict;

    my $m = AC::MrGamoo::D->new( );

    $m->daemon( $configfile, {
      argv		=> \@ARGV,
      foreground	=> $OPT{f},
      debugall		=> $OPT{d},
      port		=> $OPT{p},
    } );

    exit;

=head1 CONFIG FILE

various parameters need to be specified in a config file.
if you modify the file, it will be reloaded automagically.

lib/AC/MrGamoo.pm  view on Meta::CPAN

specify a syslog facility for log messages.

    syslog local5

=item basedir

local directory to store files

    basedir         /home/data

=item debug

enable debugging for a particular section

    debug job

=back

=head1 BUGS

Too many to list here.

=head1 SEE ALSO

    AC::MrGamoo::Client

lib/AC/MrGamoo/AC/FileList.pm  view on Meta::CPAN


    # NB: keys in the yenta logfile map are of the form: 20100126150139_eqaB5uSerdeddsOw

    $syst = undef if $syst eq '*';
    $mode = undef if $mode eq '*';
    $syst =~ s/[ ,]/\|/g;
    if( $syst ){
        $syst = qr/^($syst)$/;
    }

    debug("mode=$mode, syst=$syst, tmin=$tmin, tmax=$tmax, start=$start");
    my @files = grep {
        (!$mode || ($_->{environment} eq $mode)) &&
        (!$syst || ($_->{subsystem}   =~ $syst)) &&
        ($_->{end_time}    >= $tmin) &&
        ($_->{start_time}  <= $tmax)
    } map {
        #debug("file: $_");
        my $d = $yenta->get($_);
        $d = $d ? decode_json($d) : {};
        $d->{location} = [ map { $CONVERT{$_} || $_ } (split /\s+/, $d->{location}) ];
        $d;
    } $yenta->getrange($start, undef);

    debug("found " .scalar(@files)." files");
    return \@files;
}


1;

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

my $TIMEOUT  = 15;

sub new {
    my $class = shift;
    my $addr  = shift;
    my $port  = shift;
    my $info  = shift;
    my $req   = shift;
    my $data  = shift;

    debug("new client type: $req->{type} to $addr:$port");
    my $send = AC::MrGamoo::Protocol->encode_request( $req, $data );
    my $me   = $class->SUPER::new( $addr, $port,
                                 info	 => "client $req->{type} to $addr:$port; $info",
                                 request => $send,
                                );

    return $me;
}

sub start {

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

        $me->run_callback('on_success', { result => $me->{result} } );
    }else{
        $me->run_callback('on_failure');
    }
}

sub _read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply to $me->{info}");

    my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    return unless $proto;

    # check response
    if( $proto->{is_error} ){
        return $me->_uhoh("rcvd error response");
    }

    $proto->{data} = AC::MrGamoo::Protocol->decode_reply($proto, $data);
    debug("recvd reply to $me->{info} - $proto->{data}{status_code} $proto->{data}{status_message}");

    if( $proto->{data}{status_code} != 200 ){
        return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
    }

    $me->{result}    = $proto;
    $me->{status_ok} = 1;
    $me->shut();
}

sub _uh_oh {
    my $me  = shift;
    my $msg = shift;

    debug("error $msg");
    $me->run_callback('error', { error => $msg } );
    $me->shut();
}


1;

lib/AC/MrGamoo/API/Del.pm  view on Meta::CPAN


sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    # validate filename
    my $file = filename($req->{filename});
    debug("deleting file $file");

    if( $file && -f $file ){
        unlink $file;
    }

    if( -f $file ){
        reply( 500, 'Error', $io, $proto, $req );
    }else{
        reply( 200, 'OK', $io, $proto, $req );
    }

lib/AC/MrGamoo/API/Get.pm  view on Meta::CPAN


    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;
    open(F, $file) || return nbfd_reply(500, 'error', $fd, $proto, $req);
    my $size = (stat($file))[7];
    my $sha1 = sha1_file($file);

    debug("get file '$file' size $size");

    # send header
    my $gb  = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
    my $hdr = AC::MrGamoo::Protocol->encode_header(
        type		=> $proto->{type},
        msgidno		=> $proto->{msgidno},
        is_reply	=> 1,
        data_length	=> length($gb),
        content_length	=> $size,
       );

lib/AC/MrGamoo/API/HB.pm  view on Meta::CPAN

        hostname	=> $HOSTNAME,
        subsystem	=> 'mrgamoo',
        environment	=> conf_value('environment'),
        port		=> my_port(),
        timestamp	=> time(),
        sort_metric	=> loadave(),
        server_id	=> my_server_id(),
        process_id	=> $$,
    } );

    debug("sending hb reply");
    $io->write_and_shut( $response );

}



1;

lib/AC/MrGamoo/API/JobAbort.pm  view on Meta::CPAN

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;


    debug("abort job $req->{jobid}");

    my $r = AC::MrGamoo::Job->abort( jobid => $req->{jobid} );

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        reply( 500, 'Error', $io, $proto, $req );
    }
}

lib/AC/MrGamoo/API/JobCreate.pm  view on Meta::CPAN


use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    debug("new job $req->{jobid}");

    if( $req->{console} =~ /^:/ ){
        # fill in ip addr
        $req->{console} = $io->{from_ip} . $req->{console};
    }

    my $x = AC::MrGamoo::Job->new( %$req );
    my $r = $x ? $x->start() : undef;

    if( $r ){

lib/AC/MrGamoo/API/Simple.pm  view on Meta::CPAN


    my $response = AC::MrGamoo::Protocol->encode_reply( {
        type            => $proto->{type},
        msgid           => $proto->{msgid},
        is_reply        => 1,
    }, {
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("sending $code reply for $proto->{type} on $io->{info}");
    $io->write_and_shut( $response );
}

sub nbfd_reply {
    my $code    = shift;
    my $msg     = shift;
    my $fd      = shift;
    my $proto   = shift;
    my $req     = shift;

lib/AC/MrGamoo/API/Simple.pm  view on Meta::CPAN


    my $response = AC::MrGamoo::Protocol->encode_reply( {
        type            => $proto->{type},
        msgid           => $proto->{msgid},
        is_reply        => 1,
    }, {
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("sending $code reply for $proto->{type} (from bkg)");
    syswrite( $fd, $response );
}

sub on_success {
    my $x  = shift;
    my $e  = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

lib/AC/MrGamoo/API/TaskAbort.pm  view on Meta::CPAN

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;


    debug("abort task $req->{jobid}/$req->{taskid}");

    my $r = AC::MrGamoo::Task->abort( jobid => $req->{jobid}, taskid => $req->{taskid} );

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        reply( 500, 'Error', $io, $proto, $req );
    }
}

lib/AC/MrGamoo/API/TaskCreate.pm  view on Meta::CPAN


use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    debug("new task $req->{jobid}/$req->{taskid}");
    my $x = AC::MrGamoo::Task->new( %$req );
    my $r = $x ? $x->start() : undef;

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        reply( 500, 'Error', $io, $proto, $req );
    }
}

lib/AC/MrGamoo/API/TaskStatus.pm  view on Meta::CPAN

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;


    debug("updating task status $req->{jobid}/$req->{taskid}");

    my $r = AC::MrGamoo::Job->task_status( %$req );

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        reply( 500, 'Error', $io, $proto, $req );
    }
}

lib/AC/MrGamoo/API/Xfer.pm  view on Meta::CPAN

    my $req     = shift;
    my $content = shift;

    # validate filename
    if( $req->{filename} =~ m%/\.|^\.% ){
        reply( 500, 'Error', $io, $proto, $req );
        return;
    }

    # new retry
    debug("starting file xfer $req->{copyid} => $req->{filename}");

    # start working on the copy
    my $x = AC::MrGamoo::Retry->new(
        newobj	=> \&_mk_xfer,
        newargs => [ $req ],
        tryeach	=> $req->{location},
       );

    # reply now
    if( $x ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        debug("sending error, xfer/retrier failed, $io->{info}");
        reply( 501, 'Error', $io, $proto, $req );
    }

    # send status when finished
    $x->set_callback('on_success', \&_yippee, $proto, $req);
    $x->set_callback('on_failure', \&_boohoo, $proto, $req);

    # start
    $x->start();
}

lib/AC/MrGamoo/API/Xfer.pm  view on Meta::CPAN


    tell_master( $req, 200, 'OK' );
}

sub _boohoo {
    my $x  = shift;
    my $e  = shift;
    my $proto = shift;
    my $req   = shift;

    debug("boohoo - xfer failed $req->{copyid}");
    tell_master( $req, 500, 'Failed' );
}

sub tell_master {
    my $req   = shift;
    my $code  = shift;
    my $msg   = shift;

    my($addr, $port) = get_peer_addr_from_id( $req->{master} );
    debug("sending xfer status update for $req->{copyid} => $code => $req->{master}");
    debug("cannot find addr") unless $addr;
    return unless $addr;

    my $x = AC::MrGamoo::API::Client->new( $addr, $port, "xfer $req->{copyid}", {
        type		=> 'mrgamoo_xferstatus',
        msgidno		=> $MSGID++,
        want_reply	=> 1,
    }, {
        jobid		=> $req->{jobid},
        copyid		=> $req->{copyid},
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("cannot create client") unless $x;
    return unless $x;
    $x->start();

    # we don't need any reply or reply callbacks. just send + forget
}


################################################################

1;

lib/AC/MrGamoo/API/XferStatus.pm  view on Meta::CPAN

use strict;

sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;


    debug("updating xfer status $req->{jobid}/$req->{copyid}");

    my $r = AC::MrGamoo::Job->xfer_status( %$req );

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        reply( 500, 'Error', $io, $proto, $req );
    }
}

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    my $src  = shift;
    my $cfg  = shift;

    my $host = hostname();
    my $user = getpwuid($<);
    my $trace = "$user/$$\@$host:" . ($from eq 'file' ? $src : 'text');

    my $me   = bless {
        traceinfo	=> $trace,
    }, $class;
    $me->{fdebug} = $cfg->{debug} ? sub{ print STDERR "@_\n" } : sub {};

    # compile job
    my $mr = AC::MrGamoo::Submit::Compile->new( $from => $src );
    $me->{program} = $mr;

    # merge job %config section with passed in config
    $mr->add_config($cfg);

    return $me;
}

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN


    while(1){
        my $buf;
        recv $fd, $buf, 65535, 0;
        my $proto = AC::MrGamoo::Protocol->decode_header($buf);
        my $data  = substr($buf, AC::MrGamoo::Protocol->header_size());
        my $req   = AC::MrGamoo::Protocol->decode_request($proto, $data);
        last if $req->{type} eq 'finish';
        print STDERR "$req->{msg}"                        if $req->{type} eq 'stderr';
        print "$req->{msg}"                               if $req->{type} eq 'stdout';
        $me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
    }
}

sub submit {
    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

    my $mr = $me->{program};
    my $r = AC::MrGamoo::Submit::Request->new( $mr );
    $r->{eu_print_stderr} = sub { print STDERR "@_\n" };

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

        jobid		=> $me->{id},
        options		=> to_json( $r->{config} ),
        initres		=> to_json( $initres, {allow_nonref => 1} ),
        jobsrc		=> $mr->src(),
        console		=> ($me->{console_port} ? ":$me->{console_port}" : ''),
        traceinfo	=> $me->{traceinfo},
    } );

    my $ok;
    if( my $master = $me->get_config_param('master') ){
        # use specified master (for debugging)
        my($addr, $port) = split /:/, $master;
        $me->_submit_to( $addr, $port, $req );
        $me->{master} = { addr => $addr, port => $port };
        $ok = 1;
    }else{
        # pick server
        $ok = $me->_pick_master_and_send( $req, $seed );
    }

    return $ok ? $me->{id} : undef;

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    my $listreq = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_status',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {});

    # get the full list of servers
    # contact each seed passed in above, until we get a reply
    for my $s ( @$seed ){
        my($addr, $port) = split /:/, $s;
        $me->{fdebug}->("attempting to fetch server list from $addr:$port");
        eval {
            alarm(1);
            my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $listreq, $me->{fdebug} );
            my $res   = AC::MrGamoo::Protocol->decode_reply($reply);
            alarm(0);
            my $list = $res->{status};
            @serverlist = @$list if $list && @$list;
        };
        last if @serverlist;
    }

    # sort+filter list
    @serverlist = sort { ($a->{sort_metric} <=> $b->{sort_metric}) || int(rand(3)) - 1 }

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    }
    return ;
}

sub _submit_to {
    my $me   = shift;
    my $addr = shift;
    my $port = shift;
    my $req  = shift;

    $me->{fdebug}->("sending job to $addr:$port");
    my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $req, $me->{fdebug}, 120 );
    my $res   = AC::MrGamoo::Protocol->decode_reply($reply);

    return $res;
}

################################################################

sub check_code {
    my $me = shift;

lib/AC/MrGamoo/Config.pm  view on Meta::CPAN

use Socket;
use strict;

our @ISA = 'AC::ConfigFile::Simple';
our @EXPORT = qw(conf_value);


my %CONFIG = (

    include	=> \&AC::ConfigFile::Simple::include_file,
    debug	=> \&AC::ConfigFile::Simple::parse_debug,
    allow	=> \&AC::ConfigFile::Simple::parse_allow,
    port	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    environment => \&AC::ConfigFile::Simple::parse_keyvalue,
    basedir	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    syslog	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    seedpeer    => \&AC::ConfigFile::Simple::parse_keyarray,
    scriblr	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    sortprog	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    gzprog	=> \&AC::ConfigFile::Simple::parse_keyvalue,
);

lib/AC/MrGamoo/D.pm  view on Meta::CPAN

    AC::MrGamoo::FileList->customize(  $p{class_filelist} );
    AC::MrGamoo::ReadInput->customize( $p{class_readinput} );
    # ...

    return bless \$class, $class;
}

sub daemon {
    my $me    = shift;
    my $cfile = shift;
    my $opt   = shift;	# foreground, debugall, persistent_id, argv

    die "no config file specified\n" unless $cfile;

    # configure

    $AC::MrGamoo::CONF = AC::MrGamoo::Config->new(
        $cfile, onreload => sub {},
       );

    initlog( 'mrgamoo', (conf_value('syslog') || 'local4'), $opt->{debugall} );
    AC::MrGamoo::Debug->init( $opt->{debugall}, $AC::MrGamoo::CONF );

    daemonize(5, 'mrgamood', $opt->{argv}) unless $opt->{foreground};
    verbose("starting.");

    $SIG{CHLD} = $SIG{PIPE} = sub {};        				# ignore
    $SIG{INT}  = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit;  # abort

    # initialize subsystems

    my $port = $opt->{port} || conf_value('port');

lib/AC/MrGamoo/D.pm  view on Meta::CPAN

    AC::DC::IO::TCP::Server->new( $port, 'AC::MrGamoo::Server' );

    # start "cronjobs"
    AC::DC::Sched->new(
        info	=> 'check config files',
        freq	=> 30,
        func	=> sub { $AC::MrGamoo::CONF->check() },
       );

    run_and_watch(
        ($opt->{foreground} || $opt->{debugall}),
        \&AC::DC::IO::mainloop,
       );
}


1;

lib/AC/MrGamoo/Debug.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-22 13:21 (EST)
# Function: debugging
#
# $Id: Debug.pm,v 1.1 2010/11/01 18:41:41 jaw Exp $

package AC::MrGamoo::Debug;
use AC::DC::Debug;
our @ISA = 'AC::DC::Debug';
use strict;

1;

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

    $me->{server_info}{$_->{id}} = {} for @$servers;

    $me->_preload_file_copies();
    $REGISTRY{ $me->{request}{jobid} } = $me;
    return $me;
}

sub start {
    my $me = shift;

    debug("start job");
    $me->{euconsole}->send_msg('debug', 'starting job');
    $me->_try_to_do_something();
    1;
}

################################################################

# record status rcvd from task
sub task_status {
    my $me = _find(shift, @_);
    my %p  = @_;

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

    return unless $c;

    $c->update_status( $me, $p{status_code} );

    1;
}

################################################################

sub periodic {
    # debug("periodic check");

    $_trying = 0;
    for my $job (values %REGISTRY){
        $job->_periodic();
    }
}

sub _periodic {
    my $me = shift;

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN


    my $tr = keys %{ $me->{task_running} };
    my $tp = keys %{ $me->{task_pending} };
    my $xr = keys %{ $me->{xfer_running} };
    my $xp = keys %{ $me->{xfer_pending} };
    my $rr = keys %{ $me->{request_running} };
    my $rp = keys %{ $me->{request_pending} };

    my $ph = $me->{plan}{phases}[ $me->{phase_no} ] || 'none';

    debug("status: phase $ph, task $tr / $tp, xfer $xr / $xp, reqs $rr / $rp")
      if $tr || $tp || $xr || $xp || $rr || $rp;

}

################################################################

sub _start_next_phase {
    my $me = shift;

    debug("next phase");
    $me->{phase_no} ++;
    my $tp = $me->{plan}{taskplan}[ $me->{phase_no} ];

    # finished all phases ?
    unless( $tp ){
        if( $me->{_cleanedup} ){
            $me->_finished();
            return 1;
        }else{
            $me->_cleanup_files();
            return;
        }
    }

    debug("job $me->{request}{jobid} starting phase $tp->{phase}");
    $me->{euconsole}->send_msg('debug', "starting phase $tp->{phase}");

    # move tasks to pending
    for my $t (@{$tp->{task}}){
        $t->pend($me);
    }
}

sub _maybe_start_task {
    my $me = shift;
    my $task = shift;

    #debug("maybe start task");

    my $server   = $task->{server};
    my $underway = keys %{$me->{server_info}{$server}{task_running}};
    return if $underway >= $TASKSRVRMAX;		# don't overload server
    return if $task->{start_after} > $^T;		# rate limit retries

    # RSN - check that prerequisite xfers completed
    $task->start( $me );
    return 1;
}

sub _maybe_start_xfer {
    my $me = shift;
    my $copy = shift;

    #debug("maybe start xfer");

    my $server   = $copy->{server};
    my $underway = keys %{$me->{server_info}{$server}{xfer_running}};
    return if $underway >= $XFERSRVRMAX;		# don't overload server
    return if $copy->{start_after} > $^T;		# rate limit retries

    $copy->start( $me );
    return 1;
}

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

        $c->pend($me);
    }
}


################################################################

sub _try_to_do_something {
    my $me = shift;

    # debug("try something");

    return if $me->{_finished};
    return if $_trying;

    # is this phase done
    if( !(keys %{$me->{task_running}})
          && !(keys %{$me->{task_pending}})
          && !(keys %{$me->{xfer_running}})
          && !(keys %{$me->{xfer_pending}})
          && !(keys %{$me->{request_running}})

lib/AC/MrGamoo/Job/Action.pm  view on Meta::CPAN

    my $x   = shift;

    my $server = $me->{server};
    my $id = $me->{id};
    $me->{start_time} = time();

    # remove from _pending
    # add to _running
    # add to server_info._running

    debug("$x started $id on $server");
    delete $job->{"${x}_pending"}{$id};
    $job->{"${x}_running"}{$id} = $me;
    $job->{server_info}{$server}{"${x}_running"}{$id} = $me;
}

sub finished {
    my $me  = shift;
    my $job = shift;
    my $x   = shift;

lib/AC/MrGamoo/Job/Action.pm  view on Meta::CPAN


    # remove from _running
    # remove from server_info._running
    # add to server_info._finished
    # record timing info

    my $started = $me->{start_time};
    $job->{statistics}{"${x}_run_time"} += time() - $started;
    $job->{statistics}{"${x}_run"} ++;

    debug("$x finished $id on $server");
    delete $job->{"${x}_running"}{$id};
    delete $job->{server_info}{$server}{"${x}_running"}{$id};
    $me->{server_info}{$server}{"${x}_finished"}{$id} = 1;
}

sub failed {
    my $me  = shift;
    my $job = shift;
    my $x   = shift;

lib/AC/MrGamoo/Job/Done.pm  view on Meta::CPAN

package AC::MrGamoo::Job;
use File::Path 'rmtree';
use strict;

our %REGISTRY;
our $MSGID;

sub _finished {
    my $me = shift;

    debug("finished");
    $me->{statistics}{cleanup_time} = time() - $me->{statistics}{cleanup_start};

    verbose("job stats: task $me->{statistics}{task_run} $me->{statistics}{task_run_time}, " .
            "copy: $me->{statistics}{xfer_run} $me->{statistics}{xfer_run_time}, " .
            "job: $me->{statistics}{job_time}, " .
            "cleanup: $me->{statistics}{cleanup_files} $me->{statistics}{cleanup_time}");

    $me->{euconsole}->send_msg('debug', 'finished job');
    $me->{_finished} = 1;
    delete $REGISTRY{ $me->{request}{jobid} };

    # do something?
}

################################################################

sub abort {
    my $me = _find(shift, @_);
    my %p = @_;

    return unless $me;

    debug("abort job $me->{request}{jobid}");

    $me->{task_pending} = {};
    $me->{xfer_pending} = {};

    $me->_cleanup_tasks();
    $me->_cleanup_files();

    return if $me->{aborted};
    $me->{aborted}       = 1;

lib/AC/MrGamoo/Job/Done.pm  view on Meta::CPAN


################################################################

sub _cleanup_files {
    my $me = shift;

    $me->{_cleanedup} = 1;
    $me->{statistics}{cleanup_start} = time();
    $me->{statistics}{job_time} = time() - $me->{statistics}{job_start};

    $me->{euconsole}->send_msg('debug',  'cleaning up');
    $me->{euconsole}->send_msg('finish', 'finish');

    # remove tmp files
    for my $fi (@{$me->{tmp_file}}){
        # debug("deleting $fi->{filename} from $fi->{server}");
        $me->{statistics}{cleanup_files} ++;

        AC::MrGamoo::Job::Request->new( $me,
            id		=> unique(),
            server	=> $fi->{server},
            info	=> "delete $fi->{filename} from $fi->{server}",
            proto	=> {
                type		=> 'mrgamoo_filedel',
                msgidno		=> $^T,
                want_reply	=> 1,

lib/AC/MrGamoo/Job/Done.pm  view on Meta::CPAN


        if( -f $p ){
            # there should not be files here. remove.
            unlink $p;
            next;
        }

        my $mtime = (stat($p))[9];
        next unless $^T - $mtime > 24 * 3600;

        debug("removing old dir: $d");
        rmtree( $p, undef, undef );
    }
}

AC::DC::Sched->new(
    info	=> 'clean up old dirs',
    func	=> \&_cleanup_old_tmp_dirs,
    freq	=> 3600,
   );

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

    }

    # summary
    my %task;
    for my $ts (@task){
        for my $t ( @{$ts->{task}} ){
            $task{ $t->{id} } = $t;
        }
    }

    # debug("plan: " . dumper( \@task ));

    debug("infiles: " . @$files . ", precopy: " . @$plancopy . ", maps: " . @$planmap . ", reduces: $nr x $nrp");

    return bless {
        nserver		=> scalar(@$servers),
        nreduce		=> $nr,
        copying		=> $plancopy,
        phases		=> \@phase,
        taskplan	=> \@task,
        redbin		=> $redbin,
        taskidx		=> \%task,
    }, $class;

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

          map { $_->[1] }
          sort{ $a->[0] <=> $b->[0] }
          map { [(1 + $bytes{$_}) * (1 + $load->{$_}{metric}), $_] }
          grep { $load->{$_}{use} }
            (keys %$load);

        # copy the file
        my @loc = $sa;
        push @loc, $sb if $sb;
        my $newfile = "mrtmp/j_$job->{request}{jobid}/intmp_" . unique();
        debug("no active servers have file: $f->{filename}, copying to @loc => $newfile");

        my $ff = {
            filename	=> $newfile,
            location	=> \@loc,
            size	=> $f->{size},
        };
        push @{$filemap{$sa}}, $ff;
        $bytes{$sa} += $ff->{size};

        # need to copy this file from its current location to the server(s) that will run map on it

lib/AC/MrGamoo/Job/RePlan.pm  view on Meta::CPAN


package AC::MrGamoo::Job;
use strict;

sub _replan {
    my $me     = shift;
    my $server = shift;
    my $x      = shift;
    my $obj    = shift;

    debug("replan $server, $x $obj->{id}");
    if( $x eq 'xfer' ){
        $obj = $me->_replan_xfer_fails_task( $obj );
    }

    return unless $obj;

    $obj->replan($me);
}

sub _replan_server {
    my $me     = shift;
    my $server = shift;
    my $x      = shift;
    my $obj    = shift;

    debug("replan down server $server, $x $obj->{id}");
    my $cpn = $me->{phase_no};
    $cpn ++ if $x eq 'xfer';

    my @replan;
    # replan all tasks on server
    for my $pn ($cpn .. @{$me->{plan}{phases}} - 1){
        for my $t ( @{$me->{plan}{taskplan}[$pn]{task}} ){
            next unless $t->{server} eq $server;
            push @replan, $t;
        }

lib/AC/MrGamoo/Job/RePlan.pm  view on Meta::CPAN


    my $file = $obj->{dstname} || $obj->{filename};

    for my $task (keys %{$me->{plan}{taskidx}}){
        my $ti = $me->{plan}{taskidx}{$task};
        next unless $obj->{server} eq $ti->{server};

        for my $in ( @{$ti->{infile}} ){
            next unless $in eq $file;

            debug("failed xfer $obj->{id} => fails task $task on $ti->{server}");
            return $ti;
        }
    }
}

1;

lib/AC/MrGamoo/Job/Request.pm  view on Meta::CPAN

    my $me = bless { @_ }, $class;

    $job->{request_pending}{$me->{id}} = $me;
    return $me;
}

sub start {
    my $me  = shift;
    my $job = shift;

    debug("starting request $me->{info}");
    delete $job->{request_pending}{$me->{id}};

    my $x = $job->_send_request( $me->{server}, $me->{info}, $me->{proto}, $me->{request});

    unless( $x ){
        verbose("cannot start request");
        return;
    }

    $x->set_callback('on_success', \&_cb_start_req,  $me, $job, 1);

lib/AC/MrGamoo/Job/Request.pm  view on Meta::CPAN

    $x->start();
}

sub _cb_start_req {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $job = shift;
    my $ok  = shift;

    debug("request finished $me->{info}");
    delete $job->{request_running}{$me->{id}};

    $job->_try_to_do_something()
      if $ok
        && (keys %{$job->{request_pending}})
        && (keys %{$job->{request_running}} < 5);	# we go faster, if we can start a few at a time

}

1;

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN


    my $me = bless {
        id	=> $id,
        info	=> $info,
        server	=> $server,
        created => time(),
    };

    $job->{task_pending}{$id} = $me;

    debug("  => pending task $info->{id} => $id on $server");

    return $me;
}

sub start {
    my $me  = shift;
    my $job = shift;

    my $server = $me->{server};
    debug("starting task $job->{request}{jobid}/$me->{info}{id}/$me->{id} on $server");

    # send request to server
    my $ti = $me->{info};

    my $x = $job->_send_request( $server, "task $me->{id}", {
        type		=> 'mrgamoo_taskcreate',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {
        jobid		=> $job->{request}{jobid},

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN

    my $me  = shift;
    my $job = shift;
    my $phase = shift;
    my $progress = shift;

    $me->{status_time}  = $^T;
    $me->{status_phase} = $phase;
    $me->{status_amt}   = $progress;
    $me->{status_fail}  = 1 if $phase eq 'FAILED';

    debug("task is $phase $progress");

    if( $phase eq 'FINISHED' ){
        if( $me->{status_fail} ){
            $me->failed( $job, "status fail" );
        }else{
            $me->finished( $job );
        }
    }

    return 1;
}

sub failed {
    my $me   = shift;
    my $job  = shift;
    my $why  = shift;

    debug("task failed: $why $me->{status_time}");

    return if $job->something_failed();
    $me->SUPER::failed($job, 'task');
    $me->{info}->failed( $me, $job );
    if( $why eq 'timeout' ){
        $me->abort($job)
    }else{
        # $job->_try_to_do_something();
    }
}

sub finished {
    my $me   = shift;
    my $job  = shift;
    my $why  = shift;

    debug('task finish');

    $me->SUPER::finished($job, 'task');
    $me->{info}->finished( $me, $job );

    $job->_try_to_do_something();
}

sub abort {
    my $me  = shift;
    my $job = shift;

    debug("aborting task $me->{id}");

    AC::MrGamoo::Job::Request->new( $job,
        id	=> unique(),
        server	=> $me->{server},
        info	=> "abort task $me->{id}",
        proto	=> {
            type		=> 'mrgamoo_taskabort',
            msgidno		=> $^T,
            want_reply		=> 1,
        },

lib/AC/MrGamoo/Job/TaskInfo.pm  view on Meta::CPAN

    my $me   = shift;
    my $t    = shift;
    my $job  = shift;

    delete $me->{instance}{ $t->{id} };

    $me->{finished} = 1;
    my $outfiles = $me->{outfile};
    my $server   = $t->{server};

    debug("task finished $me->{id} on $server");
    # copy files
    for my $fi (@$outfiles){
        # add to file_info - file is now on one server
        debug("  outfile $fi->{filename}");
        $job->{file_info}{ $fi->{filename} } = {
            filename	=> $fi->{filename},
            location	=> [ $server ],
        };
        $job->{server_info}{$server}{has_files}{$fi->{filename}} = 1;
        # QQQ - optionally leave final files?
        push @{$job->{tmp_file}}, { filename => $fi->{filename}, server => $server };

        # add to copy_pending
        foreach my $s ( @{$fi->{dst}} ){
            next if $job->{server_info}{$s}{has_files}{$fi->{filename}};
            my $c = AC::MrGamoo::Job::XferInfo->new( $job,
                id		=> unique(),
                filename	=> $fi->{filename},
                dst		=> $s,
               );
            next unless $c;
            $c->pend($job);
            debug("    => pending copy for $s");
        }
    }
}

sub failed {
    my $me   = shift;
    my $t    = shift;
    my $job  = shift;

    delete $me->{instance}{ $t->{id} };

lib/AC/MrGamoo/Job/TaskInfo.pm  view on Meta::CPAN

        return;
    }

    if( $me->{retries} ++ > $MAXRETRY ){
        # replan tasks
        $me->replan($job);
        return;
    }

    # retry
    debug("retry task");
    $me->pend($job);
}

################################################################

sub replan {
    my $me  = shift;
    my $job = shift;

    return if $me->{replaced};

lib/AC/MrGamoo/Job/TaskInfo.pm  view on Meta::CPAN

}

sub _replan_altserver {
    my $me  = shift;
    my $job = shift;

    $me->{server} = $me->{altserver};
    delete $me->{retries};
    delete $me->{altserver};

    debug("replanning task to new server");
    $me->pend($job);
}

sub _replan_map {
    my $me  = shift;
    my $job = shift;

    # remove task
    # divy files among servers
    # create new tasks

lib/AC/MrGamoo/Job/TaskInfo.pm  view on Meta::CPAN

            id		=> $newid,
            phase	=> $me->{phase},
            infile	=> $newplan{$as},
            replaces	=> $oldid,
            outfile	=> [ map {
                (my $f = $_->{filename}) =~ s/$oldid/$newid/;
                { dst => $_->{dst}, filename => $f, }
            } @{$me->{outfile}} ],
            server	=> $as,
        );
        debug("replan map $oldid => $newid on $as");

        # keep plan up to date
        $job->{plan}{taskidx}{$newid} = $new;
        push @{$job->{plan}{taskplan}[0]{task}}, $new;

        # move to pending queue
        $new->pend($job) if $job->{phase_no} == 0;

        push @new, $new;
    }

lib/AC/MrGamoo/Job/Xfer.pm  view on Meta::CPAN


    my $me = bless {
        id	=> $id,
        info	=> $info,
        server	=> $server,
        created => $^T,
    };

    $job->{xfer_pending}{$id} = $me;

    debug("pending xfer $info->{id} => $id on $server");

    return $me;
}

sub start {
    my $me  = shift;
    my $job = shift;

    # send request to server
    my $server   = $me->{server};
    my $filename = $me->{info}{filename};
    debug("starting xfer $me->{id} on $server of $filename");

    my $x = $job->_send_request( $server, "xfer $me->{id}", {
        type		=> 'mrgamoo_filexfer',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {
        jobid		=> $job->{request}{jobid},
        copyid		=> $me->{id},
        filename	=> $filename,
        dstname		=> ($me->{info}{dstname} || $filename),

lib/AC/MrGamoo/Job/Xfer.pm  view on Meta::CPAN


    $me->failed($job, 'network');
}

# record status rcvd from file xfer
sub update_status {
    my $me   = shift;
    my $job  = shift;
    my $code = shift;

    debug("xfer is $code");

    $me->{status_code} = $code;
    $me->{status_time} = $^T;

    if( $code == 100 ){
        # nop
    }elsif( $code == 200 ){
        $me->finished( $job );
    }else{
        $me->failed( $job, "status $code" );
    }
}

sub failed {
    my $me   = shift;
    my $job  = shift;
    my $why  = shift;

    debug("xfer failed: $why");

    return if $job->something_failed();
    $me->SUPER::failed($job, 'xfer');
    $me->{info}->failed( $me, $job );
    # $job->_try_to_do_something() unless $why eq 'timeout';
}

sub finished {
    my $me   = shift;
    my $job  = shift;

    debug('xfer finish');
    my $server = $me->{server};
    my $file   = $me->{info}{dstname} || $me->{info}{filename};

    $me->SUPER::finished($job, 'xfer');
    $me->{info}->finished( $me, $job );

    # add to server_info.has_files
    # add to file_info, tmp_file

    $job->{server_info}{$server}{has_files}{$file} = 1;

lib/AC/MrGamoo/Job/XferInfo.pm  view on Meta::CPAN

        return;
    }

    if( $me->{retries} ++ > $MAXRETRY ){
        # replan tasks
        $job->_replan($server, 'xfer', $me);
        return;
    }

    # retry
    debug("retry xfer");
    $me->pend($job);
}

sub finished {
    my $me   = shift;
    my $x    = shift;
    my $job  = shift;

    delete $me->{instance}{ $x->{id} };
    $me->{finished} = 1;



( run in 1.474 second using v1.01-cache-2.11-cpan-49f99fa48dc )