AC-MrGamoo

 view release on metacpan or  search on metacpan

eg/example.mrjob  view on Meta::CPAN

    tasktimeout => 120
</%config>
%################################################################
%# common block is prepended to all other blocks.
%# used to load modules
<%common>
    use AC::Misc;
    use AC::Dumper;
</%common>
%################################################################
%# init block runs once at startup.
%# the return value can be retrieved by other blocks.
%# used to calculate values or fetch things from a db.
<%init>
    $R->print("starting map/reduce job");

    return {
        mood    => 'joyous',
    };
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
    maxrun      => 300

eg/example.mrjob  view on Meta::CPAN

%################################################################
%# final block runs once with the results of the previous reduce.
%# used to generate report or insert to db
<%final>
<%attr>
%# override various parameters
    use_strict  => 0
    in_package  => My::Private::Space
</%attr>
<%init>
    # init sub-block runs at start of final block
    my $report;
</%init>
<%cleanup>
    # cleanup sub-block runs at end of final block

    # get the values from the init block
    my $iv = $R->initvalue();

    # stdout is tied to $R. so plain print also works.

eg/filelist.pm  view on Meta::CPAN


# Return an arrayref of metadata hashrefs for files in the yenta 'files'
# map that match the job config:
#   system - subsystem that must match exactly
#   start  - time_t; files whose end_time is before this are skipped
#   end    - time_t; files whose start_time is after this are skipped
sub get_file_list {
    my $config = shift;

    # get files + metadata from yenta
    my $yenta = AC::Yenta::Direct->new( 'files', $YDBFILE );

    # the job config is asking for files that match:
    my $syst  = $config->{system};
    my $tmax  = $config->{end};		# time_t
    my $tmin  = $config->{start};	# time_t

    # the keys in yenta are of the form: 20100126150139_[...]
    # so convert the start time to the same compact form to seed getrange.
    my $start = isotime($tmin);		# 1286819830       => 20101011T175710Z
    $start =~ s/^(\d+)T(\d+).*/$1$2/;	# 20101011T175710Z => 20101011175710

    my @files = grep {
        # does this file match the request?
        ($_->{subsystem}   eq $syst) &&
        ($_->{end_time}    >= $tmin) &&
        ($_->{start_time}  <= $tmax)
    } map {
        # get meta-data on this file. data is json encoded
        my $d = $yenta->get($_);
        $d = $d ? decode_json($d) : {};
        # convert the space-separated locations to an arrayref.
        # split ' ' (awk mode) ignores leading whitespace, so no empty
        # location is produced; a missing location becomes [].
        my $loc = defined($d->{location}) ? $d->{location} : '';
        $d->{location} = [ split ' ', $loc ];
        $d;
    } $yenta->getrange($start, undef);	# get all files from $start to now

    return \@files;
}


1;

eg/mrgamoo  view on Meta::CPAN

use strict;
use Getopt::Long qw(:config no_ignore_case);
# job-submission client; same AC::MrGamoo namespace as the rest of the distribution
use AC::MrGamoo::Client;

my %opt;

GetOptions(\%opt,
           "check",		# only run the client's check_code(), then exit
           "verbose|v",
           "debug|d",
           "console",
           "start=s",
           "end=s",
          ) or exit 1;		# bad options: exit non-zero so callers can tell

my $mrm = AC::MrGamoo::Client->new( $ARGV[0],  \%opt );

if( $opt{check} ){
    $mrm->check_code();
    exit;
}

eg/mrgamoo.conf  view on Meta::CPAN

# example config
#
# file will be reloaded automagically if it changes. no need to hup or restart.

port            3504
syslog          local4
environment	prod

allow           10.200.2.0/23

basedir         /home/aclogs

seedpeer        10.200.2.3:3504

eg/readinput.pm  view on Meta::CPAN

    my $line = scalar <$fd>;
    # end of file?
    return (undef, 1) unless defined $line;

    my $d = json_decode($line);

    # filter input on date range. we could just as easily filter
    # in 'map', but doing it here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.

    return (undef, 0) if $d->{tstart} <  $R->config('start');
    return (undef, 0) if $d->{tstart} >= $R->config('end');

    return ($d, 0);
}

1;

lib/AC/MrGamoo.pm  view on Meta::CPAN


=item allow

specify networks allowed to connect.

    allow 127.0.0.1
    allow 192.168.10.0/24

=item seedpeer

specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.

    seedpeer 192.168.10.11:3503
    seedpeer 192.168.10.12:3503

=item secret

specify a secret key used to encrypt data transferred between
systems in different datacenters.

lib/AC/MrGamoo/AC/FileList.pm  view on Meta::CPAN

);

# Look up log files in the yenta 'logfile' map that match the job config
# and return them as an arrayref of metadata hashrefs.
#   datamode - environment to match; '*' or missing matches any
#   system   - subsystem name(s), separated by spaces and/or commas;
#              '*' or missing matches any
#   start    - time_t; files whose end_time is before this are skipped
#   end      - time_t; files whose start_time is after this are skipped
sub get_file_list {
    my $config = shift;

    my $yenta = AC::Yenta::Direct->new( 'logfile', $YDBFILE );

    my $mode  = $config->{datamode};
    my $syst  = $config->{system};
    my $tmax  = $config->{end};
    my $tmin  = $config->{start};

    # NB: keys in the yenta logfile map are of the form: 20100126150139_eqaB5uSerdeddsOw
    # so convert the start time into that prefix format to seed getrange.
    my $start = isotime($tmin);
    $start =~ s/^(\d+)T(\d+).*/$1$2/;	# 20091109T123456... => 20091109123456

    $syst = undef if defined($syst) && $syst eq '*';
    $mode = undef if defined($mode) && $mode eq '*';

    if( defined $syst ){
        # "a, b c" => qr/^(a|b|c)$/
        # split on runs of separators and drop empty names: replacing each
        # separator character individually turned "a, b" into "a||b", and
        # that empty alternative matched files with an empty subsystem.
        my $alt = join '|', grep { length } split /[ ,]+/, $syst;
        $syst = length($alt) ? qr/^($alt)$/ : undef;
    }

    debug("mode=$mode, syst=$syst, tmin=$tmin, tmax=$tmax, start=$start");
    my @files = grep {
        (!$mode || ($_->{environment} eq $mode)) &&
        (!$syst || ($_->{subsystem}   =~ $syst)) &&
        ($_->{end_time}    >= $tmin) &&
        ($_->{start_time}  <= $tmax)
    } map {
        my $d = $yenta->get($_);
        $d = $d ? decode_json($d) : {};
        # space-separated location list => arrayref, renamed via %CONVERT.
        # split ' ' ignores leading whitespace (no empty entry); missing => [].
        my $loc = defined($d->{location}) ? $d->{location} : '';
        $d->{location} = [ map { $CONVERT{$_} || $_ } split(' ', $loc) ];
        $d;
    } $yenta->getrange($start, undef);

    debug("found " .scalar(@files)." files");
    return \@files;
}


1;

lib/AC/MrGamoo/AC/ReadInput.pm  view on Meta::CPAN

    my $d;
    eval { $d = parse_dancr_log($line); };
    if( $@ ){
        problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
        return ;
    }

    # filter input on date range. we could just as easily filter
    # in 'map', but doing here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.
    return if $d->{tstart} <  $R->config('start');
    return if $d->{tstart} >= $R->config('end');

    return ($d, 0);
}

1;

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

    debug("new client type: $req->{type} to $addr:$port");
    my $send = AC::MrGamoo::Protocol->encode_request( $req, $data );
    my $me   = $class->SUPER::new( $addr, $port,
                                 info	 => "client $req->{type} to $addr:$port; $info",
                                 request => $send,
                                );

    return $me;
}

# Wire up the timeout/read/shutdown handlers, start the underlying
# connection, arm the timeout, and queue the encoded request for writing.
# Returns the client object itself.
sub start {
    my $self = shift;

    my @handlers = (
        [ 'timeout',  \&_timeout  ],
        [ 'read',     \&_read     ],
        [ 'shutdown', \&_shutdown ],
       );
    for my $h (@handlers){
        $self->set_callback( $h->[0], $h->[1] );
    }

    $self->SUPER::start();
    $self->timeout_rel( $TIMEOUT );
    $self->write( $self->{request} );

    return $self;
}

# timeout callback: give up on the connection by shutting it down.
sub _timeout {
    my( $self ) = @_;
    return $self->shut();
}

lib/AC/MrGamoo/API/JobCreate.pm  view on Meta::CPAN

    my $content = shift;

    debug("new job $req->{jobid}");

    if( $req->{console} =~ /^:/ ){
        # fill in ip addr
        $req->{console} = $io->{from_ip} . $req->{console};
    }

    my $x = AC::MrGamoo::Job->new( %$req );
    my $r = $x ? $x->start() : undef;

    if( $r ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        reply( 500, 'Error', $io, $proto, $req );
    }
}

1;

lib/AC/MrGamoo/API/TaskCreate.pm  view on Meta::CPAN


# API handler for task creation: build an AC::MrGamoo::Task from the request
# fields and start it. Replies 200 'OK' when the task was created and its
# start() returned true, otherwise 500 'Error'.
sub handler {
    my( $class, $io, $proto, $req, $content ) = @_;

    debug("new task $req->{jobid}/$req->{taskid}");

    my $task    = AC::MrGamoo::Task->new( %$req );
    my $started = $task && $task->start();

    if( !$started ){
        return reply( 500, 'Error', $io, $proto, $req );
    }
    return reply( 200, 'OK', $io, $proto, $req );
}

1;

lib/AC/MrGamoo/API/Xfer.pm  view on Meta::CPAN

    my $req     = shift;
    my $content = shift;

    # validate filename
    if( $req->{filename} =~ m%/\.|^\.% ){
        reply( 500, 'Error', $io, $proto, $req );
        return;
    }

    # new retry
    debug("starting file xfer $req->{copyid} => $req->{filename}");

    # start working on the copy
    my $x = AC::MrGamoo::Retry->new(
        newobj	=> \&_mk_xfer,
        newargs => [ $req ],
        tryeach	=> $req->{location},
       );

    # reply now
    if( $x ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        debug("sending error, xfer/retrier failed, $io->{info}");
        reply( 501, 'Error', $io, $proto, $req );
    }

    # send status when finished
    $x->set_callback('on_success', \&_yippee, $proto, $req);
    $x->set_callback('on_failure', \&_boohoo, $proto, $req);

    # start
    $x->start();
}

sub _mk_xfer {
    my $loc  = shift;
    my $req  = shift;

    my $x = AC::MrGamoo::Xfer->new(
        $req->{filename}, ($req->{dstname} || $req->{filename}), $loc, $req,
       );

lib/AC/MrGamoo/API/Xfer.pm  view on Meta::CPAN

        want_reply	=> 1,
    }, {
        jobid		=> $req->{jobid},
        copyid		=> $req->{copyid},
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("cannot create client") unless $x;
    return unless $x;
    $x->start();

    # we don't need any reply or reply callbacks. just send + forget
}


################################################################

1;

lib/AC/MrGamoo/D.pm  view on Meta::CPAN

    # configure

    $AC::MrGamoo::CONF = AC::MrGamoo::Config->new(
        $cfile, onreload => sub {},
       );

    initlog( 'mrgamoo', (conf_value('syslog') || 'local4'), $opt->{debugall} );
    AC::MrGamoo::Debug->init( $opt->{debugall}, $AC::MrGamoo::CONF );

    daemonize(5, 'mrgamood', $opt->{argv}) unless $opt->{foreground};
    verbose("starting.");

    $SIG{CHLD} = $SIG{PIPE} = sub {};        				# ignore
    $SIG{INT}  = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit;  # abort

    # initialize subsystems

    my $port = $opt->{port} || conf_value('port');

    AC::MrGamoo::About->init( $port );
    AC::MrGamoo::MySelf->init( $port, $opt->{persistent_id} );
    AC::DC::IO::TCP::Server->new( $port, 'AC::MrGamoo::Server' );

    # start "cronjobs"
    AC::DC::Sched->new(
        info	=> 'check config files',
        freq	=> 30,
        func	=> sub { $AC::MrGamoo::CONF->check() },
       );

    run_and_watch(
        ($opt->{foreground} || $opt->{debugall}),
        \&AC::DC::IO::mainloop,
       );

lib/AC/MrGamoo/FileList.pm  view on Meta::CPAN

=head1 DESCRIPTION

MrGamoo only runs map/reduce jobs.
It is up to you to get the files onto the servers,
keep track of where they are, and tell MrGamoo.

Some people keep the file meta-information in a sql database.
Some people keep the file meta-information in a yenta map.
Some people keep the file meta-information in the filesystem.

When a new job starts, your C<get_file_list> function will be
called with the job config, and should return an arrayref
of matching files along with meta-info.

Each element of the returned arrayref should be a hashref
containing at least the following fields:

=head2 filename

the name of the file, relative to the C<basedir> in your config file.

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

        phase_no	=> -1,
        file_info	=> {},
        tmp_file	=> [],
        server_info	=> {},
        task_running	=> {},
        task_pending	=> {},
        xfer_running	=> {},
        xfer_pending	=> {},
        request_running	=> {},
        request_pending	=> {},
        statistics	=> { job_start => time() },
    }, $class;

    if( $REGISTRY{ $me->{request}{jobid} } ){
        verbose("ignoring duplicate request job $me->{request}{jobid}");
        # will cause a 200 OK, so the requestor will not retry
        return $REGISTRY{ $me->{request}{jobid} };
    }

    verbose("new job: $me->{request}{jobid} ($me->{request}{traceinfo})");

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN


    $me->{maxfail} = 5 * ( (keys %{$plan->{taskidx}}) + @{$plan->{copying}});

    $me->{server_info}{$_->{id}} = {} for @$servers;

    $me->_preload_file_copies();
    $REGISTRY{ $me->{request}{jobid} } = $me;
    return $me;
}

# Begin running the job: log it, tell the end-user console, and make a
# first scheduling pass. Always returns 1.
sub start {
    my $self = shift;

    debug("start job");
    $self->{euconsole}->send_msg('debug', 'starting job');
    $self->_try_to_do_something();

    return 1;
}

################################################################

# record status rcvd from task
sub task_status {
    my $me = _find(shift, @_);
    my %p  = @_;

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

    for my $job (values %REGISTRY){
        $job->_periodic();
    }
}

sub _periodic {
    my $me = shift;

    my @t = values %{$me->{task_running}};
    for my $t ( @t ){
        my $lt = $t->{status_time} || $t->{start_time};
        next if $^T - $lt < ($me->{options}{tasktimeout} || $TASKTIMEOUT);

        $t->failed( $me, 'timeout' );
    }

    my @c = values %{$me->{xfer_running}};
    for my $c ( @c ){
        my $lt = $c->{status_time} || $c->{start_time};
        next if $^T - $lt < ($me->{options}{xfertimeout} || $XFERTIMEOUT);

        $c->failed( $me, 'timeout' );
    }

    $me->_try_to_do_something();

    my $tr = keys %{ $me->{task_running} };
    my $tp = keys %{ $me->{task_pending} };
    my $xr = keys %{ $me->{xfer_running} };

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN


    my $ph = $me->{plan}{phases}[ $me->{phase_no} ] || 'none';

    debug("status: phase $ph, task $tr / $tp, xfer $xr / $xp, reqs $rr / $rp")
      if $tr || $tp || $xr || $xp || $rr || $rp;

}

################################################################

# Advance phase_no and move the next phase's tasks to pending.
# Past the last phase: the first time, start tmp-file cleanup; once cleanup
# has been started, call _finished().
# Returns true only when _finished() was called; false otherwise, so the
# caller goes on to schedule work.
sub _start_next_phase {
    my $me = shift;

    debug("next phase");
    $me->{phase_no} ++;
    my $tp = $me->{plan}{taskplan}[ $me->{phase_no} ];

    # finished all phases ?
    unless( $tp ){
        if( $me->{_cleanedup} ){
            $me->_finished();
            return 1;
        }
        $me->_cleanup_files();
        return;
    }

    debug("job $me->{request}{jobid} starting phase $tp->{phase}");
    $me->{euconsole}->send_msg('debug', "starting phase $tp->{phase}");

    # move tasks to pending
    for my $t (@{$tp->{task}}){
        $t->pend($me);
    }

    # explicit false: the value of a trailing loop is unspecified in perl,
    # and callers test this return value for truth.
    return;
}

# Start $task unless its server already runs $TASKSRVRMAX tasks for this
# job, or its retry back-off (start_after, compared against $^T) has not
# yet expired. Returns the number of tasks started (1 or 0), so callers
# can sum it without adding undef.
sub _maybe_start_task {
    my $me   = shift;
    my $task = shift;

    my $server   = $task->{server};
    my $underway = keys %{$me->{server_info}{$server}{task_running}};
    return 0 if $underway >= $TASKSRVRMAX;			# don't overload server
    return 0 if ($task->{start_after} || 0) > $^T;	# rate limit retries

    # RSN - check that prerequisite xfers completed
    $task->start( $me );
    return 1;
}

# Start $copy unless its server already runs $XFERSRVRMAX transfers for
# this job, or its retry back-off (start_after, compared against $^T) has
# not yet expired. Returns the number of transfers started (1 or 0).
sub _maybe_start_xfer {
    my $me   = shift;
    my $copy = shift;

    my $server   = $copy->{server};
    my $underway = keys %{$me->{server_info}{$server}{xfer_running}};
    return 0 if $underway >= $XFERSRVRMAX;			# don't overload server
    return 0 if ($copy->{start_after} || 0) > $^T;	# rate limit retries

    $copy->start( $me );
    return 1;
}

# Requests are not throttled per server: start it unconditionally and
# report one request started.
sub _maybe_start_request {
    my( $job, $request ) = @_;

    $request->start( $job );
    1;
}

################################################################

# Queue every planned file copy (plan.copying) as pending, in plan order.
sub _preload_file_copies {
    my( $job ) = @_;

    $_->pend( $job ) foreach @{ $job->{plan}{copying} };
}


################################################################

sub _try_to_do_something {

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

    # is this phase done
    if( !(keys %{$me->{task_running}})
          && !(keys %{$me->{task_pending}})
          && !(keys %{$me->{xfer_running}})
          && !(keys %{$me->{xfer_pending}})
          && !(keys %{$me->{request_running}})
          && !(keys %{$me->{request_pending}})
         ){

        # this phase is finished
        return if $me->_start_next_phase();
    }

    if( $me->{aborted}
          && !(keys %{$me->{request_running}})
          && !(keys %{$me->{request_pending}})
         ){
        # this phase is finished
        return if $me->_start_next_phase();
    }

    # check load ave, etc
    return unless $me->_ok_to_do_more_p();

    $_trying ++;

    # are there requests that can start
    my @rp = values %{$me->{request_pending}};
    my $startreqs = $REQMAX - keys %{$me->{request_running}};
    for my $r (@rp){
        last if $startreqs <= 0;
        next unless $me->_maybe_start_request( $r );
        $startreqs --;
    }

    unless( $me->{aborted} ){

        # are there tasks that can start
        my $started = 0;
        my @tp = sort { $a->{created} <=> $b->{created} } values %{$me->{task_pending}};
        for my $t (@tp){
            $started += $me->_maybe_start_task( $t );
            last if $started >= $me->{plan}{nserver} / 4;	# keep them from getting in to lockstep
        }

        # are there copies that can start
        my @cp = sort { $a->{created} <=> $b->{created} } values %{$me->{xfer_pending}};
        for my $c (@cp){
            $me->_maybe_start_xfer( $c );
        }
    }

    # should we speculatively copy some files

    # should we speculatively retry a task


    $_trying --;
}

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

}

################################################################

sub report {

    my $txt;

    for my $j (values %REGISTRY){
        my $ph;
        $ph = 'start'     if $j->{phase_no} < 0;
        $ph ||= 'cleanup' if $j->{phase_no} >= @{$j->{plan}{phases}};
        $ph ||= $j->{plan}{phases}[ $j->{phase_no} ];

        my $tr = keys %{$j->{task_running}};
        my $tp = keys %{$j->{task_pending}};
        my $cr = keys %{$j->{copy_running}};
        my $cp = keys %{$j->{copy_pending}};
        my $rr = keys %{$j->{request_running}};
        my $rp = keys %{$j->{request_pending}};

lib/AC/MrGamoo/Job/Action.pm  view on Meta::CPAN


package AC::MrGamoo::Job::Action;
use AC::MrGamoo::Debug 'job_action';
use AC::MrGamoo::Job::Task;
use AC::MrGamoo::Job::Xfer;
use AC::MrGamoo::Job::Request;
use Time::HiRes 'time';
use strict;


# Mark this action (kind $x, e.g. 'task' or 'xfer') as started on its
# server: stamp start_time, then move it from the job's ${x}_pending into
# the job's ${x}_running and its server's ${x}_running.
sub started {
    my( $self, $job, $x ) = @_;

    my( $server, $id ) = @{$self}{qw(server id)};
    $self->{start_time} = time();

    debug("$x started $id on $server");

    delete $job->{"${x}_pending"}{$id};
    $job->{"${x}_running"}{$id}                       = $self;
    $job->{server_info}{$server}{"${x}_running"}{$id} = $self;
}

# Mark this action (kind $x, e.g. 'task' or 'xfer') as finished:
#  - set its finished flag
#  - add its run time and a run count to the job's statistics
#  - remove it from the job's and its server's ${x}_running
#  - record it under the server's ${x}_finished
sub finished {
    my $me  = shift;
    my $job = shift;
    my $x   = shift;

    my $server = $me->{server};
    my $id = $me->{id};

    $me->{finished} = 1;

    # record timing info; without a start_time, time() - undef would add
    # the whole epoch to the run-time statistics.
    my $started = $me->{start_time};
    $job->{statistics}{"${x}_run_time"} += time() - $started if defined $started;
    $job->{statistics}{"${x}_run"} ++;

    debug("$x finished $id on $server");
    delete $job->{"${x}_running"}{$id};
    delete $job->{server_info}{$server}{"${x}_running"}{$id};
    # server_info lives on the job (as in started()), not on this action
    $job->{server_info}{$server}{"${x}_finished"}{$id} = 1;
}

sub failed {
    my $me  = shift;

lib/AC/MrGamoo/Job/Done.pm  view on Meta::CPAN

use File::Path 'rmtree';
use strict;

our %REGISTRY;
our $MSGID;

sub _finished {
    my $me = shift;

    debug("finished");
    $me->{statistics}{cleanup_time} = time() - $me->{statistics}{cleanup_start};

    verbose("job stats: task $me->{statistics}{task_run} $me->{statistics}{task_run_time}, " .
            "copy: $me->{statistics}{xfer_run} $me->{statistics}{xfer_run_time}, " .
            "job: $me->{statistics}{job_time}, " .
            "cleanup: $me->{statistics}{cleanup_files} $me->{statistics}{cleanup_time}");

    $me->{euconsole}->send_msg('debug', 'finished job');
    $me->{_finished} = 1;
    delete $REGISTRY{ $me->{request}{jobid} };

lib/AC/MrGamoo/Job/Done.pm  view on Meta::CPAN


    $me->{euconsole}->send_msg('stderr', 'aborted job' . ($p{reason} ? ": $p{reason}" : ''));
}

################################################################

sub _cleanup_files {
    my $me = shift;

    $me->{_cleanedup} = 1;
    $me->{statistics}{cleanup_start} = time();
    $me->{statistics}{job_time} = time() - $me->{statistics}{job_start};

    $me->{euconsole}->send_msg('debug',  'cleaning up');
    $me->{euconsole}->send_msg('finish', 'finish');

    # remove tmp files
    for my $fi (@{$me->{tmp_file}}){
        # debug("deleting $fi->{filename} from $fi->{server}");
        $me->{statistics}{cleanup_files} ++;

        AC::MrGamoo::Job::Request->new( $me,

lib/AC/MrGamoo/Job/Request.pm  view on Meta::CPAN

# Create a request object for $job from the remaining key/value fields
# (expected to include 'id') and queue it on the job's request_pending.
sub new {
    my( $class, $job, %field ) = @_;

    my $self = bless { %field }, $class;
    $job->{request_pending}{ $self->{id} } = $self;

    return $self;
}

# Send this request to its server on behalf of $job.
# It leaves request_pending immediately. If no client can be created it is
# logged and dropped. Otherwise it sits in request_running until
# _cb_start_req fires on success or failure.
sub start {
    my( $self, $job ) = @_;

    debug("starting request $self->{info}");
    delete $job->{request_pending}{ $self->{id} };

    my $client = $job->_send_request( @{$self}{qw(server info proto request)} );
    if( !$client ){
        verbose("cannot start request");
        return;
    }

    $client->set_callback('on_success', \&_cb_start_req, $self, $job, 1);
    $client->set_callback('on_failure', \&_cb_start_req, $self, $job, 0);

    $job->{request_running}{ $self->{id} } = $self;
    $client->start();
}

# Completion callback (success and failure) for a request client.
# Drops the request from request_running. After a success, if requests are
# still waiting and fewer than 5 are in flight, schedules more right away:
# starting a few at a time goes faster.
sub _cb_start_req {
    my( $io, $evt, $request, $job, $ok ) = @_;

    debug("request finished $request->{info}");
    delete $job->{request_running}{ $request->{id} };

    $job->_try_to_do_something()
      if $ok
        && (keys %{$job->{request_pending}})
        && (keys %{$job->{request_running}} < 5);
}

1;

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN

        created => time(),
    };

    $job->{task_pending}{$id} = $me;

    debug("  => pending task $info->{id} => $id on $server");

    return $me;
}

sub start {
    my $me  = shift;
    my $job = shift;

    my $server = $me->{server};
    debug("starting task $job->{request}{jobid}/$me->{info}{id}/$me->{id} on $server");

    # send request to server
    my $ti = $me->{info};

    my $x = $job->_send_request( $server, "task $me->{id}", {
        type		=> 'mrgamoo_taskcreate',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {
        jobid		=> $job->{request}{jobid},

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN

        options		=> $job->{request}{options},
        initres		=> $job->{request}{initres},
        console		=> ($job->{request}{console} || ''),
        phase		=> $ti->{phase},
        infile		=> $ti->{infile},
        outfile		=> [ map { $_->{filename} } @{$ti->{outfile}} ],
        master		=> my_server_id(),
    } );

    unless( $x ){
        verbose("cannot start task");
        $me->failed($job);
        return;
    }

    # no success cb here. we will either timeout, or get a TaskStatus msg.
    $x->set_callback('on_failure', \&_cb_start_task_fail, $me, $job );

    $me->started($job, 'task');
    $x->start();
}

# on_failure callback for the task-create request: the request did not
# succeed, so mark the task failed with reason 'network'.
sub _cb_start_task_fail {
    my( undef, undef, $task, $job ) = @_;    # io and event are unused

    return $task->failed( $job, 'network' );
}

sub update_status {
    my $me  = shift;

lib/AC/MrGamoo/Job/Xfer.pm  view on Meta::CPAN

        created => $^T,
    };

    $job->{xfer_pending}{$id} = $me;

    debug("pending xfer $info->{id} => $id on $server");

    return $me;
}

# Ask this transfer's server to fetch the file (mrgamoo_filexfer request).
# Locations recorded in the job's file_info take precedence over the
# planned location. If no client can be created, the transfer is marked
# failed. Otherwise it is marked started, and completion arrives later as
# a timeout or an XferStatus message.
sub start {
    my( $self, $job ) = @_;

    my $server   = $self->{server};
    my $info     = $self->{info};
    my $filename = $info->{filename};
    debug("starting xfer $self->{id} on $server of $filename");

    my %header = (
        type		=> 'mrgamoo_filexfer',
        msgidno		=> $^T,
        want_reply	=> 1,
       );
    my %body = (
        jobid		=> $job->{request}{jobid},
        copyid		=> $self->{id},
        filename	=> $filename,
        dstname		=> ($info->{dstname} || $filename),
        location	=> ($job->{file_info}{$filename}{location} || $info->{location}),
        console		=> $job->{request}{console},
        master		=> my_server_id(),
       );

    my $client = $job->_send_request( $server, "xfer $self->{id}", \%header, \%body );

    if( !$client ){
        verbose("cannot start xfer");
        $self->failed( $job );
        return;
    }

    # only a failure callback: success is reported via an XferStatus msg
    $client->set_callback('on_failure', \&_cb_start_xfer_fail, $self, $job );

    $self->started($job, 'xfer');
    $client->start();
}

# on_failure callback for the file-xfer request: the request did not
# succeed, so mark the transfer failed with reason 'network'.
sub _cb_start_xfer_fail {
    my( undef, undef, $xfer, $job ) = @_;    # io and event are unused

    return $xfer->failed( $job, 'network' );
}

# record status rcvd from file xfer
sub update_status {

lib/AC/MrGamoo/Job/Xfer.pm  view on Meta::CPAN

    # add to server_info.has_files
    # add to file_info, tmp_file

    $job->{server_info}{$server}{has_files}{$file} = 1;
    push @{ $job->{file_info}{$file}{location} }, $server;
    push @{$job->{tmp_file}}, { filename => $file, server => $server };

    my $limit = $job->{plan}{nserver} * 1.5;
    $job->_try_to_do_something()
      if (keys %{$job->{xfer_pending}})
        && (keys %{$job->{xfer_running}} < $limit);	# we go faster, if we can start a few at a time

}


1;

lib/AC/MrGamoo/Kibitz/Client.pm  view on Meta::CPAN

our @ISA = 'AC::DC::IO::TCP::Client';

my $HDRSIZE = AC::MrGamoo::Protocol->header_size();
my $TIMEOUT = 3;
my $msgid   = $$;

sub new {
    my $class = shift;
    # addr, port, ...

    debug('starting kibitz status client');
    my $me = $class->SUPER::new( @_ );
    return unless $me;

    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    $me->start();

    # build request
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type            => 'mrgamoo_status',
        content_length  => 0,
        want_reply      => 1,
        msgid           => $msgid++,
    }, {
        myself => AC::MrGamoo::Kibitz->about_myself(),
    } );

lib/AC/MrGamoo/Kibitz/Peers.pm  view on Meta::CPAN

    }

    debug("using seedpeer");
    return("seed/$ip:$port", $ip, $port);
}

# Pick a random peer and open a kibitz status client to it. If the client
# cannot be created, report the peer via maybe_down with reason 'connect'.
sub _kibitz_with_random_peer {

    my( $peer_id, $addr, $port ) = _random_peer();
    return if !$peer_id;

    debug("starting status kibitz client to $peer_id");

    my $client = AC::MrGamoo::Kibitz::Client->new(
        $addr, $port,
        info        => "status client: $peer_id",
        status_peer => $peer_id,
       );

    __PACKAGE__->maybe_down($peer_id, 'connect') unless $client;
}

1;

lib/AC/MrGamoo/MySelf.pm  view on Meta::CPAN

        return [
            # use this IP for communication with servers in this datacenter (same natdom)
            { ip => $privat_ip, natdom => my_datacenter() },
            # otherwise use this IP
            { ip => $public_ip },
        ]
    }

=head2 init

initialization function called at startup. typically used to look up hostnames, IP addresses,
and such, and store them in variables to make the above functions faster.

    my $HOSTNAME;
    my $DOMAIN;
    sub init {
        $HOSTNAME = hostname();
        ($DOMAIN) = $HOSTNAME =~ /^[^.]+\.(.*)/;
    }

=head1 BUGS

lib/AC/MrGamoo/Retry.pm  view on Meta::CPAN

# Build a retrier from key/value options (newobj, newargs, tryeach,
# maxtries, ...). 'tries' always starts at 0 and overrides any passed-in
# value. tryeach defaults to []. A false maxtries becomes the number of
# tryeach entries.
sub new {
    my( $class, %opt ) = @_;

    my $self = bless { %opt, tries => 0 }, $class;

    $self->{tryeach} ||= [];
    unless( $self->{maxtries} ){
        $self->{maxtries} = scalar @{ $self->{tryeach} };
    }
    return $self;
}

# begin retrying: make the first attempt
sub start {
    my( $self ) = @_;
    return $self->_try();
}

################################################################

# Make one attempt:
#  - call newobj with the tryeach entry at the current attempt index,
#    followed by the newargs list
#  - count the attempt
#  - route the returned object's success/failure callbacks back to this
#    retrier, then start it
# If newobj returns nothing, the attempt is handed to _on_failure at once.
sub _try {
    my $me = shift;

    # NB: not "my $a" - $a/$b are sort()'s special package variables, and a
    # lexical $a would break any sort block later added in this scope.
    my $target = $me->{tryeach}[ $me->{tries} ];
    my $obj    = $me->{newobj}->( $target, @{$me->{newargs}} );
    $me->{tries} ++;

    debug("try $me->{tries}");
    return _on_failure(undef, undef, $me) unless $obj;

    $obj->set_callback( 'on_success', \&_on_success, $me );
    $obj->set_callback( 'on_failure', \&_on_failure, $me );

    $obj->start();
}

# on_success callback from the current attempt: fire this retrier's own
# on_success callback and return its result.
sub _on_success {
    my( undef, undef, $retry ) = @_;    # (object, event, retrier)

    debug("all done!");
    return $retry->run_callback( 'on_success' );
}

lib/AC/MrGamoo/Server.pm  view on Meta::CPAN

    my $fd    = shift;
    my $ip    = shift;

    unless( $AC::MrGamoo::CONF->check_acl( $ip ) ){
        verbose("rejecting connection from $ip");
        return;
    }

    my $me = $class->SUPER::new( info => 'tcp mrgamoo server', from_ip => $ip );

    $me->start($fd);
    $me->timeout_rel($TIMEOUT);
    $me->set_callback('read',    \&read);
    $me->set_callback('timeout', \&timeout);
}

sub timeout {
    my $me = shift;

    debug("connection timed out");
    $me->shut();

lib/AC/MrGamoo/Submit/Compile.pm  view on Meta::CPAN

}

sub _compile {
    my $me = shift;

    while(1){
        my $line = $me->_next();
        last unless defined $line;
        chomp $line;

        # white, comment, or start
        $line =~ s/^%#.*//;
        $line =~ s/#.*//;
        next if $line =~ /^\s*$/;

        my($tag) = $line =~ m|^<%(.*)>\s*$|;
        my $d    = $COMPILE{$tag};

        if( $d->{tag} eq 'block'){
            $me->_add_block($tag, $me->_compile_block($tag));
        }

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

        $me->{_inputsize} += $s
    }

    debug("input size: $me->{_inputsize}");

    # print STDERR "Task: ", dumper($me), "\n";
    $REGISTRY{$task} = $me;
    return $me;
}

# Start this task now, or queue it if $MAXRUNNING registered tasks
# already have an io handle.
# A queued task gets status QUEUED (amt 0) and a _queueprio of
# ($^T - $TSTART) plus input size in millions of bytes; returns 1 when queued.
sub start {
    my $self = shift;

    # how many registered tasks are currently running (have an io handle)
    my $running = grep { $_->{io} } values %REGISTRY;

    if( $running < $MAXRUNNING ){
        return $self->_start();
    }

    $self->{_queueprio}    = $^T - $TSTART + $self->{_inputsize} / 1_000_000;
    $self->{status}{phase} = 'QUEUED';
    $self->{status}{amt}   = 0;
    debug("queue $self->{request}{phase} task $self->{request}{jobid}/$self->{request}{taskid} prio $self->{_queueprio}");
    return 1;
}

# Launch the task through AC::DC::IO::Forked, running
# AC::MrGamoo::Task::Running::_start_task with this task as its argument.
# Then keep the io handle, arm the timeout, hook the timeout/read/shutdown
# callbacks, and start it.
sub _start {
    my $self  = shift;
    my $label = "$self->{request}{jobid}/$self->{request}{taskid}";

    debug("start $self->{request}{phase} task $label");

    my $io = AC::DC::IO::Forked->new(
        \&AC::MrGamoo::Task::Running::_start_task, [ $self ],
        info	=> "task $label",
       );
    $self->{io} = $io;

    $io->timeout_rel($TIMEOUT);
    $io->set_callback('timeout',  \&_timeout);
    $io->set_callback('read',     \&_read,     $self);
    $io->set_callback('shutdown', \&_shutdown, $self);

    $io->start();
}

# Abort the task located by _find from the arguments: shut down its io
# handle if it has one. Returns 1, or nothing when no task was found.
sub abort {
    my $self = _find(shift, @_);
    return if !$self;

    debug("abort task $self->{request}{taskid}");
    if( my $io = $self->{io} ){
        $io->shut();
    }
    return 1;
}

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

    my $me  = shift;

    # send status to master
    $me->_send_status();

    my $task = $me->{request}{taskid};
    delete $REGISTRY{$task};

    delete $me->{io};

    periodic(1);	# try to start another task
}

# shutdown callback for a status-report client: one fewer report in flight.
sub _send_status_done {
    my( undef, undef, $task ) = @_;    # io and event are unused

    $task->{_status_underway} --;
}

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

        taskid		=> $me->{request}{taskid},
        phase		=> $me->{status}{phase},
        progress	=> $me->{status}{amt},
    } );

    return unless $x;

    $me->{_status_underway} ++;
    $x->set_callback('shutdown', \&_send_status_done, $me);

    $x->start();

}

sub attr {
    my $me = shift;
    my $bk = shift;
    my $p  = shift;

    return $bk->{attr}{$p} if $bk && $bk->{attr}{$p};
    return $me->{options}{$p};

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

    my $quick = shift;

    # how many tasks are running?
    my $nrun = 0;
    for my $t (values %REGISTRY){
        $nrun ++ if $t->{io};
    }

    return if $quick && $nrun >= $MAXRUNNING;

    # queued? send status, maybe start

    for my $t (sort { $a->{_queueprio} <=> $b->{_queueprio} } values %REGISTRY){
        next if $t->{io};

        $t->_send_status() unless $quick;

        if( $nrun < $MAXRUNNING ){
            $t->_start();
            $nrun ++;
        }
    }
}


1;

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

use Socket;
use JSON;
use strict;

my $STATUSTIME = 5;			# seconds
my $MAXRUN     = 3600;			# override with %attr maxrun
my $SORTPROG   = '/usr/bin/sort';	# override with %attr sortprog or config file
my $GZPROG     = '/usr/bin/gzcat';	# override with %attr gzprog or config file

# in child process
sub _start_task {
    my $me = shift;

    debug("start child task");
    $^T = time();
    _setup_stdio_etal();
    _setup_console( $me );
    _update_status( 'STARTING', 0 );

    # send STDOUT + STDERR to end-user console session
    $me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ )  };
    $me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
    $me->{R}->redirect_io();

lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN

                                 rbufsize => 65536,
                                );

    return unless $me;
    $REGISTRY{$req->{copyid}} = $me;
    debug("xfer requesting $loc:$srcname => $dstname, id $req->{copyid}");

    return $me;
}

# Start this transfer now, or queue it if $MAXRUNNING registered transfers
# already hold an fd. A queued transfer gets _queueprio = $^T.
# Returns 1 when queued.
sub start {
    my $self = shift;

    my $active = grep { $_->{fd} } values %REGISTRY;

    if( $active < $MAXRUNNING ){
        return $self->_start();
    }

    $self->{_queueprio} = $^T;
    debug("queue xfer $self->{request}{copyid}");
    return 1;
}

# Arm the timeout, hook the timeout/shutdown callbacks, then hand off to
# the parent class's start().
sub _start {
    my( $self ) = @_;

    debug("start xfer $self->{request}{copyid}");

    $self->timeout_rel($TIMEOUT);
    $self->set_callback( 'timeout',  \&timeout  );
    $self->set_callback( 'shutdown', \&shutdown );

    return $self->SUPER::start();
}

sub _run_child {
    my $srcname = shift;	# on remote system
    my $dstname = shift;	# on local system
    my $tmpfile = shift;
    my $loc     = shift;
    my $req     = shift;


lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN


    delete $REGISTRY{$me->{request}{copyid}};

    debug("exitval = $me->{exitval}");
    if( !$me->{exitval} ){
        $me->run_callback('on_success');
    }else{
        $me->run_callback('on_failure');
    }

    periodic(1);	# try to start another xfer
}

# Report the transfer described by $req as still in progress
# (code 100, 'Working...') via AC::MrGamoo::API::Xfer::tell_master.
sub _send_status_update {
    my( $req ) = @_;

    debug('send xfer status');
    return AC::MrGamoo::API::Xfer::tell_master( $req, 100, 'Working...' );
}


lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN

    my $quick = shift;

    # how many xfers are running?
    my $nrun = 0;
    for my $t (values %REGISTRY){
        $nrun ++ if $t->{fd};
    }

    return if $quick && $nrun >= $MAXRUNNING;

    # queued? send status, maybe start

    for my $t (sort { $a->{_queueprio} <=> $b->{_queueprio} } values %REGISTRY){
        next if $t->{fd};

        _send_status_update( $t->{request} ) unless $quick;

        if( $nrun < $MAXRUNNING ){
            $t->_start();
            $nrun ++;
        }
    }
}


1;



( run in 0.545 second using v1.01-cache-2.11-cpan-fd5d4e115d8 )