AC-MrGamoo

 view release on metacpan or  search on metacpan

MANIFEST  view on Meta::CPAN

proto
MANIFEST
eg/mrgamood
eg/filelist.pm
eg/readinput.pm
eg/mrgamoo
eg/mrgamoo.conf
eg/example.mrjob
eg/myself.pm
Makefile.PL
META.yml                                 Module meta-data (added by MakeMaker)

eg/example.mrjob  view on Meta::CPAN

        mood    => 'joyous',
    };
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
    maxrun      => 300
    sortprog    => /bin/sort
</%attr>
    my $data = shift;   # one record from the input

    # return a key + a value
    return ( $data->{cmp}, 1 );
</%map>
%################################################################
<%reduce>
    my $key = shift;
    my $itr = shift;    # an iterator object

    # count
    my $n = 0;
    $itr->foreach( sub { $n ++ } );

eg/example.mrjob  view on Meta::CPAN


    # get the values from the init block
    my $iv = $R->initvalue();

    # stdout is tied to $R. so plain print also works.

    print "report for mood: $iv->{mood}\n";
    print $report;

</%cleanup>
    my($key, $data) = @_;

    $report .= "key: $key, value: $data\n";

</%final>

eg/filelist.pm  view on Meta::CPAN

# example filelist

# $Id: filelist.pm,v 1.1 2010/11/01 19:04:21 jaw Exp $

package Local::MrMagoo::FileList;
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;

my $YDBFILE = "/data/files.ydb";

# Example get_file_list: look up candidate files in a yenta map and return
# (as an arrayref of hashrefs) those matching the job's subsystem and
# time window.
#
#   $config - job config hashref; reads {system}, {start} and {end}
sub get_file_list {
    my ($config) = @_;

    # files + metadata live in yenta
    my $yenta = AC::Yenta::Direct->new( 'files', $YDBFILE );

    # selection criteria requested by the job config
    my $syst = $config->{system};
    my $tmin = $config->{start};	# time_t
    my $tmax = $config->{end};		# time_t

    # the keys in yenta are of the form: 20100126150139_[...]
    # so convert the start time to the same compact form:
    #   1286819830 => 20101011T175710Z => 20101011175710
    my $start = isotime($tmin);
    $start =~ s/^(\d+)T(\d+).*/$1$2/;

    my @files;

    # walk every key from $start to now
    for my $key ( $yenta->getrange($start, undef) ){
        # meta-data on this file is json encoded
        my $json = $yenta->get($key);
        my $meta = $json ? decode_json($json) : {};

        # convert space separated locations to arrayref
        $meta->{location} = [ split /\s+/, $meta->{location} ];

        # does this file match the request?
        next unless $meta->{subsystem}  eq $syst;
        next unless $meta->{end_time}   >= $tmin;
        next unless $meta->{start_time} <= $tmax;

        push @files, $meta;
    }

    return \@files;
}

eg/readinput.pm  view on Meta::CPAN

package Local::MrMagoo::ReadInput;
use AC::MrMagoo::User;
use JSON;
use strict;

our $R;		# exported by AC::MrMagoo::User

sub readinput {
    my $fd = shift;	# file handle

    # our file is newline delimited json data

    # read next line
    my $line = scalar <$fd>;
    # end of file?
    return (undef, 1) unless defined $line;

    # JSON exports decode_json (there is no json_decode)
    my $d = decode_json($line);

    # filter input on date range. we could just as easily filter
    # in 'map', but doing it here, behind the scenes, keeps things

lib/AC/MrGamoo.pm  view on Meta::CPAN

=item seedpeer

specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.

    seedpeer 192.168.10.11:3503
    seedpeer 192.168.10.12:3503

=item secret

specify a secret key used to encrypt data transferred between
systems in different datacenters.

    secret squeamish-ossifrage

=item syslog

specify a syslog facility for log messages.

    syslog local5

=item basedir

local directory to store files

    basedir         /home/data

=item debug

enable debugging for a particular section

    debug job

=back

=head1 BUGS

lib/AC/MrGamoo/AC/FileList.pm  view on Meta::CPAN

#
# $Id: FileList.pm,v 1.3 2010/11/10 16:24:38 jaw Exp $

package AC::MrGamoo::AC::FileList;
use AC::MrGamoo::Debug 'files';
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;

my $YDBFILE = "/home/acdata/logfile.ydb";

# return an array of:
#   {
#     filename    => www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi
#     location    => [ scrib@a2be021bd31c, scrib@a2be021ad31c ]
#     size        => 10863
#     [anything else]
#   }

# convert legacy scriblr ids

lib/AC/MrGamoo/AC/FileList.pm  view on Meta::CPAN

    'scrib@a2be021fd31c' => 'mrm@gefiltefish3-r4.ccsphl',
    'scrib@a2be0220d31c' => 'mrm@gefiltefish4-r3.ccsphl',
    'scrib@a2be0221d31c' => 'mrm@gefiltefish4-r4.ccsphl',
);

sub get_file_list {
    my $config = shift;

    my $yenta = AC::Yenta::Direct->new( 'logfile', $YDBFILE );

    my $mode  = $config->{datamode};
    my $syst  = $config->{system};
    my $tmax  = $config->{end};
    my $tmin  = $config->{start};
    my $start = isotime($tmin);
    $start =~ s/^(\d+)T(\d+).*/$1$2/;	# 20091109T123456... => 20091109123456

    # NB: keys in the yenta logfile map are of the form: 20100126150139_eqaB5uSerdeddsOw

    $syst = undef if $syst eq '*';
    $mode = undef if $mode eq '*';

lib/AC/MrGamoo/AC/MySelf.pm  view on Meta::CPAN

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 17:23 (EST)
# Function: 
#
# $Id: MySelf.pm,v 1.1 2010/11/01 18:41:49 jaw Exp $

package AC::MrGamoo::AC::MySelf;
use AC::MrGamoo::Config;
use AC::MrGamoo::Debug;
use AC::DataCenter;	# provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;

my $SERVERID;

sub init {
    my $class = shift;
    my $port  = shift;	# not used
    my $id    = shift;

lib/AC/MrGamoo/AC/ReadInput.pm  view on Meta::CPAN


sub readinput {
    my $fd = shift;

    my $line = scalar <$fd>;
    return (undef, 1) unless defined $line;

    my $d;
    eval { $d = parse_dancr_log($line); };
    if( $@ ){
        problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
        return ;
    }

    # filter input on date range. we could just as easily filter
    # in 'map', but doing here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.
    return if $d->{tstart} <  $R->config('start');
    return if $d->{tstart} >= $R->config('end');

    return ($d, 0);

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

use strict;

my $TIMEOUT  = 15;

# Build a client connection object for one API request.
#
#   $addr, $port - where to connect; passed through to the parent constructor
#   $info        - free-form text appended to the connection's info string
#   $req         - request hashref; {type} is used for logging/info
#   $data        - passed with $req to encode_request
#
# Returns whatever the parent class constructor returns.
sub new {
    my ($class, $addr, $port, $info, $req, $data) = @_;

    debug("new client type: $req->{type} to $addr:$port");

    # encode the request up front; it is handed to the parent as 'request'
    my $send = AC::MrGamoo::Protocol->encode_request( $req, $data );

    my @opts = (
        info    => "client $req->{type} to $addr:$port; $info",
        request => $send,
    );
    my $client = $class->SUPER::new( $addr, $port, @opts );

    return $client;
}

sub start {
    my $me = shift;

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

        $me->run_callback('on_failure');
    }
}

# Read handler: parse the reply, and either record the result or report
# the failure via _uh_oh.
#
#   $evt - event passed through to read_protocol_no_content
#
# On a 200 reply, stores the decoded reply in $me->{result}, sets
# $me->{status_ok} and shuts the connection.
sub _read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply to $me->{info}");

    my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    # nothing parsed (yet) - wait for the next read event
    return unless $proto;

    # check response
    if( $proto->{is_error} ){
        # NB: the failure handler is named _uh_oh (was misspelled _uhoh here)
        return $me->_uh_oh("rcvd error response");
    }

    $proto->{data} = AC::MrGamoo::Protocol->decode_reply($proto, $data);
    debug("recvd reply to $me->{info} - $proto->{data}{status_code} $proto->{data}{status_message}");

    # anything other than 200 is treated as a failure
    if( $proto->{data}{status_code} != 200 ){
        return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
    }

    $me->{result}    = $proto;
    $me->{status_ok} = 1;
    $me->shut();
}

sub _uh_oh {
    my $me  = shift;
    my $msg = shift;

lib/AC/MrGamoo/API/Get.pm  view on Meta::CPAN

    my $sha1 = sha1_file($file);

    debug("get file '$file' size $size");

    # send header
    my $gb  = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
    my $hdr = AC::MrGamoo::Protocol->encode_header(
        type		=> $proto->{type},
        msgidno		=> $proto->{msgidno},
        is_reply	=> 1,
        data_length	=> length($gb),
        content_length	=> $size,
       );
    my $buf = $hdr . $gb;

    syswrite( $fd, $buf );

    # stream
    AC::MrGamoo::Protocol->sendfile($fd, \*F, $size);
}

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

}

# Receive console messages on $me->{console_fd} and relay them until a
# 'finish' message arrives:
#   stderr -> printed to STDERR
#   stdout -> printed to STDOUT
#   debug  -> passed to the $me->{fdebug} callback, prefixed with the server id
sub run_console {
    my ($me) = @_;
    my $sock = $me->{console_fd};

    while(1){
        # NOTE(review): the result of recv is not checked - confirm how a
        # failed/empty read should be handled
        my $pkt;
        recv( $sock, $pkt, 65535, 0 );

        # each packet is a protocol header followed by the encoded request
        my $hdr  = AC::MrGamoo::Protocol->decode_header($pkt);
        my $body = substr($pkt, AC::MrGamoo::Protocol->header_size());
        my $req  = AC::MrGamoo::Protocol->decode_request($hdr, $body);

        last if $req->{type} eq 'finish';

        if( $req->{type} eq 'stderr' ){
            print STDERR "$req->{msg}";
        }
        elsif( $req->{type} eq 'stdout' ){
            print "$req->{msg}";
        }
        elsif( $req->{type} eq 'debug' ){
            $me->{fdebug}->("$req->{server_id}\t$req->{msg}");
        }
    }
}

sub submit {
    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

lib/AC/MrGamoo/Default/MySelf.pm  view on Meta::CPAN

}

# return this instance's server id (the file-level $SERVERID)
sub my_server_id {
    my $id = $SERVERID;
    return $id;
}

# return the list of networks for this server: a single entry
# holding our ip address (the file-level $MYIP) under the key 'ipa'
sub my_network_info {
    my %net = ( ipa => $MYIP );
    return [ \%net ];
}

# the default configuration has only one datacenter, named 'default'
sub my_datacenter {
    my $name = 'default';
    return $name;
}

1;

lib/AC/MrGamoo/FileList.pm  view on Meta::CPAN

perform some limited tests without this file.

But you must provide this file in order to actually run map/reduce jobs.

=head1 DESCRIPTION

MrGamoo only runs map/reduce jobs.
It is up to you to get the files on to the servers
and keep track of where they are. And to tell MrGamoo.

Some people keep the file meta-information in a sql database.
Some people keep the file meta-information in a yenta map.
Some people keep the file meta-information in the filesystem.

When a new job starts, your C<get_file_list> function will be
called with the job config, and should return an arrayref
of matching files along with meta-info.

Each element of the returned arrayref should be a hashref
containing at least the following fields:

lib/AC/MrGamoo/Iter.pm  view on Meta::CPAN

        $me->_putback($r);
        $me->{key} = $r->[0];
        return $r->[0];
    }

    return;	# eof
}

# Fetch the next value from the iterator.
#
# list context:   returns (data, end-flag)
# scalar context: returns just the data
sub next {
    my ($value, $eof) = _next(@_);

    return wantarray ? ($value, $eof) : $value;
}

sub _next {
    my $me = shift;

    my $r = $me->_nextrow();
    return (undef, 1) unless $r;	# eof

    return $r->[1] if $me->{key} eq $r->[0];

lib/AC/MrGamoo/Iter/Array.pm  view on Meta::CPAN

#
# $Id: Array.pm,v 1.1 2010/11/01 18:41:55 jaw Exp $

package AC::MrGamoo::Iter::Array;
use AC::MrGamoo::Iter;
our @ISA = 'AC::MrGamoo::Iter';
use strict;

# Create an iterator over an in-memory array.
#
#   $array - arrayref of rows: [ [key, data], ...]
sub new {
    my ($class, $array) = @_;

    my %self = ( src => $array );
    return bless \%self, $class;
}

sub _nextrow {
    my $me = shift;

    if( $me->{buf} ){

lib/AC/MrGamoo/Job/Plan.pm  view on Meta::CPAN

    return \@out;
}

# Decide which servers take part in the map phase.
#
#   $job     - job hashref; {options}{maps} optionally limits the server count
#   $servers - arrayref of server hashrefs with {id} and {metric}
#
# Returns a hashref: server id => { metric => ..., use => 1|0 }.
# Servers are considered in order of increasing metric; once the limit is
# used up the remaining servers are marked use => 0.
sub _plan_map_these_servers {
    my ($job, $servers) = @_;

    # limit number of servers? (no/zero limit => all of them)
    my $remaining = ($job->{options}{maps} + 0) || @$servers;

    my @ranked = sort { $a->{metric} <=> $b->{metric} } @$servers;

    my %plan;
    foreach my $srv ( @ranked ){
        my $selected = $remaining ? 1 : 0;
        $plan{ $srv->{id} } = { metric => $srv->{metric}, use => $selected };
        $remaining -- if $remaining;
    }

    return \%plan;
}

sub _plan_divy_files {
    my $job     = shift;
    my $files   = shift;
    my $servers = shift;

    my %filemap;
    my %bytes;
    my @copies;

lib/AC/MrGamoo/Kibitz.pm  view on Meta::CPAN

        my $natinfo = my_network_info();
        for my $i ( @$natinfo ){
            push @$ipinfo, { ipv4 => inet_atoi($i->{ipa}), port => my_port(), natdom => $i->{natdom} };
        }
    }

    my $status = ($^T > $STARTTIME + $STARTDELAY) ? 200 : 102;

    return {
        hostname        => $HOSTNAME,
        datacenter      => my_datacenter(),
        subsystem       => 'mrgamoo',
        environment     => conf_value('environment'),
        via             => my_server_id(),
        server_id       => my_server_id(),
        path            => '.',
        status          => $status,
        timestamp       => $^T,
        lastup          => $^T,
        ip              => $ipinfo,
        sort_metric     => loadave() * 1000,

lib/AC/MrGamoo/Kibitz/Client.pm  view on Meta::CPAN

        AC::MrGamoo::Kibitz::Peers->maybe_down( $me->{status_peer}, 'timeout' );
    }
}

# Read handler for a kibitz status reply: mark the exchange ok, feed each
# status entry in the reply to the peer table, then close the connection.
# A failure while decoding/updating is logged, not propagated.
sub read {
    my ($me, $evt) = @_;

    debug("recvd reply");

    my ($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    unless( $proto ){
        return;
    }

    $me->{status_ok} = 1;

    eval {
        my $reply = AC::MrGamoo::Protocol->decode_reply( $proto, $data );
        foreach my $peer ( @{$reply->{status}} ){
            AC::MrGamoo::Kibitz::Peers->update( $peer );
        }
    };
    my $err = $@;
    verbose("error: $err") if $err;

    $me->shut();
}

lib/AC/MrGamoo/Kibitz/Peers.pm  view on Meta::CPAN

use AC::MrGamoo::MySelf;
use AC::MrGamoo::Config;
use AC::DC::Sched;
use AC::Misc;
use AC::Import;
use JSON;
use strict;

our @EXPORT = qw(pick_best_addr_for_peer peer_list_all get_peer_by_id);

my $KEEPDOWN = 300;     # keep data about down servers for how long?
my $KEEPLOST = 600;     # keep data about servers we have not heard about for how long?

my %SCEPTICAL;
my %ALLPEER;
my %MAYBEDOWN;
my $natdom;
my $natinit;

AC::DC::Sched->new(
    info    => 'kibitz status',
    freq    => (conf_value('time_status_kibitz') || 5),

lib/AC/MrGamoo/Kibitz/Peers.pm  view on Meta::CPAN

# Plain-text report of all known peers, one line per peer:
#   server_id subsystem environment datacenter status sort_metric
#   followed by the seconds since {lastup} and since {timestamp}
# Returns undef when there are no peers.
sub report {

    my $peers = peer_list_all();
    my $out;

    foreach my $peer ( @$peers ){
        my @cols = (
            $peer->{server_id}, $peer->{subsystem}, $peer->{environment},
            $peer->{datacenter}, $peer->{status}, $peer->{sort_metric},
            $^T - $peer->{lastup},
            $^T - $peer->{timestamp},
        );

        $out .= sprintf("%-30s %s %s %s %3d %7.2f %d %d\n", @cols);
    }

    return $out;
}

sub report_json {

    my $all = peer_list_all();
    my @fields = qw(hostname environment subsystem datacenter server_id status sort_metric);

    return encode_json( [ map {
        my %x;
        @x{@fields} = @{$_}{@fields};
        $x{ip} = [
            map { {
                ipv4	=> inet_itoa($_->{ipv4}),
                port	=> $_->{port},
                natdom	=> $_->{natdom},
            } } @{$_->{ip}}

lib/AC/MrGamoo/MySelf.pm  view on Meta::CPAN

# Function: stub for customization
#
# $Id: MySelf.pm,v 1.1 2010/11/01 18:41:43 jaw Exp $

package AC::MrGamoo::MySelf;
use AC::MrGamoo::Customize;
use AC::Import;
use strict;

our @ISA    = 'AC::MrGamoo::Customize';
our @EXPORT = qw(my_server_id my_network_info my_datacenter);
our @CUSTOM = (@EXPORT, 'init');


1;

=head1 NAME

AC::MrGamoo::MySelf - customize mrgamoo to your own environment

=head1 SYNOPSIS

lib/AC/MrGamoo/MySelf.pm  view on Meta::CPAN


=head2 my_server_id

return a unique identity for this mrgamoo instance. typically,
something similar to the server hostname.

    sub my_server_id {
        return 'mrm@' . hostname();
    }

=head2 my_datacenter

return the name of the local datacenter. mrgamoo will use this
to determine which systems are local (same datacenter) and
which are remote (different datacenter), and will tune various
behaviors accordingly.

    sub my_datacenter {
        my($domain) = hostname() =~ /^[^\.]+\.(.*)/;
        return $domain;
    }

Note: map/reduce jobs are extremely network intensive. it is not
recommended to spread your servers out. you really want them all
plugged into one big switch. one big fast switch.

=head2 my_network_info

return information about the various networks this server has.

    sub my_network_info {
        my $public_ip = inet_ntoa(scalar gethostbyname(hostname()));
        my $privat_ip = inet_ntoa(scalar gethostbyname('internal-' . hostname()));


        return [
            # use this IP for communication with servers in this datacenter (same natdom)
            { ipa => $privat_ip, natdom => my_datacenter() },
            # otherwise use this IP
            { ipa => $public_ip },
        ]
    }

=head2 init

initialization function called at startup. typically used to lookup hostnames, IP addresses,
and such and store them in variables to make the above functions faster.

lib/AC/MrGamoo/Protocol.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-30 13:22 (EDT)
# Function: read protocol data
#
# $Id: Protocol.pm,v 1.1 2010/11/01 18:41:43 jaw Exp $

package AC::MrGamoo::Protocol;
use AC::MrGamoo::Debug 'protocol';
use AC::DC::Protocol;
use AC::Import;
use strict;

our @ISA    = 'AC::DC::Protocol';

lib/AC/MrGamoo/Protocol.pm  view on Meta::CPAN

    my $r = $MSGTYPE{$name};
    __PACKAGE__->add_msg( $name, $r->{num}, $r->{reqc}, $r->{resc});
}



# Accumulate incoming data on $io and, once a whole message (header + data +
# content) has arrived, return it.
#
#   $io  - connection object; incoming bytes are appended to $io->{rbuffer}
#   $evt - read event; $evt->{data} holds the newly received bytes
#
# Returns an empty list while more data is needed, otherwise
#   ($header, $data, $content_ref)
# where $content_ref is a reference to the content, or undef if there is none.
# Requests starting with GET are handed to the http reader instead.
sub read_protocol {
    my $io  = shift;
    my $evt = shift;

    $io->{rbuffer} .= $evt->{data};

    # same helper as read_protocol_no_content uses
    return _read_http($io, $evt) if $io->{rbuffer} =~ /^GET/;

    my $p = _check_protocol( $io, $evt );
    return unless $p; 	# read more

    # do we have everything?
    return unless length($io->{rbuffer}) >= ($p->{data_length} + $p->{content_length} + $HDRSIZE);

    my $data    = substr($io->{rbuffer}, $HDRSIZE, $p->{data_length});
    my $content = substr($io->{rbuffer}, $HDRSIZE + $p->{data_length}, $p->{content_length});

    # content is passed as reference
    # test the length, not the truth: a content of "0" is still content
    return ($p, $data, (length($content) ? \$content : undef));
}

# Like read_protocol, but only waits for the header + data portion of the
# message; the content need not have arrived in full yet.
#
# Returns an empty list while more data is needed, otherwise
#   ($header, $data, $content)
# where $content is whatever part of the content is already buffered
# (passed by value, not by reference).
sub read_protocol_no_content {
    my ($io, $evt) = @_;

    $io->{rbuffer} .= $evt->{data};

    if( $io->{rbuffer} =~ /^GET/ ){
        return _read_http($io, $evt);
    }

    my $hdr = _check_protocol( $io, $evt );
    return unless $hdr; 	# read more

    # is the data section complete?
    return if length($io->{rbuffer}) < ($hdr->{data_length} + $HDRSIZE);

    my $data_start    = $HDRSIZE;
    my $content_start = $HDRSIZE + $hdr->{data_length};

    my $data    = substr($io->{rbuffer}, $data_start,    $hdr->{data_length});
    my $content = substr($io->{rbuffer}, $content_start, $hdr->{content_length});

    return ($hdr, $data, $content);
}

sub _check_protocol {
    my $io  = shift;
    my $evt = shift;

    if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
        # decode header
        eval {
            $io->{proto_header} = __PACKAGE__->decode_header( $io->{rbuffer} );

lib/AC/MrGamoo/ReadInput.pm  view on Meta::CPAN


The default C<readinput> returns one line at a time (just like <FILE>).

If you want different behavior, you can provide a C<ReadInput> class,
or specify a C<readinput> block in your map/reduce job.

Your function should return an array of 2 values

=head2 record

the record data

=head2 eof

have we reached the end-of-file


=head1 BUGS

none. you write this yourself.

lib/AC/MrGamoo/Server.pm  view on Meta::CPAN

    my $me = shift;

    debug("connection timed out");
    $me->shut();
}

# Read handler for an incoming connection: once a request has been parsed,
# look up its handler by message type, decode the request data and dispatch.
#
# Unknown message types and undecodable requests are logged and the
# connection is shut.
sub read {
    my ($me, $evt) = @_;

    my ($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    return unless $proto;

    # dispatch request
    my $handler = $HANDLER{ $proto->{type} };

    if( !$handler ){
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    # http requests are passed through undecoded
    eval {
        if( $data && $proto->{type} ne 'http' ){
            $data = AC::MrGamoo::Protocol->decode_request($proto, $data);
        }
    };
    my $err = $@;
    if( $err ){
        problem("cannot decode request: $err");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

    # a handler is either a reference to call, or a class name with a handler method
    return $handler->( $me, $proto, $data, $content ) if ref $handler;
    return $handler->handler( $me, $proto, $data, $content );
}

sub http {
    my $me    = shift;
    my $proto = shift;
    my $url   = shift;

    $url =~ s|^/||;
    $url =~ s/%(..)/chr(hex($1))/eg;

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

    my $me  = shift;

    $me->{_status_underway} --;
}

sub _read {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

    debug("read child $me->{request}{taskid}: $evt->{data}.");
    # read status msg from child
    $io->{rbuffer} .= $evt->{data};

    my @l = split /^/m, $io->{rbuffer};
    $io->{rbuffer} = '';
    for my $l (@l){
        unless( $l =~ /\n/ ){
            $io->{rbuffer} = $l;
            last;
        }

        debug("got status $l");

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

# close every output handle in $me->{outfd}, then forget the list
sub _close_outfiles {
    my ($me) = @_;

    foreach my $out ( @{ $me->{outfd} } ){
        $out->close();
    }

    return delete $me->{outfd};
}

# Write one [key, data] row, json encoded and newline terminated, to one of
# the $n output handles in $me->{outfd}. The handle is picked by hashing the
# key, so equal keys always go to the same partition.
sub _output_partition {
    my ($me, $n, $key, $data) = @_;

    # md5 is twice as fast as sha1.
    # anything written  in perl is 10 times slower
    # (only the first 32 bits of the digest are used)
    my $bucket = unpack('N', md5( $key )) % $n;
    my $out    = $me->{outfd}[$bucket];

    my $row = encode_json( [ $key, $data ] );
    $out->output( $row, "\n" );
}


# the end-user's 'print' ends up here:
# the arguments are joined into one string and sent as a 'stdout' message
sub eu_print_stdout {
    my ($me, @text) = @_;

    _send_eumsg($me, 'stdout', "@text");
}

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN


        while(1){
            _maybe_update_status( $me, 'RUNNING', $linen++ );

            # read input
            my($d, $eof) = $f_read->( $fd );
            last if $eof;
            next unless defined $d;

            # map
            my($key, $data) = $f_map->( $d );
            next unless defined $key;
            _output_partition( $me, $n, $key, $data );
        }
    }

    $h_map->{cleanup}->()    if $h_map->{cleanup};
    $h_read->{cleanup}->()   if $h_read->{cleanup};
    $h_filter->{cleanup}->() if $h_filter && $h_filter->{cleanup};
}

sub _do_reduce {
    my $me = shift;

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN


    # sort
    my @cmd = _sort_cmd( $me, $h_reduce );
    open(SORT, '-|', @cmd) || die "cannot open sort pipe: $!\n";
    _sort_underway( $me, \*SORT );
    my $iter = AC::MrGamoo::Iter::File->new( \*SORT, sub{ _maybe_update_status($me, 'RUNNING', $rown++) } );

    # reduce
    while( defined(my $k = $iter->key()) ){
        _maybe_update_status( $me, 'RUNNING', $rown++ );
        my($key, $data) = $f_reduce->( $k, $iter );
        _output_partition( $me, $n, $key, $data ) if defined $key;
    }

    $h_reduce->{cleanup}() if $h_reduce->{cleanup};
}

sub _do_final {
    my $me = shift;
    my $mr = $me->{mr};

    my $h_final  = $mr->get_code('final');

lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN

    my $p;
    eval {
        # connect
        my $s = AC::MrGamoo::Protocol->connect_to_server( inet_aton($addr), $port );
        return unless $s;

        # send req
        AC::MrGamoo::Protocol->write_request($s, $req);

        # get response
        my $buf = AC::MrGamoo::Protocol->read_data($s, AC::MrGamoo::Protocol->header_size(), 30);
        $p      = AC::MrGamoo::Protocol->decode_header($buf);
        $p->{data} = AC::MrGamoo::Protocol->read_data($s, $p->{data_length}, 1);
        $p->{data} = AC::MrGamoo::Protocol->decode_reply($p);

        debug("recvd response $p->{data}{status_code}");
        return unless $p->{data}{status_code} == 200;

        # stream file to disk
        my $size = $p->{content_length};
        debug("recving file ($size B)");

        my $fd;
        unless( open( $fd, "> $tmpfile" ) ){
            verbose("cannot open output file '$tmpfile': $!");
            return;
        }

        my $chk  = _sendfile($oreq, $fd, $s, $size);
        my $sha1 = $p->{data}{hash_sha1};
        die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
    };
    if(my $e=$@){
        debug("error: $e");
        return;
    }

    return $p;
}

lib/AC/protobuf/mrgamoo_status.pl  view on Meta::CPAN

            'ACPMRMStatus',
            [
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'hostname', 1, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'datacenter', 2, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'subsystem', 3, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'environment', 4, undef



( run in 0.457 second using v1.01-cache-2.11-cpan-4d50c553e7e )