AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/AC/ReadInput.pm  view on Meta::CPAN


sub readinput {
    my $fd = shift;

    my $line = scalar <$fd>;
    return (undef, 1) unless defined $line;

    my $d;
    eval { $d = parse_dancr_log($line); };
    if( $@ ){
        problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
        return ;
    }

    # filter input on date range. we could just as easily filter
    # in 'map', but doing here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.
    return if $d->{tstart} <  $R->config('start');
    return if $d->{tstart} >= $R->config('end');

    return ($d, 0);

lib/AC/MrGamoo/API/HB.pm  view on Meta::CPAN

    }, {
        status_code	=> 200,
        status_message	=> 'Honky Dory',
        hostname	=> $HOSTNAME,
        subsystem	=> 'mrgamoo',
        environment	=> conf_value('environment'),
        port		=> my_port(),
        timestamp	=> time(),
        sort_metric	=> loadave(),
        server_id	=> my_server_id(),
        process_id	=> $$,
    } );

    debug("sending hb reply");
    $io->write_and_shut( $response );

}



1;

lib/AC/MrGamoo/FileList.pm  view on Meta::CPAN

the name of the file, relative to the C<basedir> in your config file.

    filename    => 'www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi'

=head2 location

an arrayref of servers where this file is located. the locations
should be the persistent-ids of the servers (see MySelf).

if the same file is replicated on multiple servers, mrgamoo will
be able to both intelligently determine which servers will process
which files, as well as recover from failures.

    location	=> [ 'mrm@athena.example.com', 'mrm@zeus.example.com' ]

=head2 size

this should be the size of the file, in bytes. mrgamoo will consider
the sizes of files in determining which servers will process which files.

    size	=> 10843

=head1 BUGS

none. you write this yourself.

=head1 SEE ALSO

    AC::MrGamoo

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN


    # partially compile
    eval {
        $me->{mr} = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
    };
    if(my $e = $@){
        problem("cannot compile job: $e");
        return;
    }

    # RSN - get_file_list + Plan may take too long - do in sub-process

    # get file list
    my $files = get_file_list( $cf );
    #print STDERR "files: ", dumper($files), "\n";

    for my $f (@$files){
        $me->{file_info}{ $f->{filename} } = $f;
    }

    # get server list

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

            next unless exists $load->{$loc};	# down?
            next unless $load->{$loc}{use};
            my $w = (1 + $bytes{$loc}) * (1 + $load->{$loc}{metric});
            if( !$best_loc || $w < $best_wgt ){
                $best_wgt = $w;
                $best_loc = $loc;
            }
        }

        if( $best_loc ){
            # a server has the file. process it there.
            push @{$filemap{$best_loc}}, $f;
            $bytes{$best_loc} += $f->{size};
            next;
        }

        # pick best 2 servers
        my($sa, $sb) =
          map { $_->[1] }
          sort{ $a->[0] <=> $b->[0] }
          map { [(1 + $bytes{$_}) * (1 + $load->{$_}{metric}), $_] }

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

use Sys::Syslog;
use Socket;
use JSON;
use strict;

my $STATUSTIME = 5;			# seconds
my $MAXRUN     = 3600;			# override with %attr maxrun
my $SORTPROG   = '/usr/bin/sort';	# override with %attr sortprog or config file
my $GZPROG     = '/usr/bin/gzcat';	# override with %attr gzprog or config file

# in child process
sub _start_task {
    my $me = shift;

    debug("start child task");
    $^T = time();
    _setup_stdio_etal();
    _setup_console( $me );
    _update_status( 'STARTING', 0 );

    # send STDOUT + STDERR to end-user console session

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

    my $msg  = shift;

    return unless $me->{euconsole};
    $me->{euconsole}->send_msg($type, $msg);
}

sub _update_status {
    my $phase = shift;
    my $amt   = shift;

    # send status to parent process
    debug("sending status @ $^T / $phase/$amt");
    print STATUS "$phase $amt\n";
}

sub _maybe_update_status {
    my $me = shift;

    $^T = time();

    return if $^T < ($me->{status_time} + $STATUSTIME);

lib/AC/protobuf/heartbeat.pl  view on Meta::CPAN

                    'hostname', 9, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'server_id', 10, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_INT32(), 
                    'process_id', 11, undef
                ],

            ],
            { 'create_accessors' => 1, 'follow_best_practice' => 1,  }
        );
    }

}
1;



( run in 0.307 second using v1.01-cache-2.11-cpan-8d75d55dd25 )