AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

        $me->{master} = { addr => $addr, port => $port };
        $ok = 1;
    }else{
        # pick server
        $ok = $me->_pick_master_and_send( $req, $seed );
    }

    return $ok ? $me->{id} : undef;
}

# Ask the master that accepted this job to abort it.
#
# Does nothing (bare return) when no master has been recorded in
# $me->{master}. Otherwise a 'mrgamoo_jobabort' request carrying this
# job's id is sent to that master through _submit_to(), and the value
# _submit_to() hands back is returned.
sub abort {
    my ($me) = @_;

    my $master = $me->{master};
    return unless $master;

    my %header = (
        type       => 'mrgamoo_jobabort',
        msgidno    => $^T,
        want_reply => 1,
    );
    my %content = ( jobid => $me->{id} );

    # the request is encoded inside the argument list, after addr and port
    my $res = $me->_submit_to(
        $master->{addr},
        $master->{port},
        AC::MrGamoo::Protocol->encode_request( \%header, \%content ),
    );

    return $res;
}

################################################################

# Find the available servers and submit the job to the first one that
# accepts it.
#
#   $req  - encoded job request, handed to _submit_to() unchanged
#   $seed - arrayref of "addr:port" strings to ask for the server list
#
# Each seed is queried in turn (1 second alarm each) until one yields a
# non-empty status list. Entries whose status is not 200 are dropped, the
# rest are ordered by ascending sort_metric with ties broken randomly, and
# the job is offered to every address of every remaining server (30 second
# alarm each) until one replies with status_code 200.
#
# On success the chosen address and port are stored in $me->{master} and
# 1 is returned; otherwise nothing is returned.
sub _pick_master_and_send {
    my $me   = shift;
    my $req  = shift;
    my $seed = shift;

    my @serverlist;

    my $listreq = AC::MrGamoo::Protocol->encode_request( {
        type       => 'mrgamoo_status',
        msgidno    => $^T,
        want_reply => 1,
    }, {});

    # Turn SIGALRM into an exception while this sub runs, so an expired
    # alarm() unwinds the enclosing eval instead of taking whatever the
    # process-wide disposition happens to be. The previous handler is
    # restored automatically on return.
    local $SIG{ALRM} = sub { die "timeout\n" };

    # get the full list of servers
    # contact each seed passed in above, until we get a reply
    for my $s ( @$seed ){
        my($addr, $port) = split /:/, $s;
        $me->{fdebug}->("attempting to fetch server list from $addr:$port");
        eval {
            alarm(1);
            my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $listreq, $me->{fdebug} );
            my $res   = AC::MrGamoo::Protocol->decode_reply($reply);
            alarm(0);
            my $list = $res->{status};
            @serverlist = @$list if $list && @$list;
        };
        # the eval may have died before reaching its own alarm(0);
        # never leave the timer armed past this point
        alarm(0);
        last if @serverlist;
    }

    # sort+filter list
    # Each server gets one random key up front, used only to break ties on
    # sort_metric. That keeps the comparator consistent between calls while
    # still spreading load over equally good servers.
    @serverlist =
        map  { $_->[1] }
        sort { $a->[1]{sort_metric} <=> $b->[1]{sort_metric} || $a->[0] <=> $b->[0] }
        map  { [ rand(), $_ ] }
        grep { ($_->{status} || 0) == 200 } @serverlist;

    # try all addresses
    # TODO: sort addresslist in a Peers::pick_best_addr_for_peer() like manner?

    # a server entry without an ip list contributes no addresses
    my @addrlist = map { @{ $_->{ip} || [] } } @serverlist;

    for my $ip (@addrlist){
        my $addr = inet_itoa($ip->{ipv4});
        my $res;
        eval {
            alarm(30);
            $res = $me->_submit_to( $addr, $ip->{port}, $req );
            alarm(0);
        };
        # cancel the timer on the failure path as well
        alarm(0);
        next unless $res && ($res->{status_code} || 0) == 200;
        $me->{master} = { addr => $addr, port => $ip->{port} };
        return 1;
    }
    return;
}

# Send an already-encoded request to one server and decode its answer.
#
#   $addr - server address; converted with inet_aton() before sending
#   $port - server port
#   $req  - encoded request, passed to send_request() as is
#
# Returns whatever AC::MrGamoo::Protocol->decode_reply() produces for the
# reply.
sub _submit_to {
    my ($me, $addr, $port, $req) = @_;

    $me->{fdebug}->("sending job to $addr:$port");

    # the trailing 120 is handed to send_request() unchanged
    # NOTE(review): looks like a timeout in seconds -- confirm against send_request()
    my $raw     = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $req, $me->{fdebug}, 120 );
    my $decoded = AC::MrGamoo::Protocol->decode_reply($raw);

    return $decoded;
}

################################################################

# Run _check() over every stage of the job program: the map stage, each
# reduce stage (identified by its index in the program's reduce list),
# and the final stage, in that order.
#
# Returns 1 once every stage has been checked; a failing stage surfaces
# as the exception thrown by _check().
sub check_code {
    my ($me) = @_;

    my $program      = $me->{program};
    my $reduce_count = scalar @{ $program->{content}{reduce} };

    $me->_check('map');

    for my $index ( 0 .. $reduce_count - 1 ) {
        $me->_check( 'reduce', $index );
    }

    $me->_check('final');

    return 1;
}

# Verify that one stage of the job program compiles.
#
# Every argument after the invocant is forwarded untouched to the
# program's compile() method. The text it returns is prefixed with "sub "
# and handed to a string eval; if that eval fails, its error is rethrown.
sub _check {
    my $program = shift->{program};

    my $source = $program->compile(@_);

    # NOTE(review): string eval of generated code. Presumably compile()
    # yields a "{ ... }" body, so this only compiles an anonymous sub
    # without calling it -- confirm against compile().
    eval "sub $source";
    die $@ if $@;
}


1;



( run in 1.543 second using v1.01-cache-2.11-cpan-d8267643d1d )