AC-MrGamoo

 view release on metacpan or  search on metacpan

META.yml  view on Meta::CPAN

--- #YAML:1.0
name:               AC-MrGamoo
version:            1
abstract:           Map/Reduce Framework
author:
    - AdCopy <http://www.adcopy.com>
license:            perl
distribution_type:  module
configure_requires:
    ExtUtils::MakeMaker:  0
requires:
    AC::DC:               0
    Digest::SHA1:         0
    Google::ProtocolBuffers:  0
    JSON:                 0
    POSIX:                0
    Sys::Hostname:        0
    Time::HiRes:          0

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

my $TIMEOUT  = 15;

# Construct a client for one outgoing request.
#   $addr, $port - peer to contact
#   $info        - caller-supplied text appended to the connection's info string
#   $req         - hashref of protocol header fields; $req->{type} is logged
#   $data        - request body, encoded together with $req
# The encoded request is handed to the parent constructor as 'request'.
# Returns whatever the parent constructor returns.
sub new {
    my ($class, $addr, $port, $info, $req, $data) = @_;

    debug("new client type: $req->{type} to $addr:$port");

    my $encoded = AC::MrGamoo::Protocol->encode_request( $req, $data );

    # kept as an ordered list so the parent sees the same argument order
    my @opts = (
        info    => "client $req->{type} to $addr:$port; $info",
        request => $encoded,
    );

    my $client = $class->SUPER::new( $addr, $port, @opts );
    return $client;
}

sub start {
    my $me = shift;

    $me->set_callback('timeout',  \&_timeout);

lib/AC/MrGamoo/API/Get.pm  view on Meta::CPAN

    return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;
    open(F, $file) || return nbfd_reply(500, 'error', $fd, $proto, $req);
    my $size = (stat($file))[7];
    my $sha1 = sha1_file($file);

    debug("get file '$file' size $size");

    # send header
    my $gb  = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
    my $hdr = AC::MrGamoo::Protocol->encode_header(
        type		=> $proto->{type},
        msgidno		=> $proto->{msgidno},
        is_reply	=> 1,
        data_length	=> length($gb),
        content_length	=> $size,
       );
    my $buf = $hdr . $gb;

    syswrite( $fd, $buf );

    # stream

lib/AC/MrGamoo/API/HB.pm  view on Meta::CPAN

    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    my $response = AC::MrGamoo::Protocol->encode_reply( {
        type            => 'heartbeat_request',
        msgid           => $proto->{msgid},
        is_reply        => 1,
    }, {
        status_code	=> 200,
        status_message	=> 'Honky Dory',
        hostname	=> $HOSTNAME,
        subsystem	=> 'mrgamoo',
        environment	=> conf_value('environment'),
        port		=> my_port(),
        timestamp	=> time(),

lib/AC/MrGamoo/API/Simple.pm  view on Meta::CPAN

    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    my $response = AC::MrGamoo::Protocol->encode_reply( {
        type            => $proto->{type},
        msgid           => $proto->{msgid},
        is_reply        => 1,
    }, {
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("sending $code reply for $proto->{type} on $io->{info}");
    $io->write_and_shut( $response );
}

# Write a bare status reply straight to a file descriptor with syswrite.
#   $code, $msg - status_code / status_message placed in the reply body
#   $fd         - handle the encoded reply is written to
#   $proto      - header of the request being answered
#   $req        - decoded request (accepted but not used here)
# Does nothing when the request did not ask for a reply.
# Returns the result of syswrite.
#
# NOTE(review): the reply header is keyed by 'msgid'; some other callers of
# the encoder use 'msgidno' - confirm which key the encoder actually reads.
sub nbfd_reply {
    my ($code, $msg, $fd, $proto, $req) = @_;

    return unless $proto->{want_reply};

    my %header = (
        type     => $proto->{type},
        msgid    => $proto->{msgid},
        is_reply => 1,
    );
    my %status = (
        status_code    => $code,
        status_message => $msg,
    );
    my $response = AC::MrGamoo::Protocol->encode_reply( \%header, \%status );

    debug("sending $code reply for $proto->{type} (from bkg)");
    return syswrite( $fd, $response );
}

sub on_success {
    my $x  = shift;
    my $e  = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

lib/AC/MrGamoo/API/Xfer.pm  view on Meta::CPAN

    my $req   = shift;
    my $code  = shift;
    my $msg   = shift;

    my($addr, $port) = get_peer_addr_from_id( $req->{master} );
    debug("sending xfer status update for $req->{copyid} => $code => $req->{master}");
    debug("cannot find addr") unless $addr;
    return unless $addr;

    my $x = AC::MrGamoo::API::Client->new( $addr, $port, "xfer $req->{copyid}", {
        type		=> 'mrgamoo_xferstatus',
        msgidno		=> $MSGID++,
        want_reply	=> 1,
    }, {
        jobid		=> $req->{jobid},
        copyid		=> $req->{copyid},
        status_code	=> $code,
        status_message	=> $msg,
    } );

    debug("cannot create client") unless $x;

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

# Relay console messages read from $me->{console_fd} until told to stop.
#
# Each received buffer is treated as a protocol header followed by an
# encoded request. The decoded request's type picks the destination:
#   finish - stop relaying and return
#   stderr - msg is printed to STDERR
#   stdout - msg is printed to STDOUT
#   debug  - "server_id<TAB>msg" is passed to the $me->{fdebug} callback
#
# Also returns if recv fails for any reason other than being interrupted
# by a signal.
sub run_console {
    my $me = shift;
    my $fd = $me->{console_fd};

    while(1){
        my $buf;

        # recv returns undef on error and leaves nothing usable in $buf;
        # decoding that would loop on garbage, so bail out instead
        unless( defined recv($fd, $buf, 65535, 0) ){
            next if $!{EINTR};          # interrupted by a signal - try again
            last;
        }

        my $proto = AC::MrGamoo::Protocol->decode_header($buf);
        my $data  = substr($buf, AC::MrGamoo::Protocol->header_size());
        my $req   = AC::MrGamoo::Protocol->decode_request($proto, $data);
        last if $req->{type} eq 'finish';
        print STDERR "$req->{msg}"                        if $req->{type} eq 'stderr';
        print "$req->{msg}"                               if $req->{type} eq 'stdout';
        $me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
    }
}

sub submit {
    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

    my $mr = $me->{program};
    my $r = AC::MrGamoo::Submit::Request->new( $mr );
    $r->{eu_print_stderr} = sub { print STDERR "@_\n" };
    $r->{eu_print_stdout} = sub { print STDERR "@_\n" };

    # run init section
    my $h_init   = $mr->get_code( 'init' );
    my $initres  = ($h_init ? $h_init->{code}() : undef) || {};

    $me->{id} = unique();
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobcreate',
        msgidno		=> $^T,
        want_reply	=> 1,
    },{
        jobid		=> $me->{id},
        options		=> to_json( $r->{config} ),
        initres		=> to_json( $initres, {allow_nonref => 1} ),
        jobsrc		=> $mr->src(),
        console		=> ($me->{console_port} ? ":$me->{console_port}" : ''),
        traceinfo	=> $me->{traceinfo},
    } );

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    }

    return $ok ? $me->{id} : undef;
}

# Ask the job's master to abort the job identified by $me->{id}.
# Does nothing unless $me->{master} is set.
# Returns the result of _submit_to.
sub abort {
    my ($me) = @_;

    my $master = $me->{master};
    return unless $master;

    # encoded in list context, exactly as when written inline as an argument
    my @encoded = AC::MrGamoo::Protocol->encode_request(
        {
            type       => 'mrgamoo_jobabort',
            msgidno    => $^T,
            want_reply => 1,
        },
        {
            jobid => $me->{id},
        },
    );

    my $res = $me->_submit_to( $master->{addr}, $master->{port}, @encoded );
    return $res;
}

################################################################

sub _pick_master_and_send {
    my $me   = shift;
    my $req  = shift;
    my $seed = shift;

    my @serverlist;

    my $listreq = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_status',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {});

    # get the full list of servers
    # contact each seed passed in above, until we get a reply
    for my $s ( @$seed ){
        my($addr, $port) = split /:/, $s;
        $me->{fdebug}->("attempting to fetch server list from $addr:$port");
        eval {

lib/AC/MrGamoo/EUConsole.pm  view on Meta::CPAN


    return bless {
        fd 	=> $fd,
        jobid	=> $jobid,
        msgid 	=> (time() & 0xFFFF),
    }, $class;
}

# Send one diagnostic message over the console socket.
#   $type - message type placed in the request body
#   $msg  - message text
# No reply is requested. The object's msgid counter is advanced on every
# message sent. Does nothing when the object has no socket.
# Returns the result of send.
sub send_msg {
    my ($me, $type, $msg) = @_;

    my $fd = $me->{fd} or return;

    my %header = (
        type       => 'mrgamoo_diagmsg',
        want_reply => 0,
        msgid      => $me->{msgid}++,
    );
    my %body = (
        jobid     => $me->{jobid},
        server_id => my_server_id(),
        type      => $type,
        msg       => $msg,
    );
    my $req = AC::MrGamoo::Protocol->encode_request( \%header, \%body );

    return send( $fd, $req, 0 );
}

1;

lib/AC/MrGamoo/Job/Done.pm  view on Meta::CPAN

    # remove tmp files
    for my $fi (@{$me->{tmp_file}}){
        # debug("deleting $fi->{filename} from $fi->{server}");
        $me->{statistics}{cleanup_files} ++;

        AC::MrGamoo::Job::Request->new( $me,
            id		=> unique(),
            server	=> $fi->{server},
            info	=> "delete $fi->{filename} from $fi->{server}",
            proto	=> {
                type		=> 'mrgamoo_filedel',
                msgidno		=> $^T,
                want_reply	=> 1,
            },
            request 	=> {
                filename	=> $fi->{filename},
            },
        );
    }

    $me->{tmp_file} = [];

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN

    my $me  = shift;
    my $job = shift;

    my $server = $me->{server};
    debug("starting task $job->{request}{jobid}/$me->{info}{id}/$me->{id} on $server");

    # send request to server
    my $ti = $me->{info};

    my $x = $job->_send_request( $server, "task $me->{id}", {
        type		=> 'mrgamoo_taskcreate',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {
        jobid		=> $job->{request}{jobid},
        taskid		=> $me->{id},
        jobsrc		=> $job->{request}{jobsrc},
        options		=> $job->{request}{options},
        initres		=> $job->{request}{initres},
        console		=> ($job->{request}{console} || ''),
        phase		=> $ti->{phase},

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN

    my $me  = shift;
    my $job = shift;

    debug("aborting task $me->{id}");

    AC::MrGamoo::Job::Request->new( $job,
        id	=> unique(),
        server	=> $me->{server},
        info	=> "abort task $me->{id}",
        proto	=> {
            type		=> 'mrgamoo_taskabort',
            msgidno		=> $^T,
            want_reply		=> 1,
        },
        request	=> {
            jobid		=> $job->{request}{jobid},
            taskid		=> $me->{id},
        },
    );

    delete $job->{"task_running"}{$me->{id}};

lib/AC/MrGamoo/Job/Xfer.pm  view on Meta::CPAN

sub start {
    my $me  = shift;
    my $job = shift;

    # send request to server
    my $server   = $me->{server};
    my $filename = $me->{info}{filename};
    debug("starting xfer $me->{id} on $server of $filename");

    my $x = $job->_send_request( $server, "xfer $me->{id}", {
        type		=> 'mrgamoo_filexfer',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {
        jobid		=> $job->{request}{jobid},
        copyid		=> $me->{id},
        filename	=> $filename,
        dstname		=> ($me->{info}{dstname} || $filename),
        location	=> ($job->{file_info}{$filename}{location} || $me->{info}{location}),
        console		=> $job->{request}{console},
        master		=> my_server_id(),

lib/AC/MrGamoo/Kibitz/Client.pm  view on Meta::CPAN

    return unless $me;

    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    $me->start();

    # build request
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type            => 'mrgamoo_status',
        content_length  => 0,
        want_reply      => 1,
        msgid           => $msgid++,
    }, {
        myself => AC::MrGamoo::Kibitz->about_myself(),
    } );

    # write request
    $me->write( $req );
    $me->timeout_rel($TIMEOUT);

lib/AC/MrGamoo/Protocol.pm  view on Meta::CPAN


# for simple status queries, argus, debugging
# this is not an RFC compliant http server
# Returns nothing until the read buffer contains a blank line. After that,
# returns a pseudo-header { type => 'http' } plus the second
# whitespace-separated token at the start of the buffer (the url).
# $evt is accepted but not used.
sub _read_http {
    my ($io, $evt) = @_;

    my $buffered = $io->{rbuffer};

    # wait for the end of the request headers
    return unless $buffered =~ /\r?\n\r?\n/s;

    # request line: method, url, version - only the url is kept
    my (undef, $url) = $buffered =~ /^(\S+)\s+(\S+)\s+(\S+)/;

    return ( { type => 'http' }, $url );
}


1;

lib/AC/MrGamoo/Server.pm  view on Meta::CPAN

}

# Read callback: parse one incoming request and hand it to its handler.
#
# The handler is looked up in %HANDLER by the request's type. A code ref is
# called directly; anything else is treated as a class/object and its
# 'handler' method is invoked. Either way it receives
# ($me, $proto, $data, $content).
#
# The connection is shut down, and nothing dispatched, when the type is
# unknown or the request body cannot be decoded.
sub read {
    my ($me, $evt) = @_;

    my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    return unless $proto;

    # dispatch request
    my $handler = $HANDLER{ $proto->{type} };
    if( !$handler ){
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    # http requests, and requests with no body, are passed through undecoded
    if( $data && $proto->{type} ne 'http' ){
        eval {
            $data = AC::MrGamoo::Protocol->decode_request($proto, $data);
        };
        if( my $err = $@ ){
            problem("cannot decode request: $err");
            $me->shut();
            return;
        }
    }

    debug("handling request - $proto->{type}");

    return $handler->( $me, $proto, $data, $content ) if ref $handler;
    return $handler->handler( $me, $proto, $data, $content );
}

sub http {
    my $me    = shift;

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

    my $me = shift;

    # don't kill the master with too many requests
    # return if $me->{_status_underway} >= $MAXREQ;

    my($addr, $port) = get_peer_addr_from_id( $me->{request}{master} );
    return unless $addr;

    debug("sending task status update $me->{request}{taskid} => $me->{status}{phase}");
    my $x = AC::MrGamoo::API::Client->new( $addr, $port, "task $me->{request}{taskid}", {
        type		=> 'mrgamoo_taskstatus',
        msgidno		=> $msgid++,
        want_reply	=> 0,
    }, {
        jobid		=> $me->{request}{jobid},
        taskid		=> $me->{request}{taskid},
        phase		=> $me->{status}{phase},
        progress	=> $me->{status}{amt},
    } );

    return unless $x;

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN


# Create the console object for this task from the request's jobid and
# console fields, and store it in $me->{euconsole}.
sub _setup_console {
    my ($me) = @_;

    debug("setup console: $me->{request}{jobid}, $me->{request}{console}");

    my $request = $me->{request};
    my $console = AC::MrGamoo::EUConsole->new( $request->{jobid}, $request->{console} );

    return $me->{euconsole} = $console;
}

# Forward a ($type, $msg) pair to the task's console object, if one has
# been set up. Returns the result of its send_msg, or nothing when there
# is no console.
sub _send_eumsg {
    my ($me, $type, $msg) = @_;

    my $console = $me->{euconsole} or return;

    return $console->send_msg($type, $msg);
}

# Report progress by writing a "<phase> <amt>" line to the STATUS handle.
# Returns the result of print.
sub _update_status {
    my ($phase, $amt) = @_;

    # send status to parent process
    debug("sending status @ $^T / $phase/$amt");

    my $line = "$phase $amt\n";
    return print STATUS $line;
}

lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN


    my($addr, $port) = get_peer_addr_from_id( $loc );
    unless( $addr ){
        debug("cannot find addr for $loc");
        return;
    }

    debug("connecting to $addr:$port");

    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'scribl_get',
        msgidno		=> $$,
        want_reply	=> 1,
    }, { filename => $srcname } );

    my $p;
    eval {
        # connect
        my $s = AC::MrGamoo::Protocol->connect_to_server( inet_aton($addr), $port );
        return unless $s;

lib/AC/protobuf/mrgamoo.pl  view on Meta::CPAN

            'ACPMRMDiagMsg',
            [
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'jobid', 1, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'type', 2, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'msg', 3, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'server_id', 4, undef



( run in 0.702 second using v1.01-cache-2.11-cpan-39bf76dae61 )