view release on metacpan or search on metacpan
This software may be copied and distributed under the terms
found in the Perl "Artistic License".
A copy of the "Artistic License" may be found in the standard
Perl distribution.
lib/AC/MrGamoo.pm
lib/AC/MrGamoo/D.pm
lib/AC/MrGamoo/Job.pm
lib/AC/MrGamoo/Protocol.pm
lib/AC/MrGamoo/Submit/Compile/Block.pm
lib/AC/MrGamoo/Submit/Compile.pm
lib/AC/MrGamoo/Submit/TieIO.pm
lib/AC/MrGamoo/Submit/Request.pm
lib/AC/MrGamoo/Iter/File.pm
lib/AC/MrGamoo/Iter/Array.pm
lib/AC/MrGamoo/OutFile.pm
lib/AC/MrGamoo/Client.pm
lib/AC/MrGamoo/PeerList.pm
lib/AC/MrGamoo/API/Xfer.pm
lib/AC/MrGamoo/API/Get.pm
lib/AC/MrGamoo/API/Del.pm
lib/AC/MrGamoo/API/Chk.pm
lib/AC/MrGamoo/API/Put.pm
lib/AC/MrGamoo/API/TaskCreate.pm
lib/AC/MrGamoo/API/Client.pm
lib/AC/MrGamoo/API/XferStatus.pm
lib/AC/MrGamoo/API/Simple.pm
lib/AC/MrGamoo/API/TaskAbort.pm
lib/AC/MrGamoo/API/TaskStatus.pm
lib/AC/MrGamoo/API/HB.pm
lib/AC/MrGamoo/API/JobCreate.pm
lib/AC/MrGamoo/API/JobAbort.pm
lib/AC/MrGamoo/Default/MySelf.pm
lib/AC/MrGamoo/Default/FileList.pm
lib/AC/MrGamoo/Default/ReadInput.pm
lib/AC/MrGamoo/Task.pm
lib/AC/MrGamoo/Kibitz/Server.pm
lib/AC/MrGamoo/Kibitz/Peers.pm
lib/AC/MrGamoo/Kibitz/Client.pm
lib/AC/MrGamoo/Scriblr.pm
lib/AC/MrGamoo/ReadInput.pm
lib/AC/MrGamoo/Retry.pm
lib/AC/MrGamoo/FileList.pm
lib/AC/MrGamoo/Job/TaskInfo.pm
lib/AC/MrGamoo/Job/Util.pm
lib/AC/MrGamoo/Job/Request.pm
lib/AC/MrGamoo/Job/Task.pm
lib/AC/MrGamoo/Job/XferInfo.pm
lib/AC/MrGamoo/Job/RePlan.pm
lib/AC/MrGamoo/Job/Info.pm
lib/AC/MrGamoo/Job/Plan.pm
lib/AC/MrGamoo/Job/Done.pm
lib/AC/MrGamoo/Job/Xfer.pm
lib/AC/MrGamoo/Job/Action.pm
lib/AC/MrGamoo/Server.pm
lib/AC/MrGamoo/Iter.pm
lib/AC/MrGamoo/MySelf.pm
lib/AC/MrGamoo/About.pm
lib/AC/MrGamoo/Task/Running.pm
lib/AC/MrGamoo/Config.pm
lib/AC/MrGamoo/User.pm
lib/AC/MrGamoo/Xfer.pm
lib/AC/MrGamoo/Stats.pm
lib/AC/MrGamoo/EUConsole.pm
lib/AC/MrGamoo/Customize.pm
lib/AC/MrGamoo/Debug.pm
lib/AC/MrGamoo/Kibitz.pm
lib/AC/MrGamoo/AC/FileList.pm
lib/AC/MrGamoo/AC/ReadInput.pm
lib/AC/MrGamoo/AC/MySelf.pm
lib/AC/protobuf/std_ipport.pl
lib/AC/protobuf/mrgamoo_status.pl
lib/AC/protobuf/std_reply.pl
lib/AC/protobuf/mrgamoo.pl
lib/AC/protobuf/heartbeat.pl
lib/AC/protobuf/scrible.pl
LICENSE
proto
MANIFEST
eg/mrgamood
eg/filelist.pm
eg/readinput.pm
eg/mrgamoo
eg/mrgamoo.conf
eg/example.mrjob
eg/myself.pm
Makefile.PL
META.yml Module meta-data (added by MakeMaker)
--- #YAML:1.0
name: AC-MrGamoo
version: 1
abstract: Map/Reduce Framework
author:
- AdCopy <http://www.adcopy.com>
license: perl
distribution_type: module
configure_requires:
ExtUtils::MakeMaker: 0
requires:
AC::DC: 0
Digest::SHA1: 0
Google::ProtocolBuffers: 0
JSON: 0
POSIX: 0
Sys::Hostname: 0
Time::HiRes: 0
no_index:
directory:
- t
- inc
generated_by: ExtUtils::MakeMaker version 6.48
meta-spec:
url: http://module-build.sourceforge.net/META-spec-v1.4.html
version: 1.4
Makefile.PL view on Meta::CPAN
use ExtUtils::MakeMaker;
WriteMakefile(
NAME => 'AC::MrGamoo',
VERSION_FROM => 'lib/AC/MrGamoo.pm',
ABSTRACT_FROM => 'lib/AC/MrGamoo.pm',
AUTHOR => 'AdCopy <http://www.adcopy.com>',
LICENSE => 'perl',
PREREQ_PM => {
'POSIX' => 0,
'Sys::Hostname' => 0,
'JSON' => 0,
'Digest::SHA1' => 0,
'Time::HiRes' => 0,
'Google::ProtocolBuffers' => 0,
'AC::DC' => 0,
}
);
eg/example.mrjob view on Meta::CPAN
%# -*- mason -*-
# Copyright (c) 2009 by AdCopy
# Author: Jeff Weisberg
# Created: 2009-Oct-28 11:19 (EDT)
# Function: test
#
# $Id: example.mrjob,v 1.1 2010/11/01 19:04:21 jaw Exp $
<%doc>
map reduce example
</%doc>
%################################################################
%# provide values for configurable parameters
%# these override the defaults
%# and params specified on the command line, override these
<%config>
system => blargh
tasktimeout => 120
</%config>
%################################################################
%# common block is prepended to all other blocks.
%# used to load modules
<%common>
use AC::Misc;
use AC::Dumper;
</%common>
%################################################################
%# init block runs once at startup.
%# the return value can be retrieved by other blocks.
%# used to calculate values or fetch things from a db.
<%init>
$R->print("starting map/reduce job");
return {
mood => 'joyous',
};
</%init>
%################################################################
<%map>
<%attr>
%# override various parameters
maxrun => 300
sortprog => /bin/sort
</%attr>
my $data = shift; # one record from the input
# return a key + a value
return ( $data->{cmp}, 1 );
</%map>
%################################################################
<%reduce>
my $key = shift;
my $itr = shift; # an iterator object
# count
my $n = 0;
$itr->foreach( sub { $n ++ } );
# return a key + a value
return ($key, $n);
</%reduce>
%#
%# additional reduce blocks can go here
%#
%################################################################
%# final block runs once with the results of the previous reduce.
%# used to generate report or insert to db
<%final>
<%attr>
%# override various parameters
use_strict => 0
in_package => My::Private::Space
</%attr>
<%init>
# init sub-block runs at start of final block
my $report;
</%init>
<%cleanup>
# cleanup sub-block runs at end of final block
# get the values from the init block
my $iv = $R->initvalue();
# stdout is tied to $R. so plain print also works.
print "report for mood: $iv->{mood}\n";
print $report;
</%cleanup>
my($key, $data) = @_;
$report .= "key: $key, value: $data\n";
</%final>
eg/filelist.pm view on Meta::CPAN
# -*- perl -*-
# example filelist
# $Id: filelist.pm,v 1.1 2010/11/01 19:04:21 jaw Exp $
package Local::MrMagoo::FileList;
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;
my $YDBFILE = "/data/files.ydb";
sub get_file_list {
my $config = shift;
# get files + metadata from yenta
my $yenta = AC::Yenta::Direct->new( 'files', $YDBFILE );
# the job config is asking for files that match:
my $syst = $config->{system};
my $tmax = $config->{end}; # time_t
my $tmin = $config->{start}; # time_t
# the keys in yenta are of the form: 20100126150139_[...]
my $start = isotime($tmin); # 1286819830 => 20101011T175710Z
$start =~ s/^(\d+)T(\d+).*/$1$2/; # 20101011T175710Z => 20101011175710
my @files = grep {
# does this file match the request?
($_->{subsystem} eq $syst) &&
($_->{end_time} >= $tmin) &&
($_->{start_time} <= $tmax)
} map {
# get meta-data on this file. data is json encoded
my $d = $yenta->get($_);
$d = $d ? decode_json($d) : {};
# convert space seperated locations to arrayref
$d->{location} = [ (split /\s+/, $d->{location}) ];
$d;
} $yenta->getrange($start, undef); # get all files from $start to now
return \@files;
}
1;
#!/usr/local/bin/perl
# -*- perl -*-
# Copyright (c) 2010 by Jeff Weisberg
# Author: Jeff Weisberg <jaw @ tcp4me.com>
# Created: 2010-Jan-26 12:53 (EST)
# Function: mrmagoo job submit client
#
# $Id: mrgamoo,v 1.1 2010/11/01 19:02:57 jaw Exp $
my %opt;
use Getopt::Long qw(:config no_ignore_case);
use AC::MrMagoo::Client;
use strict;
GetOptions(\%opt,
"check",
"verbose|v",
"debug|d",
"console",
"start=s",
"end=s",
) || exit;
my $mrm = AC::MrMagoo::Client->new( $ARGV[0], \%opt );
if( $opt{check} ){
$mrm->check_code();
exit;
}
$mrm->open_console() if $opt{console};
# run job
my $id = $mrm->submit( seedlist() );
die "could not run job\n" unless $id;
print STDERR "job: $id\n" if $opt{verbose};
$SIG{INT} = $SIG{QUIT} = sub{ $mrm->abort(); exit; };
$mrm->run_console() if $opt{console};
exit;
################################################################
sub seedlist {
# determine list of servers to try
return '10.200.2.3:3504';
}
eg/mrgamoo.conf view on Meta::CPAN
# example config
#
# file will be reloaded automagically if it changes. no need to hup or restart.
port 3504
syslog local4
environment prod
allow 10.200.2.0/23
basedir /home/aclogs
seedpeer 10.200.2.3:3504
seedpeer 10.200.2.27:3504
eg/mrgamood view on Meta::CPAN
#!/usr/local/bin/perl
# -*- perl -*-
#
# example mrmagoo daemon
use Getopt::Std;
use AC::MrMagoo::D;
use strict;
our %OPT;
my @saved_argv = @ARGV;
getopts('c:dDfp:', \%OPT) || die "usage...\n";
# -c config file
# -d enable all debugging
# -f foreground
# -p port
my $mrm = AC::MrMagoo::D->new(
class_filelist => 'My::Code::FileList',
class_readinput => 'My::Code::ReadInput',
class_myself => 'My::Code::MySelf',
);
$mrm->daemon( $OPT{c}, {
argv => \@saved_argv,
foreground => $OPT{f},
debugall => $OPT{d},
port => $OPT{p},
} );
eg/myself.pm view on Meta::CPAN
# -*- perl -*-
# example myself
# $Id: myself.pm,v 1.1 2010/11/01 19:04:21 jaw Exp $
package Local::MrGamoo::MySelf;
use Sys::Hostname;
use strict;
my $SERVERID;
sub init {
my $class = shift;
my $port = shift; # our tcp port
my $id = shift; # from cmd line
$SERVERID = $id;
unless( $SERVERID ){
(my $h = hostname()) =~ s/\.example.com//; # remove domain
$SERVERID = "mrm/$h";
}
verbose("system persistent-id: $SERVERID");
}
sub my_server_id {
return $SERVERID;
}
1;
eg/readinput.pm view on Meta::CPAN
# -*- perl -*-
# example readinput
# $Id: readinput.pm,v 1.1 2010/11/01 19:04:22 jaw Exp $
package Local::MrMagoo::ReadInput;
use AC::MrMagoo::User;
use JSON;
use strict;
our $R; # exported by AC::MrMagoo::User
sub readinput {
my $fd = shift; # file handle
# our file is newline delimted json data
# read next line
my $line = scalar <$fd>;
# end of file?
return (undef, 1) unless defined $line;
my $d = json_decode($line);
# filter input on date range. we could just as easily filter
# in 'map', but doing it here, behind the scenes, keeps things
# simpler for the jr. developers writing reports.
return (undef, 0) if $d->{tstart} < $R->config('start');
return (undef, 0) if $d->{tstart} >= $R->config('end');
return ($d, 0);
}
1;
lib/AC/MrGamoo.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-May-13 18:18 (EDT)
# Function: documentation
#
# $Id: MrGamoo.pm,v 1.2 2011/01/12 19:29:21 jaw Exp $
package AC::MrGamoo;
use strict;
our $VERSION = 1.0;
=head1 NAME
AC::MrGamoo - Map/Reduce Framework
=head1 SYNOPSIS
use AC::MrGamoo::D;
use strict;
my $m = AC::MrGamoo::D->new( );
$m->daemon( $configfile, {
argv => \@ARGV,
foreground => $OPT{f},
debugall => $OPT{d},
port => $OPT{p},
} );
exit;
=head1 CONFIG FILE
various parameters need to be specified in a config file.
if you modify the file, it will be reloaded automagically.
=over 4
=item port
specify the TCP port to use
port 3504
=item environment
specify the environment or realm to run in, so you can run multiple
independent map/reduce networks, such as production, staging, and dev.
environment prod
=item allow
specify networks allowed to connect.
allow 127.0.0.1
allow 192.168.10.0/24
=item seedpeer
specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.
seedpeer 192.168.10.11:3503
seedpeer 192.168.10.12:3503
=item secret
specify a secret key used to encrypt data transfered between
systems in different datacenters.
secret squeamish-ossifrage
=item syslog
specify a syslog facility for log messages.
syslog local5
=item basedir
local directory to store files
basedir /home/data
=item debug
enable debugging for a particular section
debug job
=back
=head1 BUGS
Too many to list here.
=head1 SEE ALSO
AC::MrGamoo::Client
=head1 AUTHOR
Jeff Weisberg - http://www.solvemedia.com/
=cut
1;
lib/AC/MrGamoo/AC/FileList.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 17:04 (EST)
# Function: get list of files to map
#
# $Id: FileList.pm,v 1.3 2010/11/10 16:24:38 jaw Exp $
package AC::MrGamoo::AC::FileList;
use AC::MrGamoo::Debug 'files';
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;
my $YDBFILE = "/home/acdata/logfile.ydb";
# return an array of:
# {
# filename => www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi
# location => [ scrib@a2be021bd31c, scrib@a2be021ad31c ]
# size => 10863
# [anything else]
# }
# convert legacy scriblr ids
my %CONVERT = (
'scrib@a2be021ad31c' => 'mrm@gefiltefish1-r3.ccsphl',
'scrib@a2be021bd31c' => 'mrm@gefiltefish1-r4.ccsphl',
'scrib@a2be021cd31c' => 'mrm@gefiltefish2-r3.ccsphl',
'scrib@a2be021dd31c' => 'mrm@gefiltefish2-r4.ccsphl',
'scrib@a2be021ed31c' => 'mrm@gefiltefish3-r3.ccsphl',
'scrib@a2be021fd31c' => 'mrm@gefiltefish3-r4.ccsphl',
'scrib@a2be0220d31c' => 'mrm@gefiltefish4-r3.ccsphl',
'scrib@a2be0221d31c' => 'mrm@gefiltefish4-r4.ccsphl',
);
sub get_file_list {
my $config = shift;
my $yenta = AC::Yenta::Direct->new( 'logfile', $YDBFILE );
my $mode = $config->{datamode};
my $syst = $config->{system};
my $tmax = $config->{end};
my $tmin = $config->{start};
my $start = isotime($tmin);
$start =~ s/^(\d+)T(\d+).*/$1$2/; # 20091109T123456... => 20091109123456
# NB: keys in the yenta logfile map are of the form: 20100126150139_eqaB5uSerdeddsOw
$syst = undef if $syst eq '*';
$mode = undef if $mode eq '*';
$syst =~ s/[ ,]/\|/g;
if( $syst ){
$syst = qr/^($syst)$/;
}
debug("mode=$mode, syst=$syst, tmin=$tmin, tmax=$tmax, start=$start");
my @files = grep {
(!$mode || ($_->{environment} eq $mode)) &&
(!$syst || ($_->{subsystem} =~ $syst)) &&
($_->{end_time} >= $tmin) &&
($_->{start_time} <= $tmax)
} map {
#debug("file: $_");
my $d = $yenta->get($_);
$d = $d ? decode_json($d) : {};
$d->{location} = [ map { $CONVERT{$_} || $_ } (split /\s+/, $d->{location}) ];
$d;
} $yenta->getrange($start, undef);
debug("found " .scalar(@files)." files");
return \@files;
}
1;
lib/AC/MrGamoo/AC/MySelf.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 17:23 (EST)
# Function:
#
# $Id: MySelf.pm,v 1.1 2010/11/01 18:41:49 jaw Exp $
package AC::MrGamoo::AC::MySelf;
use AC::MrGamoo::Config;
use AC::MrGamoo::Debug;
use AC::DataCenter; # provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;
my $SERVERID;
sub init {
my $class = shift;
my $port = shift; # not used
my $id = shift;
$SERVERID = $id;
unless( $SERVERID ){
(my $h = hostname()) =~ s/\.adcopy.*//;
my $v = conf_value('environment');
$SERVERID = 'mrm';
$SERVERID .= '/' . $v unless $v eq 'prod';
$SERVERID .= '@' . $h;
}
verbose("system persistent-id: $SERVERID");
}
sub my_server_id {
return $SERVERID;
}
1;
lib/AC/MrGamoo/AC/ReadInput.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 12:04 (EST)
# Function: read input - dancr log format
#
# $Id: ReadInput.pm,v 1.1 2010/11/01 18:41:49 jaw Exp $
package AC::MrGamoo::AC::ReadInput;
use AC::Logfile;
use AC::Daemon;
use AC::MrGamoo::User;
use strict;
our $R; # exported by AC::MrGamoo::User
sub readinput {
my $fd = shift;
my $line = scalar <$fd>;
return (undef, 1) unless defined $line;
my $d;
eval { $d = parse_dancr_log($line); };
if( $@ ){
problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
return ;
}
# filter input on date range. we could just as easily filter
# in 'map', but doing here, behind the scenes, keeps things
# simpler for the jr. developers writing reports.
return if $d->{tstart} < $R->config('start');
return if $d->{tstart} >= $R->config('end');
return ($d, 0);
}
1;
lib/AC/MrGamoo/API/Chk.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-19 18:18 (EST)
# Function:
#
# $Id: Chk.pm,v 1.1 2010/11/01 18:41:50 jaw Exp $
package AC::MrGamoo::API::Chk;
use AC::MrGamoo::Debug 'api_del';
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Scriblr;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
my $file = filename($req->{filename});
if( $file && -f $file ){
reply( 500, 'Error', $io, $proto, $req );
}else{
reply( 200, 'OK', $io, $proto, $req );
}
}
1;
lib/AC/MrGamoo/API/Client.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 14:06 (EST)
# Function: send requests to server
#
# $Id: Client.pm,v 1.1 2010/11/01 18:41:50 jaw Exp $
package AC::MrGamoo::API::Client;
use AC::MrGamoo::Debug 'client';
use AC::MrGamoo::Protocol;
use AC::DC::IO::TCP::Client;
use Socket;
our @ISA = 'AC::DC::IO::TCP::Client';
use strict;
my $TIMEOUT = 15;
sub new {
my $class = shift;
my $addr = shift;
my $port = shift;
my $info = shift;
my $req = shift;
my $data = shift;
debug("new client type: $req->{type} to $addr:$port");
my $send = AC::MrGamoo::Protocol->encode_request( $req, $data );
my $me = $class->SUPER::new( $addr, $port,
info => "client $req->{type} to $addr:$port; $info",
request => $send,
);
return $me;
}
sub start {
my $me = shift;
$me->set_callback('timeout', \&_timeout);
$me->set_callback('read', \&_read);
$me->set_callback('shutdown', \&_shutdown);
$me->SUPER::start();
$me->timeout_rel($TIMEOUT);
$me->write( $me->{request} );
return $me;
}
sub _timeout {
my $me = shift;
$me->shut();
}
sub _shutdown {
my $me = shift;
if( $me->{status_ok} ){
$me->run_callback('on_success', { result => $me->{result} } );
}else{
$me->run_callback('on_failure');
}
}
sub _read {
my $me = shift;
my $evt = shift;
debug("recvd reply to $me->{info}");
my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
return unless $proto;
# check response
if( $proto->{is_error} ){
return $me->_uhoh("rcvd error response");
}
$proto->{data} = AC::MrGamoo::Protocol->decode_reply($proto, $data);
debug("recvd reply to $me->{info} - $proto->{data}{status_code} $proto->{data}{status_message}");
if( $proto->{data}{status_code} != 200 ){
return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
}
$me->{result} = $proto;
$me->{status_ok} = 1;
$me->shut();
}
sub _uh_oh {
my $me = shift;
my $msg = shift;
debug("error $msg");
$me->run_callback('error', { error => $msg } );
$me->shut();
}
1;
lib/AC/MrGamoo/API/Del.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:35 (EST)
# Function:
#
# $Id: Del.pm,v 1.1 2010/11/01 18:41:51 jaw Exp $
package AC::MrGamoo::API::Del;
use AC::MrGamoo::Debug 'api_del';
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Scriblr;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
# validate filename
my $file = filename($req->{filename});
debug("deleting file $file");
if( $file && -f $file ){
unlink $file;
}
if( -f $file ){
reply( 500, 'Error', $io, $proto, $req );
}else{
reply( 200, 'OK', $io, $proto, $req );
}
}
1;
lib/AC/MrGamoo/API/Get.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-19 18:21 (EST)
# Function:
#
# $Id: Get.pm,v 1.1 2010/11/01 18:41:51 jaw Exp $
package AC::MrGamoo::API::Get;
use AC::MrGamoo::Debug 'api_get';
use AC::MrGamoo::Config;
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Scriblr;
use AC::SHA1File;
use Fcntl;
use POSIX;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
return unless $proto->{want_reply};
in_background( \&_get_file, $io, $proto, $req, $content );
}
sub _get_file {
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
my $file = filename($req->{filename});
my $fd = $io->{fd};
fcntl($fd, F_SETFL, 0); # unset nbio
return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;
open(F, $file) || return nbfd_reply(500, 'error', $fd, $proto, $req);
my $size = (stat($file))[7];
my $sha1 = sha1_file($file);
debug("get file '$file' size $size");
# send header
my $gb = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
my $hdr = AC::MrGamoo::Protocol->encode_header(
type => $proto->{type},
msgidno => $proto->{msgidno},
is_reply => 1,
data_length => length($gb),
content_length => $size,
);
my $buf = $hdr . $gb;
syswrite( $fd, $buf );
# stream
AC::MrGamoo::Protocol->sendfile($fd, \*F, $size);
}
1;
lib/AC/MrGamoo/API/HB.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Dec-21 17:08 (EST)
# Function: respond to heartbeat requests
#
# $Id: HB.pm,v 1.1 2010/11/01 18:41:51 jaw Exp $
package AC::MrGamoo::API::HB;
use AC::MrGamoo::Debug 'hb';
use AC::MrGamoo::Config;
use AC::MrGamoo::Stats;
use AC::MrGamoo::About;
use AC::MrGamoo::MySelf;
use Sys::Hostname;
require 'AC/protobuf/heartbeat.pl';
use strict;
my $HOSTNAME = hostname();
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
unless( $proto->{want_reply} ){
$io->shut();
return;
}
my $response = AC::MrGamoo::Protocol->encode_reply( {
type => 'heartbeat_request',
msgid => $proto->{msgid},
is_reply => 1,
}, {
status_code => 200,
status_message => 'Honky Dory',
hostname => $HOSTNAME,
subsystem => 'mrgamoo',
environment => conf_value('environment'),
port => my_port(),
timestamp => time(),
sort_metric => loadave(),
server_id => my_server_id(),
process_id => $$,
} );
debug("sending hb reply");
$io->write_and_shut( $response );
}
1;
lib/AC/MrGamoo/API/JobAbort.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:25 (EST)
# Function:
#
# $Id: JobAbort.pm,v 1.1 2010/11/01 18:41:52 jaw Exp $
package AC::MrGamoo::API::JobAbort;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("abort job $req->{jobid}");
my $r = AC::MrGamoo::Job->abort( jobid => $req->{jobid} );
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
1;
lib/AC/MrGamoo/API/JobCreate.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:11 (EST)
# Function:
#
# $Id: JobCreate.pm,v 1.1 2010/11/01 18:41:52 jaw Exp $
package AC::MrGamoo::API::JobCreate;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("new job $req->{jobid}");
if( $req->{console} =~ /^:/ ){
# fill in ip addr
$req->{console} = $io->{from_ip} . $req->{console};
}
my $x = AC::MrGamoo::Job->new( %$req );
my $r = $x ? $x->start() : undef;
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
1;
lib/AC/MrGamoo/API/Put.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-19 18:21 (EST)
# Function:
#
# $Id: Put.pm,v 1.1 2010/11/01 18:41:52 jaw Exp $
package AC::MrGamoo::API::Put;
use AC::MrGamoo::Debug 'api_put';
use AC::MrGamoo::Config;
use AC::MrGamoo::API::Simple;
use AC::SHA1File;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Scriblr;
use File::Path;
use POSIX;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
if( conf_value('scriblr') =~ /no|off/i ){
reply( 500, 'Error', $io, $proto, $req );
return;
}
in_background( \&_put_file, $io, $proto, $req, $content );
}
sub _put_file {
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
my $file = filename($req->{filename});
my $fd = $io->{fd};
fcntl($fd, F_SETFL, 0); # unset nbio
my($dir) = $file =~ m|^(.+)/[^/]+$|;
# mkpath
eval{ mkpath($dir, undef, 0755) };
# open tmp
my $tmp = "$file.tmp";
unless( open(F, "> $tmp") ){
problem("open file failed: $!");
return nbfd_reply(500, 'error', $fd, $proto, $req);
}
# read + write
my $size = $proto->{content_length};
my $sha1 = $req->{hash_sha1};
verbose("put file '$file' size $size");
if( $content ){
syswrite( F, $content );
$size -= length($content);
}
eval {
my $chk = AC::MrGamoo::Protocol->sendfile(\*F, $fd, $size, 10);
close F;
die "file size mismatch\n" unless (stat($tmp))[7] == $proto->{content_length};
die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
};
if(my $e = $@){
unlink $tmp;
verbose("error: $e");
nbfd_reply(500, 'error', $fd, $proto, $req);
return;
}
rename $tmp, $file;
nbfd_reply(200, 'OK', $fd, $proto, $req);
}
1;
lib/AC/MrGamoo/API/Simple.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-12 18:15 (EST)
# Function: common to most API handlers
#
# $Id: Simple.pm,v 1.1 2010/11/01 18:41:52 jaw Exp $
package AC::MrGamoo::API::Simple;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Debug 'api';
use AC::Import;
use POSIX;
require 'AC/protobuf/std_reply.pl';
use strict;
our @EXPORT = qw(reply on_success on_failure in_background nbfd_reply);
sub reply {
my $code = shift;
my $msg = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
unless( $proto->{want_reply} ){
$io->shut();
return;
}
my $response = AC::MrGamoo::Protocol->encode_reply( {
type => $proto->{type},
msgid => $proto->{msgid},
is_reply => 1,
}, {
status_code => $code,
status_message => $msg,
} );
debug("sending $code reply for $proto->{type} on $io->{info}");
$io->write_and_shut( $response );
}
sub nbfd_reply {
my $code = shift;
my $msg = shift;
my $fd = shift;
my $proto = shift;
my $req = shift;
return unless $proto->{want_reply};
my $response = AC::MrGamoo::Protocol->encode_reply( {
type => $proto->{type},
msgid => $proto->{msgid},
is_reply => 1,
}, {
status_code => $code,
status_message => $msg,
} );
debug("sending $code reply for $proto->{type} (from bkg)");
syswrite( $fd, $response );
}
sub on_success {
my $x = shift;
my $e = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
reply( 200, 'OK', $io, $proto, $req );
}
sub on_failure {
my $x = shift;
my $e = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
reply( 500, 'Error', $io, $proto, $req );
}
sub in_background {
my $func = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $pid = fork();
if( !defined($pid) ){
problem("cannot fork: $!");
reply( 500, 'Error', $io, $proto, $req );
return ;
}elsif( $pid ){
# parent
$io->shut();
waitpid $pid, 0;
return;
}else{
# child
my $gpid = fork();
if( $gpid ){
# parent
_exit(0);
}else{
# orphaned child
eval {
$func->($io, $proto, $req, @_);
};
if(my $e = $@){
chomp $e;
verbose("child error: $e");
_exit(1);
}
_exit(0);
}
}
}
1;
lib/AC/MrGamoo/API/TaskAbort.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:25 (EST)
# Function:
#
# $Id: TaskAbort.pm,v 1.1 2010/11/01 18:41:53 jaw Exp $
package AC::MrGamoo::API::TaskAbort;
use AC::MrGamoo::Debug 'api_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("abort task $req->{jobid}/$req->{taskid}");
my $r = AC::MrGamoo::Task->abort( jobid => $req->{jobid}, taskid => $req->{taskid} );
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
1;
lib/AC/MrGamoo/API/TaskCreate.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:11 (EST)
# Function:
#
# $Id: TaskCreate.pm,v 1.3 2011/01/10 15:23:00 jaw Exp $
package AC::MrGamoo::API::TaskCreate;
use AC::MrGamoo::Debug 'api_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("new task $req->{jobid}/$req->{taskid}");
my $x = AC::MrGamoo::Task->new( %$req );
my $r = $x ? $x->start() : undef;
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
1;
lib/AC/MrGamoo/API/TaskStatus.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:40 (EST)
# Function: update our knowledge about a remote task
#
# $Id: TaskStatus.pm,v 1.1 2010/11/01 18:41:53 jaw Exp $
package AC::MrGamoo::API::TaskStatus;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("updating task status $req->{jobid}/$req->{taskid}");
my $r = AC::MrGamoo::Job->task_status( %$req );
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
1;
lib/AC/MrGamoo/API/Xfer.pm view on Meta::CPAN
#!/usr/local/bin/perl
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-07 17:20 (EST)
# Function:
#
# $Id: Xfer.pm,v 1.1 2010/11/01 18:41:53 jaw Exp $
package AC::MrGamoo::API::Xfer;
use AC::MrGamoo::Debug 'api_xfer';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::PeerList;
use AC::MrGamoo::API::Simple;
use strict;
my $MSGID = $$;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
# validate filename
if( $req->{filename} =~ m%/\.|^\.% ){
reply( 500, 'Error', $io, $proto, $req );
return;
}
# new retry
debug("starting file xfer $req->{copyid} => $req->{filename}");
# start working on the copy
my $x = AC::MrGamoo::Retry->new(
newobj => \&_mk_xfer,
newargs => [ $req ],
tryeach => $req->{location},
);
# reply now
if( $x ){
reply( 200, 'OK', $io, $proto, $req );
}else{
debug("sending error, xfer/retrier failed, $io->{info}");
reply( 501, 'Error', $io, $proto, $req );
}
# send status when finished
$x->set_callback('on_success', \&_yippee, $proto, $req);
$x->set_callback('on_failure', \&_boohoo, $proto, $req);
# start
$x->start();
}
sub _mk_xfer {
my $loc = shift;
my $req = shift;
my $x = AC::MrGamoo::Xfer->new(
$req->{filename}, ($req->{dstname} || $req->{filename}), $loc, $req,
);
return $x;
}
################################################################
sub _yippee {
my $x = shift;
my $e = shift;
my $proto = shift;
my $req = shift;
tell_master( $req, 200, 'OK' );
}
sub _boohoo {
my $x = shift;
my $e = shift;
my $proto = shift;
my $req = shift;
debug("boohoo - xfer failed $req->{copyid}");
tell_master( $req, 500, 'Failed' );
}
sub tell_master {
my $req = shift;
my $code = shift;
my $msg = shift;
my($addr, $port) = get_peer_addr_from_id( $req->{master} );
debug("sending xfer status update for $req->{copyid} => $code => $req->{master}");
debug("cannot find addr") unless $addr;
return unless $addr;
my $x = AC::MrGamoo::API::Client->new( $addr, $port, "xfer $req->{copyid}", {
type => 'mrgamoo_xferstatus',
msgidno => $MSGID++,
want_reply => 1,
}, {
jobid => $req->{jobid},
copyid => $req->{copyid},
status_code => $code,
status_message => $msg,
} );
debug("cannot create client") unless $x;
return unless $x;
$x->start();
# we don't need any reply or reply callbacks. just send + forget
}
################################################################
1;
lib/AC/MrGamoo/API/XferStatus.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:40 (EST)
# Function: update our knowledge about a remote task
#
# $Id: XferStatus.pm,v 1.1 2010/11/01 18:41:54 jaw Exp $
package AC::MrGamoo::API::XferStatus;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::API::Simple;
use strict;
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $req = shift;
my $content = shift;
debug("updating xfer status $req->{jobid}/$req->{copyid}");
my $r = AC::MrGamoo::Job->xfer_status( %$req );
if( $r ){
reply( 200, 'OK', $io, $proto, $req );
}else{
reply( 500, 'Error', $io, $proto, $req );
}
}
1;
lib/AC/MrGamoo/About.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 11:29 (EST)
# Function:
#
# $Id: About.pm,v 1.1 2010/11/01 18:41:39 jaw Exp $
package AC::MrGamoo::About;
use AC::Import;
use strict;
our @EXPORT = 'my_port';
my $port;
sub init {
my $class = shift;
$port = shift;
}
sub my_port { $port }
1;
view all matches for this distributionview release on metacpan - search on metacpan