AC-MrGamoo

 view release on metacpan or  search on metacpan

LICENSE  view on Meta::CPAN

    This software may be copied and distributed under the terms
    found in the Perl "Artistic License".

    A copy of the "Artistic License" may be found in the standard
    Perl distribution.

MANIFEST  view on Meta::CPAN

lib/AC/MrGamoo.pm
lib/AC/MrGamoo/D.pm
lib/AC/MrGamoo/Job.pm
lib/AC/MrGamoo/Protocol.pm
lib/AC/MrGamoo/Submit/Compile/Block.pm
lib/AC/MrGamoo/Submit/Compile.pm
lib/AC/MrGamoo/Submit/TieIO.pm
lib/AC/MrGamoo/Submit/Request.pm
lib/AC/MrGamoo/Iter/File.pm
lib/AC/MrGamoo/Iter/Array.pm
lib/AC/MrGamoo/OutFile.pm
lib/AC/MrGamoo/Client.pm
lib/AC/MrGamoo/PeerList.pm
lib/AC/MrGamoo/API/Xfer.pm
lib/AC/MrGamoo/API/Get.pm
lib/AC/MrGamoo/API/Del.pm
lib/AC/MrGamoo/API/Chk.pm
lib/AC/MrGamoo/API/Put.pm
lib/AC/MrGamoo/API/TaskCreate.pm
lib/AC/MrGamoo/API/Client.pm
lib/AC/MrGamoo/API/XferStatus.pm
lib/AC/MrGamoo/API/Simple.pm
lib/AC/MrGamoo/API/TaskAbort.pm
lib/AC/MrGamoo/API/TaskStatus.pm
lib/AC/MrGamoo/API/HB.pm
lib/AC/MrGamoo/API/JobCreate.pm
lib/AC/MrGamoo/API/JobAbort.pm
lib/AC/MrGamoo/Default/MySelf.pm
lib/AC/MrGamoo/Default/FileList.pm
lib/AC/MrGamoo/Default/ReadInput.pm
lib/AC/MrGamoo/Task.pm
lib/AC/MrGamoo/Kibitz/Server.pm
lib/AC/MrGamoo/Kibitz/Peers.pm
lib/AC/MrGamoo/Kibitz/Client.pm
lib/AC/MrGamoo/Scriblr.pm
lib/AC/MrGamoo/ReadInput.pm
lib/AC/MrGamoo/Retry.pm
lib/AC/MrGamoo/FileList.pm
lib/AC/MrGamoo/Job/TaskInfo.pm
lib/AC/MrGamoo/Job/Util.pm
lib/AC/MrGamoo/Job/Request.pm
lib/AC/MrGamoo/Job/Task.pm
lib/AC/MrGamoo/Job/XferInfo.pm
lib/AC/MrGamoo/Job/RePlan.pm
lib/AC/MrGamoo/Job/Info.pm
lib/AC/MrGamoo/Job/Plan.pm
lib/AC/MrGamoo/Job/Done.pm
lib/AC/MrGamoo/Job/Xfer.pm
lib/AC/MrGamoo/Job/Action.pm
lib/AC/MrGamoo/Server.pm
lib/AC/MrGamoo/Iter.pm
lib/AC/MrGamoo/MySelf.pm
lib/AC/MrGamoo/About.pm
lib/AC/MrGamoo/Task/Running.pm
lib/AC/MrGamoo/Config.pm
lib/AC/MrGamoo/User.pm
lib/AC/MrGamoo/Xfer.pm
lib/AC/MrGamoo/Stats.pm
lib/AC/MrGamoo/EUConsole.pm
lib/AC/MrGamoo/Customize.pm
lib/AC/MrGamoo/Debug.pm
lib/AC/MrGamoo/Kibitz.pm
lib/AC/MrGamoo/AC/FileList.pm
lib/AC/MrGamoo/AC/ReadInput.pm
lib/AC/MrGamoo/AC/MySelf.pm
lib/AC/protobuf/std_ipport.pl
lib/AC/protobuf/mrgamoo_status.pl
lib/AC/protobuf/std_reply.pl
lib/AC/protobuf/mrgamoo.pl
lib/AC/protobuf/heartbeat.pl
lib/AC/protobuf/scrible.pl
LICENSE
proto
MANIFEST
eg/mrgamood
eg/filelist.pm
eg/readinput.pm
eg/mrgamoo
eg/mrgamoo.conf
eg/example.mrjob
eg/myself.pm
Makefile.PL
META.yml                                 Module meta-data (added by MakeMaker)

META.yml  view on Meta::CPAN

--- #YAML:1.0
name:               AC-MrGamoo
version:            1
abstract:           Map/Reduce Framework
author:
    - AdCopy <http://www.adcopy.com>
license:            perl
distribution_type:  module
configure_requires:
    ExtUtils::MakeMaker:  0
requires:
    AC::DC:               0
    Digest::SHA1:         0
    Google::ProtocolBuffers:  0
    JSON:                 0
    POSIX:                0
    Sys::Hostname:        0
    Time::HiRes:          0
no_index:
    directory:
        - t
        - inc
generated_by:       ExtUtils::MakeMaker version 6.48
meta-spec:
    url:      http://module-build.sourceforge.net/META-spec-v1.4.html
    version:  1.4

Makefile.PL  view on Meta::CPAN


use ExtUtils::MakeMaker;

# Modules that must be present before AC::MrGamoo can be installed.
# A version of 0 means that any version is acceptable.
my %prereq = (
    'AC::DC'                  => 0,
    'Digest::SHA1'            => 0,
    'Google::ProtocolBuffers' => 0,
    'JSON'                    => 0,
    'POSIX'                   => 0,
    'Sys::Hostname'           => 0,
    'Time::HiRes'             => 0,
);

# The version and the abstract are both extracted from the main module.
my $main_module = 'lib/AC/MrGamoo.pm';

WriteMakefile(
    NAME          => 'AC::MrGamoo',
    VERSION_FROM  => $main_module,
    ABSTRACT_FROM => $main_module,
    AUTHOR        => 'AdCopy <http://www.adcopy.com>',
    LICENSE       => 'perl',
    PREREQ_PM     => \%prereq,
);

eg/example.mrjob  view on Meta::CPAN

%# -*- mason -*-
# Copyright (c) 2009 by AdCopy
# Author: Jeff Weisberg
# Created: 2009-Oct-28 11:19 (EDT)
# Function: test
#
# $Id: example.mrjob,v 1.1 2010/11/01 19:04:21 jaw Exp $

<%doc>
    map reduce example
</%doc>
%################################################################
%# provide values for configurable parameters.
%# these override the defaults;
%# params specified on the command line override these in turn
<%config>
    system      => blargh
    tasktimeout => 120
</%config>
%################################################################
%# common block is prepended to all other blocks.
%# used to load modules
<%common>
    use AC::Misc;
    use AC::Dumper;
</%common>
%################################################################
%# init block runs once at startup.
%# the return value can be retrieved by other blocks.
%# used to calculate values or fetch things from a db.
<%init>
    $R->print("starting map/reduce job");

    return {
        mood    => 'joyous',
    };
</%init>
%################################################################
%# map block: is handed one input record and returns a key + value pair.
%# this example emits the record's 'cmp' field as the key, with a value of 1.
<%map>
<%attr>
%# override various parameters
    maxrun      => 300
    sortprog    => /bin/sort
</%attr>
    my $data = shift;   # one record from the input

    # return a key + a value
    return ( $data->{cmp}, 1 );
</%map>
%################################################################
%# reduce block: is handed a key and an iterator, and returns a key + value pair.
%# this example counts how many times the iterator invokes the callback.
<%reduce>
    my $key = shift;
    my $itr = shift;    # an iterator object

    # count
    my $n = 0;
    $itr->foreach( sub { $n ++ } );

    # return a key + a value
    return ($key, $n);
</%reduce>
%#
%# additional reduce blocks can go here
%#
%################################################################
%# final block runs once with the results of the previous reduce.
%# used to generate report or insert to db
%# NOTE(review): $report is declared in the init sub-block but is also used by the
%# main body and by the cleanup sub-block; presumably all three are compiled
%# into one scope - confirm against the block compiler.
<%final>
<%attr>
%# override various parameters
    use_strict  => 0
    in_package  => My::Private::Space
</%attr>
<%init>
    # init sub-block runs at start of final block
    my $report;
</%init>
<%cleanup>
    # cleanup sub-block runs at end of final block

    # get the values from the init block
    my $iv = $R->initvalue();

    # stdout is tied to $R. so plain print also works.

    print "report for mood: $iv->{mood}\n";
    print $report;

</%cleanup>
    # main body: append one line to the report for each key + value pair
    my($key, $data) = @_;

    $report .= "key: $key, value: $data\n";

</%final>

eg/filelist.pm  view on Meta::CPAN

# -*- perl -*-
# example filelist

# $Id: filelist.pm,v 1.1 2010/11/01 19:04:21 jaw Exp $

package Local::MrMagoo::FileList;
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;

my $YDBFILE = "/data/files.ydb";

# Example file-list provider.
#
# Takes the job config (hashref; reads {system}, {start} and {end}, the
# latter two being time_t values) and returns an arrayref of file metadata
# hashrefs whose subsystem matches and whose time span overlaps the
# requested [start, end] window.
sub get_file_list {
    my($config) = @_;

    # files + metadata are kept in yenta
    my $yenta = AC::Yenta::Direct->new( 'files', $YDBFILE );

    # what the job config is asking for
    my $syst = $config->{system};
    my $tmin = $config->{start};	# time_t
    my $tmax = $config->{end};		# time_t

    # the keys in yenta are of the form: 20100126150139_[...]
    # so turn the start time into the same format:
    #   1286819830 => 20101011T175710Z => 20101011175710
    my $start = isotime($tmin);
    $start =~ s/^(\d+)T(\d+).*/$1$2/;

    my @files;

    # walk all files from $start to now
    for my $key ( $yenta->getrange($start, undef) ){
        # meta-data on this file is json encoded
        my $json = $yenta->get($key);
        my $d    = $json ? decode_json($json) : {};

        # convert space separated locations to arrayref
        $d->{location} = [ split /\s+/, $d->{location} ];

        # does this file match the request?
        next unless $d->{subsystem}  eq $syst;
        next unless $d->{end_time}   >= $tmin;
        next unless $d->{start_time} <= $tmax;

        push @files, $d;
    }

    return \@files;
}


1;

eg/mrgamoo  view on Meta::CPAN

#!/usr/local/bin/perl
# -*- perl -*-

# Copyright (c) 2010 by Jeff Weisberg
# Author: Jeff Weisberg <jaw @ tcp4me.com>
# Created: 2010-Jan-26 12:53 (EST)
# Function: mrmagoo job submit client
#
# $Id: mrgamoo,v 1.1 2010/11/01 19:02:57 jaw Exp $

# Command line options, filled in by GetOptions below:
#   --check         only check the job's code, do not submit it
#   --verbose, -v   print the job id once the job is submitted
#   --debug, -d     passed through to the client
#   --console       open a console before submitting and run it afterwards
#   --start, --end  passed through to the client
my %opt;
use Getopt::Long qw(:config no_ignore_case);
# the distribution ships this module as AC::MrGamoo::Client
use AC::MrGamoo::Client;
use strict;

GetOptions(\%opt,
           "check",
           "verbose|v",
           "debug|d",
           "console",
           "start=s",
           "end=s",
          ) || exit;


# the first non-option argument is handed to the client along with the options
my $mrm = AC::MrGamoo::Client->new( $ARGV[0],  \%opt );

if( $opt{check} ){
    $mrm->check_code();
    exit;
}

$mrm->open_console() if $opt{console};

# run job
my $id = $mrm->submit( seedlist() );
die "could not run job\n" unless $id;

print STDERR "job: $id\n" if $opt{verbose};
# abort the submitted job if the user interrupts us
$SIG{INT} = $SIG{QUIT} = sub{ $mrm->abort(); exit; };
$mrm->run_console() if $opt{console};

exit;

################################################################

# Return the server to try first when submitting a job, as an "ip:port"
# string. This example has a single hard-coded address.
sub seedlist {
    # determine list of servers to try
    my $seed = '10.200.2.3:3504';
    return $seed;
}

eg/mrgamoo.conf  view on Meta::CPAN

# example config
#
# file will be reloaded automagically if it changes. no need to hup or restart.

port            3504
syslog          local4
environment	prod

allow           10.200.2.0/23

basedir         /home/aclogs

seedpeer        10.200.2.3:3504
seedpeer        10.200.2.27:3504

eg/mrgamood  view on Meta::CPAN

#!/usr/local/bin/perl
# -*- perl -*-
#
# example mrmagoo daemon

use Getopt::Std;
# the distribution ships this module as AC::MrGamoo::D
use AC::MrGamoo::D;
use strict;

our %OPT;
# keep a copy of the original arguments; getopts() consumes @ARGV
my @saved_argv = @ARGV;

getopts('c:dDfp:', \%OPT) || die "usage...\n";
# -c config file
# -d    enable all debugging
# -D    accepted, but not used by this example
# -f    foreground
# -p port


# plug in the site-specific implementations
my $mrm = AC::MrGamoo::D->new(
    class_filelist	=> 'My::Code::FileList',
    class_readinput	=> 'My::Code::ReadInput',
    class_myself	=> 'My::Code::MySelf',
   );


$mrm->daemon( $OPT{c}, {
    argv	=> \@saved_argv,
    foreground	=> $OPT{f},
    debugall	=> $OPT{d},
    port	=> $OPT{p},
} );

eg/myself.pm  view on Meta::CPAN

# -*- perl -*-
# example myself

# $Id: myself.pm,v 1.1 2010/11/01 19:04:21 jaw Exp $

package Local::MrGamoo::MySelf;
use Sys::Hostname;
use strict;

my $SERVERID;

# Establish this server's persistent id.
#
#   $class - class name (unused)
#   $port  - our tcp port (unused here)
#   $id    - id given on the command line, if any
#
# When no id is given, one is derived from the hostname with the
# ".example.com" domain removed, in the form "mrm/<host>".
#
# NOTE(review): verbose() is neither defined nor imported in this example
# file; presumably the hosting daemon provides it - confirm.
sub init {
    my $class = shift;
    my $port  = shift;	# our tcp port
    my $id    = shift;  # from cmd line

    $SERVERID = $id;
    unless( $SERVERID ){
        # both dots are escaped, so that only a literal ".example.com" is removed
        (my $h = hostname()) =~ s/\.example\.com//;	# remove domain
        $SERVERID = "mrm/$h";
    }
    verbose("system persistent-id: $SERVERID");
}

# Accessor: the persistent server id chosen by init()
# (undef if init has not been called yet).
sub my_server_id {
    my $id = $SERVERID;
    return $id;
}

1;

eg/readinput.pm  view on Meta::CPAN

# -*- perl -*-
# example readinput

# $Id: readinput.pm,v 1.1 2010/11/01 19:04:22 jaw Exp $

package Local::MrMagoo::ReadInput;
use AC::MrMagoo::User;
use JSON;
use strict;

our $R;		# exported by AC::MrMagoo::User

# Read one record from the input file.
#
#   $fd - file handle positioned at the next record
#
# Returns a two element list: (record, eof).
#   (undef, 1)   - end of file
#   (undef, 0)   - record read, but outside the job's start/end range; skip it
#   ($record, 0) - decoded record
sub readinput {
    my $fd = shift;	# file handle

    # our file is newline delimited json data

    # read next line
    my $line = scalar <$fd>;
    # end of file?
    return (undef, 1) unless defined $line;

    # JSON exports decode_json (there is no json_decode)
    my $d = decode_json($line);

    # filter input on date range. we could just as easily filter
    # in 'map', but doing it here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.

    return (undef, 0) if $d->{tstart} <  $R->config('start');
    return (undef, 0) if $d->{tstart} >= $R->config('end');

    return ($d, 0);
}

1;

lib/AC/MrGamoo.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-May-13 18:18 (EDT)
# Function: documentation
#
# $Id: MrGamoo.pm,v 1.2 2011/01/12 19:29:21 jaw Exp $

package AC::MrGamoo;
use strict;

our $VERSION = 1.0;

=head1 NAME

AC::MrGamoo - Map/Reduce Framework

=head1 SYNOPSIS

    use AC::MrGamoo::D;
    use strict;

    my $m = AC::MrGamoo::D->new( );

    $m->daemon( $configfile, {
      argv		=> \@ARGV,
      foreground	=> $OPT{f},
      debugall		=> $OPT{d},
      port		=> $OPT{p},
    } );

    exit;

=head1 CONFIG FILE

various parameters need to be specified in a config file.
if you modify the file, it will be reloaded automagically.

=over 4

=item port

specify the TCP port to use

    port 3504

=item environment

specify the environment or realm to run in, so you can run multiple
independent map/reduce networks, such as production, staging, and dev.

    environment prod

=item allow

specify networks allowed to connect.

    allow 127.0.0.1
    allow 192.168.10.0/24

=item seedpeer

specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.

    seedpeer 192.168.10.11:3503
    seedpeer 192.168.10.12:3503

=item secret

specify a secret key used to encrypt data transferred between
systems in different datacenters.

    secret squeamish-ossifrage

=item syslog

specify a syslog facility for log messages.

    syslog local5

=item basedir

local directory to store files

    basedir         /home/data

=item debug

enable debugging for a particular section

    debug job

=back

=head1 BUGS

Too many to list here.

=head1 SEE ALSO

    AC::MrGamoo::Client

=head1 AUTHOR

    Jeff Weisberg - http://www.solvemedia.com/

=cut



1;

lib/AC/MrGamoo/AC/FileList.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 17:04 (EST)
# Function: get list of files to map
#
# $Id: FileList.pm,v 1.3 2010/11/10 16:24:38 jaw Exp $

package AC::MrGamoo::AC::FileList;
use AC::MrGamoo::Debug 'files';
use AC::ISOTime;
use AC::Yenta::Direct;
use JSON;
use strict;

my $YDBFILE = "/home/acdata/logfile.ydb";

# return an array of:
#   {
#     filename    => www/2010/01/17/23/5943_prod_5x2N5qyerdeddsNi
#     location    => [ scrib@a2be021bd31c, scrib@a2be021ad31c ]
#     size        => 10863
#     [anything else]
#   }

# convert legacy scriblr ids
my %CONVERT = (
    'scrib@a2be021ad31c' => 'mrm@gefiltefish1-r3.ccsphl',
    'scrib@a2be021bd31c' => 'mrm@gefiltefish1-r4.ccsphl',
    'scrib@a2be021cd31c' => 'mrm@gefiltefish2-r3.ccsphl',
    'scrib@a2be021dd31c' => 'mrm@gefiltefish2-r4.ccsphl',
    'scrib@a2be021ed31c' => 'mrm@gefiltefish3-r3.ccsphl',
    'scrib@a2be021fd31c' => 'mrm@gefiltefish3-r4.ccsphl',
    'scrib@a2be0220d31c' => 'mrm@gefiltefish4-r3.ccsphl',
    'scrib@a2be0221d31c' => 'mrm@gefiltefish4-r4.ccsphl',
);

# Build the list of files to map.
#
# Takes the job config (hashref; reads {datamode}, {system}, {start}, {end})
# and returns an arrayref of file metadata hashrefs, as described above.
# A datamode or system of '*' matches everything; system may list several
# names separated by spaces or commas.
sub get_file_list {
    my($config) = @_;

    my $yenta = AC::Yenta::Direct->new( 'logfile', $YDBFILE );

    my $mode = $config->{datamode};
    my $syst = $config->{system};
    my $tmin = $config->{start};
    my $tmax = $config->{end};

    # NB: keys in the yenta logfile map are of the form: 20100126150139_eqaB5uSerdeddsOw
    # so convert the start time to match: 20091109T123456... => 20091109123456
    my $start = isotime($tmin);
    $start =~ s/^(\d+)T(\d+).*/$1$2/;

    # '*' is a wildcard: no filtering
    $syst = undef if $syst eq '*';
    $mode = undef if $mode eq '*';

    # a list of systems becomes an anchored alternation
    $syst =~ s/[ ,]/\|/g;
    $syst = qr/^($syst)$/ if $syst;

    debug("mode=$mode, syst=$syst, tmin=$tmin, tmax=$tmax, start=$start");

    my @files;
    for my $key ( $yenta->getrange($start, undef) ){
        #debug("file: $key");
        my $json = $yenta->get($key);
        my $d    = $json ? decode_json($json) : {};

        # locations are space separated; translate legacy scriblr ids
        my @loc = map { $CONVERT{$_} || $_ } (split /\s+/, $d->{location});
        $d->{location} = \@loc;

        # keep only files matching the request and overlapping the time window
        next unless !$mode || ($d->{environment} eq $mode);
        next unless !$syst || ($d->{subsystem}   =~ $syst);
        next unless $d->{end_time}   >= $tmin;
        next unless $d->{start_time} <= $tmax;

        push @files, $d;
    }

    debug("found " . @files . " files");
    return \@files;
}


1;

lib/AC/MrGamoo/AC/MySelf.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 17:23 (EST)
# Function: 
#
# $Id: MySelf.pm,v 1.1 2010/11/01 18:41:49 jaw Exp $

package AC::MrGamoo::AC::MySelf;
use AC::MrGamoo::Config;
use AC::MrGamoo::Debug;
use AC::DataCenter;	# provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;

my $SERVERID;

# Establish this server's persistent id.
#
#   $class - class name (unused)
#   $port  - not used
#   $id    - explicit id; when false, one is derived
#
# A derived id is "mrm@<host>" in the 'prod' environment and
# "mrm/<environment>@<host>" otherwise, where <host> is the hostname with
# everything from ".adcopy" onwards removed.
sub init {
    my($class, $port, $id) = @_;

    if( $id ){
        $SERVERID = $id;
    }else{
        my $host = hostname();
        $host =~ s/\.adcopy.*//;

        my $env = conf_value('environment');

        my $prefix = 'mrm';
        $prefix .= '/' . $env unless $env eq 'prod';

        $SERVERID = $prefix . '@' . $host;
    }

    verbose("system persistent-id: $SERVERID");
}

# Accessor: the persistent server id chosen by init()
# (undef if init has not been called yet).
sub my_server_id {
    my $id = $SERVERID;
    return $id;
}

1;

lib/AC/MrGamoo/AC/ReadInput.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 12:04 (EST)
# Function: read input - dancr log format
#
# $Id: ReadInput.pm,v 1.1 2010/11/01 18:41:49 jaw Exp $

package AC::MrGamoo::AC::ReadInput;
use AC::Logfile;
use AC::Daemon;
use AC::MrGamoo::User;
use strict;

our $R;		# exported by AC::MrGamoo::User

# Read one dancr-format log record from the input.
#
#   $fd - file handle positioned at the next record
#
# Returns (undef, 1) at end of file, ($record, 0) for a usable record, and
# an empty list when the line cannot be parsed or falls outside the job's
# start/end range.
sub readinput {
    my($fd) = @_;

    my $line = <$fd>;
    return (undef, 1) unless defined $line;

    # the parser may die on bad data; trap it
    my $d = eval { parse_dancr_log($line) };
    if( $@ ){
        my $file = $R->config('current_file');
        problem("cannot parse data in (" . $file . "). cannot process\n");
        return ;
    }

    # filter input on date range. we could just as easily filter
    # in 'map', but doing here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.
    my $t = $d->{tstart};
    return if $t <  $R->config('start') or $t >= $R->config('end');

    return ($d, 0);
}

1;

lib/AC/MrGamoo/API/Chk.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-19 18:18 (EST)
# Function: 
#
# $Id: Chk.pm,v 1.1 2010/11/01 18:41:50 jaw Exp $

package AC::MrGamoo::API::Chk;
use AC::MrGamoo::Debug 'api_del';
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Scriblr;

use strict;

# Handle a 'check' request: report whether the requested file exists here.
#
#   $class   - handler class (unused)
#   $io      - io object of the connection; the reply is sent on it
#   $proto   - protocol header of the request
#   $req     - request; {filename} names the file to check
#   $content - request content (unused)
#
# Replies 200 OK when filename() yields a name that refers to an existing
# plain file, and 500 Error otherwise.
sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});

    # success means the file is present (the branches used to be swapped)
    if( $file && -f $file ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        reply( 500, 'Error', $io, $proto, $req );
    }
}

1;

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-14 14:06 (EST)
# Function: send requests to server
#
# $Id: Client.pm,v 1.1 2010/11/01 18:41:50 jaw Exp $

package AC::MrGamoo::API::Client;
use AC::MrGamoo::Debug 'client';
use AC::MrGamoo::Protocol;
use AC::DC::IO::TCP::Client;
use Socket;
our @ISA = 'AC::DC::IO::TCP::Client';

use strict;

my $TIMEOUT  = 15;

# Create a client that will send one request to a server.
#
#   $class - class name
#   $addr  - server address
#   $port  - server port
#   $info  - free-form text appended to the connection's info string
#   $req   - request; {type} is used for logging, the whole thing is encoded
#   $data  - passed to encode_request along with $req
#
# The encoded request is stored on the object; it is written by start().
sub new {
    my($class, $addr, $port, $info, $req, $data) = @_;

    my $type = $req->{type};
    debug("new client type: $type to $addr:$port");

    my $send = AC::MrGamoo::Protocol->encode_request( $req, $data );

    my %param = (
        info    => "client $type to $addr:$port; $info",
        request => $send,
    );
    my $me = $class->SUPER::new( $addr, $port, %param );

    return $me;
}

# Start the client: install the event callbacks, start the connection,
# arm the timeout and queue the encoded request. Returns the object.
sub start {
    my($me) = @_;

    my @callbacks = (
        [ timeout  => \&_timeout  ],
        [ read     => \&_read     ],
        [ shutdown => \&_shutdown ],
    );
    $me->set_callback( @$_ ) for @callbacks;

    $me->SUPER::start();
    $me->timeout_rel($TIMEOUT);
    $me->write( $me->{request} );

    return $me;
}

# 'timeout' callback: no reply arrived in time, so shut the connection.
sub _timeout {
    my($me) = @_;

    $me->shut();
}

# 'shutdown' callback: tell our user how things went.
# Runs 'on_success' (with the result) if a good reply was seen,
# and 'on_failure' otherwise.
sub _shutdown {
    my($me) = @_;

    return $me->run_callback('on_failure') unless $me->{status_ok};

    return $me->run_callback('on_success', { result => $me->{result} } );
}

# 'read' callback: process data received from the server.
#
#   $me  - the client object
#   $evt - the read event, handed on to read_protocol_no_content
#
# Does nothing until read_protocol_no_content returns a protocol header.
# An error response, or a reply whose status code is not 200, is reported
# through _uh_oh. Otherwise the reply is saved in {result}, {status_ok} is
# set, and the connection is shut.
sub _read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply to $me->{info}");

    my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    return unless $proto;

    # check response
    if( $proto->{is_error} ){
        # the error helper is named _uh_oh (this used to call a non-existent _uhoh)
        return $me->_uh_oh("rcvd error response");
    }

    $proto->{data} = AC::MrGamoo::Protocol->decode_reply($proto, $data);
    debug("recvd reply to $me->{info} - $proto->{data}{status_code} $proto->{data}{status_message}");

    if( $proto->{data}{status_code} != 200 ){
        return $me->_uh_oh("recvd error reply $proto->{data}{status_code} $proto->{data}{status_message}");
    }

    $me->{result}    = $proto;
    $me->{status_ok} = 1;
    $me->shut();
}

# Report a failure: log it, run the 'error' callback with the message,
# and shut the connection.
sub _uh_oh {
    my($me, $msg) = @_;

    debug("error $msg");

    my %err = ( error => $msg );
    $me->run_callback('error', \%err );

    $me->shut();
}


1;

lib/AC/MrGamoo/API/Del.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:35 (EST)
# Function: 
#
# $Id: Del.pm,v 1.1 2010/11/01 18:41:51 jaw Exp $

package AC::MrGamoo::API::Del;
use AC::MrGamoo::Debug 'api_del';
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Scriblr;
use strict;

# Handle a 'delete' request: remove the requested file.
#
#   $class   - handler class (unused)
#   $io      - io object of the connection; the reply is sent on it
#   $proto   - protocol header of the request
#   $req     - request; {filename} names the file to delete
#   $content - request content (unused)
#
# Replies 500 Error if the file is still present afterwards, 200 OK otherwise.
sub handler {
    my($class, $io, $proto, $req, $content) = @_;

    # validate filename
    my $file = filename($req->{filename});
    debug("deleting file $file");

    unlink $file if $file && -f $file;

    # success means: the file is not there any more
    my @status = (-f $file) ? (500, 'Error') : (200, 'OK');
    reply( @status, $io, $proto, $req );
}

1;

lib/AC/MrGamoo/API/Get.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-19 18:21 (EST)
# Function: 
#
# $Id: Get.pm,v 1.1 2010/11/01 18:41:51 jaw Exp $

package AC::MrGamoo::API::Get;
use AC::MrGamoo::Debug 'api_get';
use AC::MrGamoo::Config;
use AC::MrGamoo::API::Simple;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Scriblr;
use AC::SHA1File;
use Fcntl;
use POSIX;

use strict;

# Handle a 'get' request: send the requested file, from a background process.
# Nothing is done unless the requester wants a reply.
#
#   $class   - handler class (unused)
#   $io, $proto, $req, $content - passed on to _get_file
sub handler {
    my($class, $io, $proto, $req, $content) = @_;

    $proto->{want_reply} or return;

    in_background( \&_get_file, $io, $proto, $req, $content );
}

# Send a file to the requester. Called via in_background.
#
#   $io      - io object of the connection; only its {fd} is used
#   $proto   - protocol header of the request
#   $req     - request; {filename} names the file to send
#   $content - request content (unused)
#
# Replies 404 if there is no such plain file and 500 if it cannot be
# opened. Otherwise writes a reply header (status 200 plus the file's
# SHA1) and then streams the file contents.
# Dies if the reply header cannot be written.
sub _get_file {
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    return nbfd_reply(404, "not found", $fd, $proto, $req) unless $file && -f $file;

    # 3-arg open + lexical handle: the name is never interpreted as a mode or
    # a command, and no global bareword handle is left open
    open(my $fh, '<', $file) or return nbfd_reply(500, 'error', $fd, $proto, $req);

    # take the size from the handle we opened, not from a second lookup by name
    my $size = (stat($fh))[7];
    my $sha1 = sha1_file($file);

    debug("get file '$file' size $size");

    # send header
    my $gb  = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );
    my $hdr = AC::MrGamoo::Protocol->encode_header(
        type		=> $proto->{type},
        msgidno		=> $proto->{msgidno},
        is_reply	=> 1,
        data_length	=> length($gb),
        content_length	=> $size,
       );
    my $buf = $hdr . $gb;

    # no point streaming the file if the header could not be sent
    defined syswrite( $fd, $buf ) or die "cannot send reply header: $!\n";

    # stream
    AC::MrGamoo::Protocol->sendfile($fd, $fh, $size);
}


1;

lib/AC/MrGamoo/API/HB.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Dec-21 17:08 (EST)
# Function: respond to heartbeat requests
#
# $Id: HB.pm,v 1.1 2010/11/01 18:41:51 jaw Exp $

package AC::MrGamoo::API::HB;
use AC::MrGamoo::Debug 'hb';
use AC::MrGamoo::Config;
use AC::MrGamoo::Stats;
use AC::MrGamoo::About;
use AC::MrGamoo::MySelf;

use Sys::Hostname;

require 'AC/protobuf/heartbeat.pl';
use strict;

my $HOSTNAME = hostname();

# Handle a heartbeat request: reply with a description of this server.
# If the requester does not want a reply, the connection is simply shut.
#
#   $class   - handler class (unused)
#   $io      - io object of the connection; the reply is sent on it
#   $proto   - protocol header of the request
#   $req     - request (unused)
#   $content - request content (unused)
sub handler {
    my($class, $io, $proto, $req, $content) = @_;

    if( !$proto->{want_reply} ){
        $io->shut();
        return;
    }

    my %header = (
        type            => 'heartbeat_request',
        msgid           => $proto->{msgid},
        is_reply        => 1,
    );

    # who + where we are, and how busy
    my %status = (
        status_code	=> 200,
        status_message	=> 'Honky Dory',
        hostname	=> $HOSTNAME,
        subsystem	=> 'mrgamoo',
        environment	=> conf_value('environment'),
        port		=> my_port(),
        timestamp	=> time(),
        sort_metric	=> loadave(),
        server_id	=> my_server_id(),
        process_id	=> $$,
    );

    my $response = AC::MrGamoo::Protocol->encode_reply( \%header, \%status );

    debug("sending hb reply");
    $io->write_and_shut( $response );

}



1;

lib/AC/MrGamoo/API/JobAbort.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:25 (EST)
# Function: 
#
# $Id: JobAbort.pm,v 1.1 2010/11/01 18:41:52 jaw Exp $

package AC::MrGamoo::API::JobAbort;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

# Handle a 'job abort' request.
#
#   $class   - handler class (unused)
#   $io      - io object of the connection; the reply is sent on it
#   $proto   - protocol header of the request
#   $req     - request; {jobid} names the job to abort
#   $content - request content (unused)
#
# Replies 200 OK if AC::MrGamoo::Job->abort returns true, 500 Error otherwise.
sub handler {
    my($class, $io, $proto, $req, $content) = @_;

    my $jobid = $req->{jobid};
    debug("abort job $jobid");

    my $ok = AC::MrGamoo::Job->abort( jobid => $jobid );

    my @status = $ok ? (200, 'OK') : (500, 'Error');
    reply( @status, $io, $proto, $req );
}

1;

lib/AC/MrGamoo/API/JobCreate.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:11 (EST)
# Function: 
#
# $Id: JobCreate.pm,v 1.1 2010/11/01 18:41:52 jaw Exp $

package AC::MrGamoo::API::JobCreate;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

# Handle a 'job create' request: create the job and start it.
#
#   $class   - handler class (unused)
#   $io      - io object of the connection; {from_ip} is the requester's address
#   $proto   - protocol header of the request
#   $req     - request; all of its fields are passed to AC::MrGamoo::Job->new
#   $content - request content (unused)
#
# Replies 200 OK if the job was created and its start() returned true,
# 500 Error otherwise.
sub handler {
    my($class, $io, $proto, $req, $content) = @_;

    debug("new job $req->{jobid}");

    # a console of the form ":port" means: at the requester's address
    # fill in ip addr
    $req->{console} = $io->{from_ip} . $req->{console}
        if $req->{console} =~ /^:/;

    my $job     = AC::MrGamoo::Job->new( %$req );
    my $started = $job ? $job->start() : undef;

    my @status = $started ? (200, 'OK') : (500, 'Error');
    reply( @status, $io, $proto, $req );
}

1;

lib/AC/MrGamoo/API/Put.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-19 18:21 (EST)
# Function: 
#
# $Id: Put.pm,v 1.1 2010/11/01 18:41:52 jaw Exp $

package AC::MrGamoo::API::Put;
use AC::MrGamoo::Debug 'api_put';
use AC::MrGamoo::Config;
use AC::MrGamoo::API::Simple;
use AC::SHA1File;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Scriblr;
use File::Path;
use POSIX;

use strict;

# Handle a 'put' request: receive a file, in a background process.
# Replies 500 Error right away if the 'scriblr' config value matches
# no/off (in any case).
#
#   $class   - handler class (unused)
#   $io, $proto, $req, $content - passed on to _put_file
sub handler {
    my($class, $io, $proto, $req, $content) = @_;

    my $scriblr = conf_value('scriblr');

    if( $scriblr =~ /no|off/i ){
        reply( 500, 'Error', $io, $proto, $req );
        return;
    }

    return in_background( \&_put_file, $io, $proto, $req, $content );
}

# Receive a file from the requester and store it. Called via in_background.
#
#   $io      - io object of the connection; only its {fd} is used
#   $proto   - protocol header; {content_length} is the size of the file
#   $req     - request; {filename} is where to store it, {hash_sha1} is
#              the expected checksum (optional)
#   $content - the part of the file content that has already been read, if any
#
# The data is written to "<file>.tmp", verified (size, and SHA1 when one was
# supplied), and only then renamed into place. Replies 200 OK on success;
# on any failure the tmp file is removed and the reply is 500.
sub _put_file {
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    # without a usable name we would end up writing to a stray ".tmp" file.
    # NOTE(review): assumes filename() returns false for a name it rejects,
    # as the Chk and Del handlers test for - confirm
    unless( $file ){
        problem("put file failed: invalid filename");
        return nbfd_reply(500, 'error', $fd, $proto, $req);
    }

    my($dir) = $file =~ m|^(.+)/[^/]+$|;

    # mkpath
    # best effort; if the directory is unusable, the open below fails
    eval{ mkpath($dir, undef, 0755) };

    # open tmp
    # 3-arg open + lexical handle: the name is never interpreted as a mode
    my $tmp = "$file.tmp";
    my $fh;
    unless( open($fh, '>', $tmp) ){
        problem("open file failed: $!");
        return nbfd_reply(500, 'error', $fd, $proto, $req);
    }

    # read + write
    my $size = $proto->{content_length};
    my $sha1 = $req->{hash_sha1};

    verbose("put file '$file' size $size");

    # some of the content may have arrived along with the request
    if( $content ){
        syswrite( $fh, $content );
        $size -= length($content);
    }

    eval {
        my $chk = AC::MrGamoo::Protocol->sendfile($fh, $fd, $size, 10);
        close $fh;
        die "file size mismatch\n" unless (stat($tmp))[7] == $proto->{content_length};
        die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
        # only report success if the file really is in place
        rename($tmp, $file) or die "rename failed: $!\n";
    };
    if(my $e = $@){
        unlink $tmp;
        verbose("error: $e");
        nbfd_reply(500, 'error', $fd, $proto, $req);
        return;
    }

    nbfd_reply(200, 'OK', $fd, $proto, $req);
}

1;

lib/AC/MrGamoo/API/Simple.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-12 18:15 (EST)
# Function: common to most API handlers
#
# $Id: Simple.pm,v 1.1 2010/11/01 18:41:52 jaw Exp $

package AC::MrGamoo::API::Simple;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::Debug 'api';
use AC::Import;
use POSIX;

require 'AC/protobuf/std_reply.pl';
use strict;

our @EXPORT = qw(reply on_success on_failure in_background nbfd_reply);

# Send a status reply on the connection and shut it.
# If the requester does not want a reply, the connection is just shut.
#
#   $code  - status code (such as 200 or 500)
#   $msg   - status message
#   $io    - io object of the connection
#   $proto - protocol header of the request; {type} and {msgid} are echoed back
#   $req   - request (unused)
sub reply {
    my($code, $msg, $io, $proto, $req) = @_;

    if( !$proto->{want_reply} ){
        $io->shut();
        return;
    }

    my %header = (
        type            => $proto->{type},
        msgid           => $proto->{msgid},
        is_reply        => 1,
    );
    my %status = (
        status_code	=> $code,
        status_message	=> $msg,
    );
    my $response = AC::MrGamoo::Protocol->encode_reply( \%header, \%status );

    debug("sending $code reply for $proto->{type} on $io->{info}");
    $io->write_and_shut( $response );
}

# Like reply(), but writes the status reply directly to a file handle
# with syswrite. Used by the code that runs in a background process.
# Does nothing if the requester does not want a reply.
#
#   $code  - status code (such as 200 or 500)
#   $msg   - status message
#   $fd    - file handle to write the reply to
#   $proto - protocol header of the request; {type} and {msgid} are echoed back
#   $req   - request (unused)
sub nbfd_reply {
    my($code, $msg, $fd, $proto, $req) = @_;

    $proto->{want_reply} or return;

    my %header = (
        type            => $proto->{type},
        msgid           => $proto->{msgid},
        is_reply        => 1,
    );
    my %status = (
        status_code	=> $code,
        status_message	=> $msg,
    );
    my $response = AC::MrGamoo::Protocol->encode_reply( \%header, \%status );

    debug("sending $code reply for $proto->{type} (from bkg)");
    syswrite( $fd, $response );
}

# Callback: reply 200 OK.
# The first two arguments are ignored; the rest are as for reply().
sub on_success {
    my(undef, undef, $io, $proto, $req) = @_;

    reply( 200, 'OK', $io, $proto, $req );
}

# Callback: reply 500 Error.
# The first two arguments are ignored; the rest are as for reply().
sub on_failure {
    my(undef, undef, $io, $proto, $req) = @_;

    reply( 500, 'Error', $io, $proto, $req );
}


# Run a function in a detached background process.
#
#   $func  - code ref to run; called as $func->($io, $proto, $req, @rest)
#   $io    - io object of the connection
#   $proto - protocol header of the request
#   $req   - request
#   @rest  - any further arguments are passed on to $func
#
# Forks twice: the intermediate child exits at once, so the process doing
# the work is orphaned and the caller only has to wait for the short-lived
# intermediate child. If the first fork fails, a 500 Error reply is sent.
sub in_background {
    my $func    = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;

    my $pid = fork();

    if( !defined($pid) ){
        problem("cannot fork: $!");
        reply( 500, 'Error', $io, $proto, $req );
        return ;
    }elsif( $pid ){
        # parent
        # the background process owns the connection now;
        # shut our side and reap the intermediate child
        $io->shut();
        waitpid $pid, 0;
        return;
    }else{
        # child
        my $gpid = fork();

        # NB: if this second fork fails, $gpid is undef and the work
        # is done right here in the (non-orphaned) child
        if( $gpid ){
            # parent
            # _exit, not exit: leave immediately
            _exit(0);
        }else{
            # orphaned child
            # @_ holds whatever arguments remain after the shifts above
            eval {
                $func->($io, $proto, $req, @_);
            };
            if(my $e = $@){
                chomp $e;
                verbose("child error: $e");
                _exit(1);
            }
            _exit(0);
        }
    }
}

1;

lib/AC/MrGamoo/API/TaskAbort.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:25 (EST)
# Function: 
#
# $Id: TaskAbort.pm,v 1.1 2010/11/01 18:41:53 jaw Exp $

package AC::MrGamoo::API::TaskAbort;
use AC::MrGamoo::Debug 'api_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

# Handle a 'task abort' request.
#
#   $class   - handler class (unused)
#   $io      - io object of the connection; the reply is sent on it
#   $proto   - protocol header of the request
#   $req     - request; {jobid} and {taskid} identify the task to abort
#   $content - request content (unused)
#
# Replies 200 OK if AC::MrGamoo::Task->abort returns true, 500 Error otherwise.
sub handler {
    my($class, $io, $proto, $req, $content) = @_;

    my $jobid  = $req->{jobid};
    my $taskid = $req->{taskid};
    debug("abort task $jobid/$taskid");

    my $ok = AC::MrGamoo::Task->abort( jobid => $jobid, taskid => $taskid );

    my @status = $ok ? (200, 'OK') : (500, 'Error');
    reply( @status, $io, $proto, $req );
}

1;

lib/AC/MrGamoo/API/TaskCreate.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:11 (EST)
# Function: 
#
# $Id: TaskCreate.pm,v 1.3 2011/01/10 15:23:00 jaw Exp $

package AC::MrGamoo::API::TaskCreate;
use AC::MrGamoo::Debug 'api_task';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

# Handle a 'task create' request: create the task and start it.
#
#   $class   - handler class (unused)
#   $io      - io object of the connection; the reply is sent on it
#   $proto   - protocol header of the request
#   $req     - request; all of its fields are passed to AC::MrGamoo::Task->new
#   $content - request content (unused)
#
# Replies 200 OK if the task was created and its start() returned true,
# 500 Error otherwise.
sub handler {
    my($class, $io, $proto, $req, $content) = @_;

    debug("new task $req->{jobid}/$req->{taskid}");

    my $task    = AC::MrGamoo::Task->new( %$req );
    my $started = $task ? $task->start() : undef;

    my @status = $started ? (200, 'OK') : (500, 'Error');
    reply( @status, $io, $proto, $req );
}

1;

lib/AC/MrGamoo/API/TaskStatus.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:40 (EST)
# Function: update our knowledge about a remote task
#
# $Id: TaskStatus.pm,v 1.1 2010/11/01 18:41:53 jaw Exp $

package AC::MrGamoo::API::TaskStatus;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

# API handler for a task-status update.
#
# Arguments (positional): class name, io object, protocol header, request
# hash, content (unused here).
# Passes the request's fields to AC::MrGamoo::Job->task_status() and
# answers through reply(): 200/OK on a true result, 500/Error otherwise.
sub handler {
    my( $class, $io, $proto, $req, $content ) = @_;

    debug("updating task status $req->{jobid}/$req->{taskid}");

    my $updated = AC::MrGamoo::Job->task_status( %$req );

    return $updated
        ? reply( 200, 'OK', $io, $proto, $req )
        : reply( 500, 'Error', $io, $proto, $req );
}

1;

lib/AC/MrGamoo/API/Xfer.pm  view on Meta::CPAN

#!/usr/local/bin/perl
# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-07 17:20 (EST)
# Function: 
#
# $Id: Xfer.pm,v 1.1 2010/11/01 18:41:53 jaw Exp $

package AC::MrGamoo::API::Xfer;
use AC::MrGamoo::Debug 'api_xfer';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;
use AC::MrGamoo::PeerList;

use AC::MrGamoo::API::Simple;

use strict;

# counter used as the msgidno of outgoing xfer status messages (see
# tell_master); starts at this process's pid and is incremented per message
my $MSGID = $$;

# API handler for a file-transfer (xfer) request.
#
# Arguments (positional): class name, io object, protocol header, request
# hash, content (unused here).
#
# Rejects filenames containing a path component that begins with a dot,
# then creates an AC::MrGamoo::Retry that tries each entry of the request's
# 'location' field via _mk_xfer. The request is acknowledged immediately
# (200/OK); the outcome is reported later through the on_success /
# on_failure callbacks. If the retrier cannot be created, a 501/Error reply
# is sent and nothing is started.
sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    # validate filename: refuse a leading '.' or any '/.' (dot-files, ./, ../)
    if( $req->{filename} =~ m%/\.|^\.% ){
        reply( 500, 'Error', $io, $proto, $req );
        return;
    }

    # new retry
    debug("starting file xfer $req->{copyid} => $req->{filename}");

    # start working on the copy
    my $x = AC::MrGamoo::Retry->new(
        newobj	=> \&_mk_xfer,
        newargs => [ $req ],
        tryeach	=> $req->{location},
       );

    # no retrier: report the error and stop here. without this return the
    # callback setup below would be invoked on an undefined value and die.
    unless( $x ){
        debug("sending error, xfer/retrier failed, $io->{info}");
        reply( 501, 'Error', $io, $proto, $req );
        return;
    }

    # reply now
    reply( 200, 'OK', $io, $proto, $req );

    # send status when finished
    $x->set_callback('on_success', \&_yippee, $proto, $req);
    $x->set_callback('on_failure', \&_boohoo, $proto, $req);

    # start
    $x->start();
}

# Construct an AC::MrGamoo::Xfer for one candidate location.
#
# Arguments: the location to copy from, and the request hash.
# The destination name is the request's dstname when that is a true value,
# otherwise the source filename. Returns whatever Xfer->new returns.
sub _mk_xfer {
    my( $loc, $req ) = @_;

    my $src = $req->{filename};
    my $dst = $req->{dstname} || $src;

    my $xfer = AC::MrGamoo::Xfer->new( $src, $dst, $loc, $req );
    return $xfer;
}

################################################################

# on_success callback: report a finished transfer to the master as 200/OK.
# Receives two leading arguments (unused here) followed by the protocol
# header and the request that were registered with the callback.
sub _yippee {
    my( $xfer, $err, $proto, $req ) = @_;

    return tell_master( $req, 200, 'OK' );
}

# on_failure callback: log the failed transfer and report it to the master
# as 500/Failed. Receives two leading arguments (unused here) followed by
# the protocol header and the request that were registered with the callback.
sub _boohoo {
    my( $xfer, $err, $proto, $req ) = @_;

    debug("boohoo - xfer failed $req->{copyid}");
    return tell_master( $req, 500, 'Failed' );
}

# Send an xfer status update for a request to the job's master.
#
# Arguments: the request hash, a status code, and a status message.
# Looks up the master's address from the request's 'master' id, builds an
# AC::MrGamoo::API::Client carrying a 'mrgamoo_xferstatus' message and
# starts it. Returns empty if the address cannot be found or the client
# cannot be created; otherwise returns the result of the client's start().
sub tell_master {
    my( $req, $code, $msg ) = @_;

    my( $addr, $port ) = get_peer_addr_from_id( $req->{master} );
    debug("sending xfer status update for $req->{copyid} => $code => $req->{master}");

    if( !$addr ){
        debug("cannot find addr");
        return;
    }

    # protocol header; each message takes the next id from the counter
    my %header = (
        type       => 'mrgamoo_xferstatus',
        msgidno    => $MSGID++,
        want_reply => 1,
    );

    # message body
    my %status = (
        jobid          => $req->{jobid},
        copyid         => $req->{copyid},
        status_code    => $code,
        status_message => $msg,
    );

    my $client = AC::MrGamoo::API::Client->new(
        $addr, $port, "xfer $req->{copyid}", \%header, \%status,
    );

    if( !$client ){
        debug("cannot create client");
        return;
    }

    # no reply callbacks are registered: send and forget
    return $client->start();
}


################################################################

1;

lib/AC/MrGamoo/API/XferStatus.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-13 12:40 (EST)
# Function: update our knowledge about a remote file transfer (xfer)
#
# $Id: XferStatus.pm,v 1.1 2010/11/01 18:41:54 jaw Exp $

package AC::MrGamoo::API::XferStatus;
use AC::MrGamoo::Debug 'api_job';
use AC::MrGamoo::Config;
use AC::MrGamoo::Protocol;

use AC::MrGamoo::API::Simple;

use strict;

# API handler for an xfer-status update.
#
# Arguments (positional): class name, io object, protocol header, request
# hash, content (unused here).
# Passes the request's fields to AC::MrGamoo::Job->xfer_status() and
# answers through reply(): 200/OK on a true result, 500/Error otherwise.
sub handler {
    my( $class, $io, $proto, $req, $content ) = @_;

    debug("updating xfer status $req->{jobid}/$req->{copyid}");

    my $updated = AC::MrGamoo::Job->xfer_status( %$req );

    return $updated
        ? reply( 200, 'OK', $io, $proto, $req )
        : reply( 500, 'Error', $io, $proto, $req );
}

1;

lib/AC/MrGamoo/About.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 11:29 (EST)
# Function: 
#
# $Id: About.pm,v 1.1 2010/11/01 18:41:39 jaw Exp $

package AC::MrGamoo::About;
use AC::Import;
use strict;

# names offered for export
our @EXPORT = ( 'my_port' );

# the port recorded by init(); undefined until init() has been called
my $listen_port;

# Record the port. Called as a class method: the first argument is the
# class name, the second is the port value to remember.
sub init {
    my( $class, $value ) = @_;
    $listen_port = $value;
}

# Return the port most recently recorded by init().
sub my_port {
    return $listen_port;
}

1;

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 0.748 second using v1.00-cache-2.02-grep-82fe00e-cpan-1925d2aa809 )