AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-25 16:32 (EST)
# Function: m/r client
#
# $Id: Client.pm,v 1.5 2011/01/18 17:19:12 jaw Exp $

package AC::MrGamoo::Client;
use AC::MrGamoo::Submit::Compile;
use AC::MrGamoo::Submit::Request;
use AC::MrGamoo::Debug;
use AC::Daemon;
use AC::Conf;
use AC::Misc;
use AC::MrGamoo::Protocol;
use JSON;
use Sys::Hostname;
use Socket;

require 'AC/protobuf/mrgamoo.pl';
require 'AC/protobuf/mrgamoo_status.pl';
require 'AC/protobuf/std_reply.pl';
use strict;


sub new {
    my $class = shift;
    my $from = shift;	# file | text
    my $src  = shift;
    my $cfg  = shift;

    my $host = hostname();
    my $user = getpwuid($<);
    my $trace = "$user/$$\@$host:" . ($from eq 'file' ? $src : 'text');

    my $me   = bless {
        traceinfo	=> $trace,
    }, $class;
    $me->{fdebug} = $cfg->{debug} ? sub{ print STDERR "@_\n" } : sub {};

    # compile job
    my $mr = AC::MrGamoo::Submit::Compile->new( $from => $src );
    $me->{program} = $mr;

    # merge job %config section with passed in config
    $mr->add_config($cfg);

    return $me;
}

sub get_config_param {
    my $me = shift;

    $me->{program}->get_config_param(@_);
}

sub set_config_param {
    my $me = shift;

    $me->{program}->set_config_param(@_);
}

sub open_console {
    my $me = shift;

    my $fd;
    socket($fd, PF_INET, SOCK_DGRAM, 0);
    bind($fd, sockaddr_in(0, INADDR_ANY));
    my $s = getsockname($fd);
    my($port, $addr) = sockaddr_in($s);

    $me->{console_fd}   = $fd;
    $me->{console_port} = $port;
}

sub run_console {
    my $me = shift;
    my $fd = $me->{console_fd};

    while(1){
        my $buf;
        recv $fd, $buf, 65535, 0;
        my $proto = AC::MrGamoo::Protocol->decode_header($buf);
        my $data  = substr($buf, AC::MrGamoo::Protocol->header_size());
        my $req   = AC::MrGamoo::Protocol->decode_request($proto, $data);
        last if $req->{type} eq 'finish';
        print STDERR "$req->{msg}"                        if $req->{type} eq 'stderr';
        print "$req->{msg}"                               if $req->{type} eq 'stdout';
        $me->{fdebug}->("$req->{server_id}\t$req->{msg}") if $req->{type} eq 'debug';
    }
}

sub submit {
    my $me   = shift;
    my $seed = shift;	# [ "ipaddr:port", ... ]

    my $mr = $me->{program};
    my $r = AC::MrGamoo::Submit::Request->new( $mr );
    $r->{eu_print_stderr} = sub { print STDERR "@_\n" };
    $r->{eu_print_stdout} = sub { print STDERR "@_\n" };

    # run init section
    my $h_init   = $mr->get_code( 'init' );
    my $initres  = ($h_init ? $h_init->{code}() : undef) || {};

    $me->{id} = unique();
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobcreate',
        msgidno		=> $^T,
        want_reply	=> 1,
    },{
        jobid		=> $me->{id},
        options		=> to_json( $r->{config} ),
        initres		=> to_json( $initres, {allow_nonref => 1} ),
        jobsrc		=> $mr->src(),
        console		=> ($me->{console_port} ? ":$me->{console_port}" : ''),
        traceinfo	=> $me->{traceinfo},
    } );

    my $ok;
    if( my $master = $me->get_config_param('master') ){
        # use specified master (for debugging)
        my($addr, $port) = split /:/, $master;
        $me->_submit_to( $addr, $port, $req );
        $me->{master} = { addr => $addr, port => $port };
        $ok = 1;
    }else{
        # pick server
        $ok = $me->_pick_master_and_send( $req, $seed );
    }

    return $ok ? $me->{id} : undef;
}

sub abort {
    my $me = shift;

    return unless $me->{master};
    my $res = $me->_submit_to( $me->{master}{addr}, $me->{master}{port}, AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_jobabort',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {
        jobid		=> $me->{id},
    }));

}

################################################################

sub _pick_master_and_send {
    my $me   = shift;
    my $req  = shift;
    my $seed = shift;

    my @serverlist;

    my $listreq = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'mrgamoo_status',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {});



( run in 2.997 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )