AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/AC/ReadInput.pm  view on Meta::CPAN


our $R;		# exported by AC::MrGamoo::User

sub readinput {
    my $fd = shift;

    my $line = scalar <$fd>;
    return (undef, 1) unless defined $line;

    my $d;
    eval { $d = parse_dancr_log($line); };
    if( $@ ){
        problem("cannot parse data in (" . $R->config('current_file') . "). cannot process\n");
        return ;
    }

    # filter input on date range. we could just as easily filter
    # in 'map', but doing here, behind the scenes, keeps things
    # simpler for the jr. developers writing reports.
    return if $d->{tstart} <  $R->config('start');
    return if $d->{tstart} >= $R->config('end');

lib/AC/MrGamoo/API/Put.pm  view on Meta::CPAN

    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    my($dir) = $file =~ m|^(.+)/[^/]+$|;

    # mkpath
    eval{ mkpath($dir, undef, 0755) };

    # open tmp
    my $tmp = "$file.tmp";
    unless( open(F, "> $tmp") ){
        problem("open file failed: $!");
        return nbfd_reply(500, 'error', $fd, $proto, $req);
    }

    # read + write
    my $size = $proto->{content_length};
    my $sha1 = $req->{hash_sha1};

    verbose("put file '$file' size $size");

    if( $content ){
        syswrite( F, $content );
        $size -= length($content);
    }

    eval {
        my $chk = AC::MrGamoo::Protocol->sendfile(\*F, $fd, $size, 10);
        close F;
        die "file size mismatch\n" unless (stat($tmp))[7] == $proto->{content_length};
        die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
    };
    if(my $e = $@){
        unlink $tmp;
        verbose("error: $e");
        nbfd_reply(500, 'error', $fd, $proto, $req);
        return;

lib/AC/MrGamoo/API/Simple.pm  view on Meta::CPAN

        return;
    }else{
        # child
        my $gpid = fork();

        if( $gpid ){
            # parent
            _exit(0);
        }else{
            # orphaned child
            eval {
                $func->($io, $proto, $req, @_);
            };
            if(my $e = $@){
                chomp $e;
                verbose("child error: $e");
                _exit(1);
            }
            _exit(0);
        }
    }

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

        type		=> 'mrgamoo_status',
        msgidno		=> $^T,
        want_reply	=> 1,
    }, {});

    # get the full list of servers
    # contact each seed passed in above, until we get a reply
    for my $s ( @$seed ){
        my($addr, $port) = split /:/, $s;
        $me->{fdebug}->("attempting to fetch server list from $addr:$port");
        eval {
            alarm(1);
            my $reply = AC::MrGamoo::Protocol->send_request( inet_aton($addr), $port, $listreq, $me->{fdebug} );
            my $res   = AC::MrGamoo::Protocol->decode_reply($reply);
            alarm(0);
            my $list = $res->{status};
            @serverlist = @$list if $list && @$list;
        };
        last if @serverlist;
    }

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

      grep { $_->{status} == 200 } @serverlist;

    # try all addresses
    # RSN - sort addresslist in a Peers::pick_best_addr_for_peer() like manner?

    my @addrlist = map { @{$_->{ip}} } @serverlist;

    for my $ip (@addrlist){
        my $addr = inet_itoa($ip->{ipv4});
        my $res;
        eval {
            alarm(30);
            $res = $me->_submit_to( $addr, $ip->{port}, $req );
            alarm(0);
        };
        next unless $res && $res->{status_code} == 200;
        $me->{master} = { addr => $addr, port => $ip->{port} };
        return 1;
    }
    return ;
}

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN

    $me->_check('final');

    return 1;
}

# _check( $me, @args )
# syntax-check one piece of the job program.
#   $me->{program} - object providing compile(); it is called with @args
# the text returned by compile() is prefixed with 'sub' and string-eval'ed.
# NOTE(review): presumably compile() returns a '{ ... }' block, so that the
# eval only defines an anonymous sub and never calls it - confirm.
# dies with the eval error ($@) if the text does not compile.
sub _check {
    my $me = shift;
    my $mr = $me->{program};

    my $prog = $mr->compile(@_);
    # NB: a string eval is compiled in this lexical scope, so $me, $mr
    # and $prog are visible to the evaluated text.
    eval "sub $prog";
    die $@ if $@;
}


1;

lib/AC/MrGamoo/Customize.pm  view on Meta::CPAN

use strict;

# customize( $class, $implby )
# wire the customizable functions of $class to their implementations.
#   $class  - package being customized; its @CUSTOM array lists the
#             function names to install
#   $implby - optional package name of the user's implementation
# the default implementation is $class with 'Default' inserted before the
# last name component (A::B::C => A::B::Default::C).
# for each name in @CUSTOM, the user's version is installed into $class if
# the user's package can() it, otherwise the default's version is used.
# dies if either package cannot be loaded.
sub customize {
    my $class  = shift;
    my $implby = shift;

    (my $default = $class) =~ s/(.*)::([^:]+)$/$1::Default::$2/;

    # load user's implementation + default
    for my $p ($implby, $default){
        # nothing to load. skip before looking at $@ - it may still hold
        # an error from some earlier, unrelated eval
        next unless $p;

        # NB: $p is interpolated into a string eval - it must be a trusted
        # package name. the trailing '1' lets us detect failure from the
        # return value, not only from $@
        unless( eval "require $p; 1" ){
            die $@ || "cannot load $p\n";
        }
    }

    # import/export
    # symbolic references are needed to reach @CUSTOM and the globs by name
    no strict 'refs';
    no warnings;
    for my $f ( @{$class . '::CUSTOM'} ){
        # user's implementation wins, default is the fallback
        *{$class . '::' . $f} = ($implby && $implby->can($f)) || $default->can($f);
    }
}

lib/AC/MrGamoo/Job.pm  view on Meta::CPAN

    }

    verbose("new job: $me->{request}{jobid} ($me->{request}{traceinfo})");

    my $cf = $me->{options} = decode_json( $me->{request}{options} ) if $me->{request}{options};

    # open connection  to eu-console
    $me->{euconsole} = AC::MrGamoo::EUConsole->new( $me->{request}{jobid}, $me->{request}{console} );

    # partially compile
    eval {
        $me->{mr} = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
    };
    if(my $e = $@){
        problem("cannot compile job: $e");
        return;
    }

    # RSN - get_file_list + Plan may take too long - do in sub-process

    # get file list

lib/AC/MrGamoo/Kibitz/Client.pm  view on Meta::CPAN

    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    my($proto, $data, $content) = read_protocol_no_content( $me, $evt );
    return unless $proto;

    $me->{status_ok} = 1;

    eval {
        my $resp = AC::MrGamoo::Protocol->decode_reply( $proto, $data );
        for my $update ( @{$resp->{status}} ){
            AC::MrGamoo::Kibitz::Peers->update( $update );
        }
    };
    if(my $e = $@){
        verbose("error: $e");
    }
    $me->shut();
}

lib/AC/MrGamoo/Protocol.pm  view on Meta::CPAN


    return ($p, $data, $content);
}

sub _check_protocol {
    my $io  = shift;
    my $evt = shift;

    if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
        # decode header
        eval {
            $io->{proto_header} = __PACKAGE__->decode_header( $io->{rbuffer} );
        };
        if(my $e=$@){
            verbose("cannot decode protocol header: $e");
            $io->run_callback('error', {
                cause	=> 'read',
                error	=> "cannot decode protocol: $e",
            });
            $io->shut();
            return;

lib/AC/MrGamoo/Server.pm  view on Meta::CPAN


    # dispatch request
    my $h = $HANDLER{ $proto->{type} };

    unless( $h ){
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    eval {
        $data = AC::MrGamoo::Protocol->decode_request($proto, $data) if $data && $proto->{type} ne 'http';
    };
    if(my $e = $@ ){
        problem("cannot decode request: $e");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

lib/AC/MrGamoo/Submit/Compile.pm  view on Meta::CPAN

}

# get_code( $me, $name, $num )
# compile the requested piece of the program and evaluate it.
#   $name, $num - passed through unchanged to $me->compile()
# returns the value produced by evaluating the compiled source
# (NOTE(review): presumably a code ref, going by the name - confirm),
# or nothing if compile() returned a false value.
# dies with the eval error ($@) if the evaluation fails.
sub get_code {
    my $me   = shift;
    my $name = shift;
    my $num  = shift;

    my $prog = $me->compile( $name, $num );
    return unless $prog;

    # NB: string eval, in scalar context. it runs in this lexical scope,
    # so $me, $name, $num and $prog are visible to the evaluated source.
    my $c = eval $prog;
    die $@ if $@;

    return $c;
}

sub _die {
    my $me  = shift;
    my $err = shift;

    if( $me->{_lineno} ){

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

    if( $REGISTRY{$task} ){
        verbose("ignoring duplicate request task $task");
        # will cause a 200 OK, so the requestor will not retry
        return $REGISTRY{$task};
    }

    $me->{options} = decode_json( $me->{request}{options} ) if $me->{request}{options};
    $me->{initres} = from_json( $me->{request}{initres}, {allow_nonref => 1} ) if $me->{request}{initres};

    # compile
    eval {
        my $mr = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
        # merge job config + opts.
        $mr->set_config($me->{options});
        $mr->set_initres($me->{initres});
        $me->{R} = AC::MrGamoo::Submit::Request->new( $mr );
        $me->{R}{config}{jobid}  = $me->{request}{jobid};
        $me->{R}{config}{taskid} = $me->{request}{taskid};
        $me->{mr} = $mr;
    };
    if(my $e = $@){

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN


    # send STDOUT + STDERR to end-user console session
    $me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ )  };
    $me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
    $me->{R}->redirect_io();

    my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
    $me->{R}{func_output}   = sub{ _output_partition($me, $n, @_) };
    $me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };

    eval {
        _setup_outfiles( $me );

        if( $me->{request}{phase}     eq 'map' ){
            _do_map( $me );
        }elsif( $me->{request}{phase} eq 'final' ){
            _do_final( $me );
        }elsif( $me->{request}{phase} =~ /^reduce/ ){
            _do_reduce( $me );
        }else{
            die "unknown map/reduce phase '$me->{request}{phase}'\n";

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN


# _setup_outfiles( $me )
# create one AC::MrGamoo::OutFile per entry of $me->{request}{outfile}.
# each entry is taken relative to the configured 'basedir'; the directory
# part is created first (best effort, mkpath errors are ignored).
# the 'compress' attribute of the task is handed to every OutFile.
# the resulting list is stored, as an array ref, in $me->{outfd}.
sub _setup_outfiles {
    my $me = shift;

    my $compress = $me->attr(undef, 'compress');
    my @handles;

    for my $name ( @{$me->{request}{outfile}} ){
        my $base = conf_value('basedir');
        my $path = "$base/$name";

        # everything up to the last '/' is the directory to create
        my($parent) = $path =~ m{^(.+)/[^/]+$};
        eval { mkpath($parent, undef, 0777) };

        push @handles, AC::MrGamoo::OutFile->new( $path, $compress );
    }

    $me->{outfd} = \@handles;
}

sub _close_outfiles {
    my $me = shift;

    for my $io ( @{$me->{outfd}} ){

lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN

    if( $REGISTRY{$req->{copyid}} ){
        verbose("ignoring duplicate xfer $req->{copyid}");
        return $REGISTRY{$req->{copyid}};
    }

    $dstname = conf_value('basedir') . '/' . $dstname;
    my $tmpfile = $dstname . ".$$";

    # mkpath
    my($dir) = $dstname =~ m|^(.+)/[^/]+$|;
    eval{ mkpath($dir, undef, 0755) };

    my $me = $class->SUPER::new( \&_run_child,
                                 [ $srcname, $dstname, $tmpfile, $loc, $req ],
                                 info     => "xfer $loc:$srcname",
                                 request  => $req,
                                 rbufsize => 65536,
                                );

    return unless $me;
    $REGISTRY{$req->{copyid}} = $me;

lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN


    debug("connecting to $addr:$port");

    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'scribl_get',
        msgidno		=> $$,
        want_reply	=> 1,
    }, { filename => $srcname } );

    my $p;
    eval {
        # connect
        my $s = AC::MrGamoo::Protocol->connect_to_server( inet_aton($addr), $port );
        return unless $s;

        # send req
        AC::MrGamoo::Protocol->write_request($s, $req);

        # get response
        my $buf = AC::MrGamoo::Protocol->read_data($s, AC::MrGamoo::Protocol->header_size(), 30);
        $p      = AC::MrGamoo::Protocol->decode_header($buf);



( run in 1.270 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )