view release on metacpan or search on metacpan
--- #YAML:1.0
name: AC-MrGamoo
version: 1
abstract: Map/Reduce Framework
author:
- AdCopy <http://www.adcopy.com>
license: perl
distribution_type: module
configure_requires:
  ExtUtils::MakeMaker: 0
requires:
  AC::DC: 0
  Digest::SHA1: 0
  Google::ProtocolBuffers: 0
  JSON: 0
  POSIX: 0
  Sys::Hostname: 0
  Time::HiRes: 0
lib/AC/MrGamoo/API/Client.pm view on Meta::CPAN
my $TIMEOUT = 15;
# Construct a client connection that carries one encoded request.
#
# Positional arguments:
#   $class - class being instantiated
#   $addr  - peer address, handed to the parent constructor
#   $port  - peer port, handed to the parent constructor
#   $info  - free-form text appended to the connection's 'info' string
#   $req   - hashref of request header fields; $req->{type} appears in the log/info text
#   $data  - request body, passed with $req to AC::MrGamoo::Protocol->encode_request
#
# Returns whatever the parent class constructor returns.
sub new {
    my ($class, $addr, $port, $info, $req, $data) = @_;

    debug("new client type: $req->{type} to $addr:$port");

    # the encoded request is handed to the parent constructor as 'request'
    my $encoded = AC::MrGamoo::Protocol->encode_request( $req, $data );

    # kept as an ordered list so the parent sees the options in the same order
    my @options = (
        info    => "client $req->{type} to $addr:$port; $info",
        request => $encoded,
    );

    my $client = $class->SUPER::new( $addr, $port, @options );
    return $client;
}
sub start {
my $me = shift;
$me->set_callback('timeout', \&_timeout);
lib/AC/MrGamoo/API/Get.pm view on Meta::CPAN
return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;
open(F, $file) || return nbfd_reply(500, 'error', $fd, $proto, $req);
my $size = (stat($file))[7];
my $sha1 = sha1_file($file);
debug("get file '$file' size $size");
# send header
my $gb = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
my $hdr = AC::MrGamoo::Protocol->encode_header(
type => $proto->{type},
msgidno => $proto->{msgidno},
is_reply => 1,
data_length => length($gb),
content_length => $size,
);
my $buf = $hdr . $gb;
syswrite( $fd, $buf );
# stream
lib/AC/MrGamoo/API/HB.pm view on Meta::CPAN
my $proto = shift;
my $req = shift;
my $content = shift;
unless( $proto->{want_reply} ){
$io->shut();
return;
}
my $response = AC::MrGamoo::Protocol->encode_reply( {
type => 'heartbeat_request',
msgid => $proto->{msgid},
is_reply => 1,
}, {
status_code => 200,
status_message => 'Honky Dory',
hostname => $HOSTNAME,
subsystem => 'mrgamoo',
environment => conf_value('environment'),
port => my_port(),
timestamp => time(),
lib/AC/MrGamoo/API/Simple.pm view on Meta::CPAN
my $io = shift;
my $proto = shift;
my $req = shift;
unless( $proto->{want_reply} ){
$io->shut();
return;
}
my $response = AC::MrGamoo::Protocol->encode_reply( {
type => $proto->{type},
msgid => $proto->{msgid},
is_reply => 1,
}, {
status_code => $code,
status_message => $msg,
} );
debug("sending $code reply for $proto->{type} on $io->{info}");
$io->write_and_shut( $response );
}
# Send a status-only reply by writing it directly to a file handle with syswrite.
#
# Positional arguments:
#   $code  - status code placed in the reply body
#   $msg   - status message placed in the reply body
#   $fd    - file handle the encoded reply is written to
#   $proto - decoded request header; want_reply, type and msgid are read from it
#   $req   - accepted for signature compatibility, not used here
#
# Returns nothing if the request did not ask for a reply, otherwise the
# result of syswrite.
sub nbfd_reply {
    my ($code, $msg, $fd, $proto, $req) = @_;

    $proto->{want_reply} or return;

    # NOTE(review): API/Get.pm reads the request id from $proto->{msgidno};
    # confirm that 'msgid' is the key encode_reply expects here.
    my %header = (
        type     => $proto->{type},
        msgid    => $proto->{msgid},
        is_reply => 1,
    );
    my %status = (
        status_code    => $code,
        status_message => $msg,
    );

    my $response = AC::MrGamoo::Protocol->encode_reply( \%header, \%status );

    debug("sending $code reply for $proto->{type} (from bkg)");
    return syswrite( $fd, $response );
}
sub on_success {
my $x = shift;
my $e = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
lib/AC/MrGamoo/API/Xfer.pm view on Meta::CPAN
my $req = shift;
my $code = shift;
my $msg = shift;
my($addr, $port) = get_peer_addr_from_id( $req->{master} );
debug("sending xfer status update for $req->{copyid} => $code => $req->{master}");
debug("cannot find addr") unless $addr;
return unless $addr;
my $x = AC::MrGamoo::API::Client->new( $addr, $port, "xfer $req->{copyid}", {
type => 'mrgamoo_xferstatus',
msgidno => $MSGID++,
want_reply => 1,
}, {
jobid => $req->{jobid},
copyid => $req->{copyid},
status_code => $code,
status_message => $msg,
} );
debug("cannot create client") unless $x;
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
# Receive console messages on $me->{console_fd} and relay them until a
# 'finish' message arrives.
#
# Each received buffer is split into a protocol header and a body, the body is
# decoded, and the decoded message is routed by its 'type':
#   finish - leave the loop
#   stderr - print msg to STDERR
#   stdout - print msg to STDOUT
#   debug  - call $me->{fdebug} with "server_id<TAB>msg"
# Any other type is ignored.
sub run_console {
    my ($me) = @_;

    my $sock = $me->{console_fd};

    while (1) {
        my $packet;
        # NOTE(review): the result of recv is not checked, so a failed receive
        # is handed to decode_header as-is - confirm the intended error handling.
        recv $sock, $packet, 65535, 0;

        my $proto = AC::MrGamoo::Protocol->decode_header($packet);
        my $body  = substr($packet, AC::MrGamoo::Protocol->header_size());
        my $req   = AC::MrGamoo::Protocol->decode_request($proto, $body);

        my $type = $req->{type};
        last if $type eq 'finish';

        if ($type eq 'stderr') {
            print STDERR "$req->{msg}";
        }
        elsif ($type eq 'stdout') {
            print "$req->{msg}";
        }
        elsif ($type eq 'debug') {
            $me->{fdebug}->("$req->{server_id}\t$req->{msg}");
        }
    }
}
sub submit {
my $me = shift;
my $seed = shift; # [ "ipaddr:port", ... ]
my $mr = $me->{program};
my $r = AC::MrGamoo::Submit::Request->new( $mr );
$r->{eu_print_stderr} = sub { print STDERR "@_\n" };
$r->{eu_print_stdout} = sub { print STDERR "@_\n" };
# run init section
my $h_init = $mr->get_code( 'init' );
my $initres = ($h_init ? $h_init->{code}() : undef) || {};
$me->{id} = unique();
my $req = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_jobcreate',
msgidno => $^T,
want_reply => 1,
},{
jobid => $me->{id},
options => to_json( $r->{config} ),
initres => to_json( $initres, {allow_nonref => 1} ),
jobsrc => $mr->src(),
console => ($me->{console_port} ? ":$me->{console_port}" : ''),
traceinfo => $me->{traceinfo},
} );
lib/AC/MrGamoo/Client.pm view on Meta::CPAN
}
return $ok ? $me->{id} : undef;
}
# Ask the job's master to abort the job identified by $me->{id}.
#
# Does nothing (returns nothing) when no master has been recorded in
# $me->{master}. Otherwise sends a 'mrgamoo_jobabort' request to the master's
# addr/port through _submit_to and returns that call's result.
sub abort {
    my ($me) = @_;

    my $master = $me->{master};
    return unless $master;

    my %header = (
        type       => 'mrgamoo_jobabort',
        msgidno    => $^T,
        want_reply => 1,
    );
    my %body = (
        jobid => $me->{id},
    );

    # list context, as when the call sits directly in an argument list
    my @request = AC::MrGamoo::Protocol->encode_request( \%header, \%body );

    my $result = $me->_submit_to( $master->{addr}, $master->{port}, @request );
    return $result;
}
################################################################
sub _pick_master_and_send {
my $me = shift;
my $req = shift;
my $seed = shift;
my @serverlist;
my $listreq = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_status',
msgidno => $^T,
want_reply => 1,
}, {});
# get the full list of servers
# contact each seed passed in above, until we get a reply
for my $s ( @$seed ){
my($addr, $port) = split /:/, $s;
$me->{fdebug}->("attempting to fetch server list from $addr:$port");
eval {
lib/AC/MrGamoo/EUConsole.pm view on Meta::CPAN
return bless {
fd => $fd,
jobid => $jobid,
msgid => (time() & 0xFFFF),
}, $class;
}
# Send one diagnostic message ('mrgamoo_diagmsg') to the job's console socket.
#
# Positional arguments:
#   $me   - console object; fd, jobid and msgid are read from it
#   $type - message type placed in the request body
#   $msg  - message text placed in the request body
#
# Returns nothing when the object has no fd; otherwise the result of send.
# The object's msgid counter is post-incremented for every message built.
sub send_msg {
    my ($me, $type, $msg) = @_;

    my $fd = $me->{fd} or return;

    # NOTE(review): most other encode_request callers in this distribution pass
    # the id as 'msgidno'; confirm 'msgid' is the intended key here.
    my %header = (
        type       => 'mrgamoo_diagmsg',
        want_reply => 0,
        msgid      => $me->{msgid}++,
    );
    my %body = (
        jobid     => $me->{jobid},
        server_id => my_server_id(),
        type      => $type,
        msg       => $msg,
    );

    my $packet = AC::MrGamoo::Protocol->encode_request( \%header, \%body );

    return send( $fd, $packet, 0 );
}
1;
lib/AC/MrGamoo/Job/Done.pm view on Meta::CPAN
# remove tmp files
for my $fi (@{$me->{tmp_file}}){
# debug("deleting $fi->{filename} from $fi->{server}");
$me->{statistics}{cleanup_files} ++;
AC::MrGamoo::Job::Request->new( $me,
id => unique(),
server => $fi->{server},
info => "delete $fi->{filename} from $fi->{server}",
proto => {
type => 'mrgamoo_filedel',
msgidno => $^T,
want_reply => 1,
},
request => {
filename => $fi->{filename},
},
);
}
$me->{tmp_file} = [];
lib/AC/MrGamoo/Job/Task.pm view on Meta::CPAN
my $me = shift;
my $job = shift;
my $server = $me->{server};
debug("starting task $job->{request}{jobid}/$me->{info}{id}/$me->{id} on $server");
# send request to server
my $ti = $me->{info};
my $x = $job->_send_request( $server, "task $me->{id}", {
type => 'mrgamoo_taskcreate',
msgidno => $^T,
want_reply => 1,
}, {
jobid => $job->{request}{jobid},
taskid => $me->{id},
jobsrc => $job->{request}{jobsrc},
options => $job->{request}{options},
initres => $job->{request}{initres},
console => ($job->{request}{console} || ''),
phase => $ti->{phase},
lib/AC/MrGamoo/Job/Task.pm view on Meta::CPAN
my $me = shift;
my $job = shift;
debug("aborting task $me->{id}");
AC::MrGamoo::Job::Request->new( $job,
id => unique(),
server => $me->{server},
info => "abort task $me->{id}",
proto => {
type => 'mrgamoo_taskabort',
msgidno => $^T,
want_reply => 1,
},
request => {
jobid => $job->{request}{jobid},
taskid => $me->{id},
},
);
delete $job->{"task_running"}{$me->{id}};
lib/AC/MrGamoo/Job/Xfer.pm view on Meta::CPAN
sub start {
my $me = shift;
my $job = shift;
# send request to server
my $server = $me->{server};
my $filename = $me->{info}{filename};
debug("starting xfer $me->{id} on $server of $filename");
my $x = $job->_send_request( $server, "xfer $me->{id}", {
type => 'mrgamoo_filexfer',
msgidno => $^T,
want_reply => 1,
}, {
jobid => $job->{request}{jobid},
copyid => $me->{id},
filename => $filename,
dstname => ($me->{info}{dstname} || $filename),
location => ($job->{file_info}{$filename}{location} || $me->{info}{location}),
console => $job->{request}{console},
master => my_server_id(),
lib/AC/MrGamoo/Kibitz/Client.pm view on Meta::CPAN
return unless $me;
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
$me->start();
# build request
my $req = AC::MrGamoo::Protocol->encode_request( {
type => 'mrgamoo_status',
content_length => 0,
want_reply => 1,
msgid => $msgid++,
}, {
myself => AC::MrGamoo::Kibitz->about_myself(),
} );
# write request
$me->write( $req );
$me->timeout_rel($TIMEOUT);
lib/AC/MrGamoo/Protocol.pm view on Meta::CPAN
# Minimal HTTP request reader - for simple status queries, argus, debugging.
# This is not an RFC compliant http server.
#
# Arguments: $io (object whose {rbuffer} holds the bytes read so far) and
# $evt (accepted but not used).
#
# Returns nothing until the buffer contains a blank line. After that it returns
# the two-element list ( { type => 'http' }, $url ), where $url is the second
# whitespace-separated token at the start of the buffer (undef if the buffer
# does not start with three such tokens).
sub _read_http {
    my ($io, $evt) = @_;

    my $buffer = $io->{rbuffer};

    # wait for the blank line before parsing anything
    return if $buffer !~ /\r?\n\r?\n/s;

    # three leading tokens are matched; only the middle one is kept
    my (undef, $url) = $buffer =~ /^(\S+)\s+(\S+)\s+(\S+)/;

    return ( { type => 'http' }, $url );
}
1;
lib/AC/MrGamoo/Server.pm view on Meta::CPAN
}
# Read callback: parse one incoming request and dispatch it to its handler.
#
# Arguments: $me (the connection object) and $evt; both are passed to
# read_protocol_no_content.
#
# The handler is looked up in %HANDLER by $proto->{type}. A handler that is a
# reference is called as a sub; any other value is used as an invocant and its
# ->handler method is called. Either way it receives
# ( $me, $proto, $data, $content ) and its result is returned.
#
# The connection is shut, and nothing is returned, when the message type has
# no handler or the request body cannot be decoded.
sub read {
    my ($me, $evt) = @_;

    my ($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    $proto or return;

    # dispatch request
    my $handler = $HANDLER{ $proto->{type} };
    if (!$handler) {
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    # the body is decoded only when one is present and the type is not 'http'
    eval {
        if ($data && $proto->{type} ne 'http') {
            $data = AC::MrGamoo::Protocol->decode_request($proto, $data);
        }
    };
    my $err = $@;
    if ($err) {
        problem("cannot decode request: $err");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

    return $handler->( $me, $proto, $data, $content ) if ref $handler;
    return $handler->handler( $me, $proto, $data, $content );
}
sub http {
my $me = shift;
lib/AC/MrGamoo/Task.pm view on Meta::CPAN
my $me = shift;
# don't kill the master with too many requests
# return if $me->{_status_underway} >= $MAXREQ;
my($addr, $port) = get_peer_addr_from_id( $me->{request}{master} );
return unless $addr;
debug("sending task status update $me->{request}{taskid} => $me->{status}{phase}");
my $x = AC::MrGamoo::API::Client->new( $addr, $port, "task $me->{request}{taskid}", {
type => 'mrgamoo_taskstatus',
msgidno => $msgid++,
want_reply => 0,
}, {
jobid => $me->{request}{jobid},
taskid => $me->{request}{taskid},
phase => $me->{status}{phase},
progress => $me->{status}{amt},
} );
return unless $x;
lib/AC/MrGamoo/Task/Running.pm view on Meta::CPAN
# Create the console object for this task and store it in $me->{euconsole}.
# The job id and console value are taken from $me->{request}.
sub _setup_console {
    my ($me) = @_;

    debug("setup console: $me->{request}{jobid}, $me->{request}{console}");

    my $console = AC::MrGamoo::EUConsole->new(
        $me->{request}{jobid},
        $me->{request}{console},
    );

    $me->{euconsole} = $console;
}
# Forward a message of the given type to the task's console, if one is set up.
#
# Arguments: $me, $type, $msg.
# Returns nothing when $me->{euconsole} is not set; otherwise the result of
# the console's send_msg($type, $msg).
sub _send_eumsg {
    my ($me, $type, $msg) = @_;

    my $console = $me->{euconsole} or return;

    return $console->send_msg($type, $msg);
}
# Report the current phase and progress amount on the STATUS file handle.
#
# Arguments: $phase, $amt.
# Writes the single line "$phase $amt\n" to STATUS and returns the result
# of print.
sub _update_status {
    my ($phase, $amt) = @_;

    # send status to parent process
    debug("sending status @ $^T / $phase/$amt");

    my $line = "$phase $amt\n";
    return print STATUS $line;
}
lib/AC/MrGamoo/Xfer.pm view on Meta::CPAN
my($addr, $port) = get_peer_addr_from_id( $loc );
unless( $addr ){
debug("cannot find addr for $loc");
return;
}
debug("connecting to $addr:$port");
my $req = AC::MrGamoo::Protocol->encode_request( {
type => 'scribl_get',
msgidno => $$,
want_reply => 1,
}, { filename => $srcname } );
my $p;
eval {
# connect
my $s = AC::MrGamoo::Protocol->connect_to_server( inet_aton($addr), $port );
return unless $s;
lib/AC/protobuf/mrgamoo.pl view on Meta::CPAN
'ACPMRMDiagMsg',
[
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'jobid', 1, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'type', 2, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'msg', 3, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'server_id', 4, undef