AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/API/Client.pm  view on Meta::CPAN

                                 info	 => "client $req->{type} to $addr:$port; $info",
                                 request => $send,
                                );

    return $me;
}

sub start {
    my $me = shift;

    $me->set_callback('timeout',  \&_timeout);
    $me->set_callback('read',     \&_read);
    $me->set_callback('shutdown', \&_shutdown);

    $me->SUPER::start();
    $me->timeout_rel($TIMEOUT);
    $me->write( $me->{request} );

    return $me;
}

sub _timeout {
    my $me = shift;

lib/AC/MrGamoo/API/Get.pm  view on Meta::CPAN

}

sub _get_file {
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    return nbfd_reply(404, "not found", $fd, $proto, $req) unless -f $file;
    open(F, $file) || return nbfd_reply(500, 'error', $fd, $proto, $req);
    my $size = (stat($file))[7];
    my $sha1 = sha1_file($file);

    debug("get file '$file' size $size");

    # send header
    my $gb  = ACPScriblReply->encode( { status_code => 200, status_message => 'OK', hash_sha1 => $sha1 } );

lib/AC/MrGamoo/API/Put.pm  view on Meta::CPAN

}

sub _put_file {
    my $io      = shift;
    my $proto   = shift;
    my $req     = shift;
    my $content = shift;

    my $file = filename($req->{filename});
    my $fd = $io->{fd};
    fcntl($fd, F_SETFL, 0);	# unset nbio

    my($dir) = $file =~ m|^(.+)/[^/]+$|;

    # mkpath
    eval{ mkpath($dir, undef, 0755) };

    # open tmp
    my $tmp = "$file.tmp";
    unless( open(F, "> $tmp") ){
        problem("open file failed: $!");

lib/AC/MrGamoo/API/Xfer.pm  view on Meta::CPAN


    # reply now
    if( $x ){
        reply( 200, 'OK', $io, $proto, $req );
    }else{
        debug("sending error, xfer/retrier failed, $io->{info}");
        reply( 501, 'Error', $io, $proto, $req );
    }

    # send status when finished
    $x->set_callback('on_success', \&_yippee, $proto, $req);
    $x->set_callback('on_failure', \&_boohoo, $proto, $req);

    # start
    $x->start();
}

sub _mk_xfer {
    my $loc  = shift;
    my $req  = shift;

    my $x = AC::MrGamoo::Xfer->new(

lib/AC/MrGamoo/Client.pm  view on Meta::CPAN


    return $me;
}

sub get_config_param {
    my $me = shift;

    $me->{program}->get_config_param(@_);
}

sub set_config_param {
    my $me = shift;

    $me->{program}->set_config_param(@_);
}

sub open_console {
    my $me = shift;

    my $fd;
    socket($fd, PF_INET, SOCK_DGRAM, 0);
    bind($fd, sockaddr_in(0, INADDR_ANY));
    my $s = getsockname($fd);
    my($port, $addr) = sockaddr_in($s);

lib/AC/MrGamoo/Job/Request.pm  view on Meta::CPAN

    debug("starting request $me->{info}");
    delete $job->{request_pending}{$me->{id}};

    my $x = $job->_send_request( $me->{server}, $me->{info}, $me->{proto}, $me->{request});

    unless( $x ){
        verbose("cannot start request");
        return;
    }

    $x->set_callback('on_success', \&_cb_start_req,  $me, $job, 1);
    $x->set_callback('on_failure', \&_cb_start_req,  $me, $job, 0);

    $job->{request_running}{$me->{id}} = $me;
    $x->start();
}

sub _cb_start_req {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $job = shift;

lib/AC/MrGamoo/Job/Task.pm  view on Meta::CPAN

        master		=> my_server_id(),
    } );

    unless( $x ){
        verbose("cannot start task");
        $me->failed($job);
        return;
    }

    # no success cb here. we will either timeout, or get a TaskStatus msg.
    $x->set_callback('on_failure', \&_cb_start_task_fail, $me, $job );

    $me->started($job, 'task');
    $x->start();
}

sub _cb_start_task_fail {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $job = shift;

lib/AC/MrGamoo/Job/Xfer.pm  view on Meta::CPAN

        master		=> my_server_id(),
    } );

    unless( $x ){
        verbose("cannot start xfer");
        $me->failed( $job );
        return;
    }

    # no success cb here. we will either timeout, or get a XferStatus msg.
    $x->set_callback('on_failure', \&_cb_start_xfer_fail, $me, $job );

    $me->started($job, 'xfer');
    $x->start();
}

sub _cb_start_xfer_fail {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $job = shift;

lib/AC/MrGamoo/Kibitz/Client.pm  view on Meta::CPAN

my $msgid   = $$;

sub new {
    my $class = shift;
    # addr, port, ...

    debug('starting kibitz status client');
    my $me = $class->SUPER::new( @_ );
    return unless $me;

    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    $me->start();

    # build request
    my $req = AC::MrGamoo::Protocol->encode_request( {
        type            => 'mrgamoo_status',
        content_length  => 0,
        want_reply      => 1,
        msgid           => $msgid++,
    }, {

lib/AC/MrGamoo/Retry.pm  view on Meta::CPAN

sub _try {
    my $me = shift;

    my $a = $me->{tryeach}[ $me->{tries} ];
    my $o = $me->{newobj}->( $a, @{$me->{newargs}} );
    $me->{tries} ++;

    debug("try $me->{tries}");
    return _on_failure(undef, undef, $me) unless $o;

    $o->set_callback( 'on_success', \&_on_success, $me );
    $o->set_callback( 'on_failure', \&_on_failure, $me );

    $o->start();
}

sub _on_success {
    my $x  = shift;
    my $e  = shift;
    my $me = shift;

    debug("all done!");

lib/AC/MrGamoo/Server.pm  view on Meta::CPAN


    unless( $AC::MrGamoo::CONF->check_acl( $ip ) ){
        verbose("rejecting connection from $ip");
        return;
    }

    my $me = $class->SUPER::new( info => 'tcp mrgamoo server', from_ip => $ip );

    $me->start($fd);
    $me->timeout_rel($TIMEOUT);
    $me->set_callback('read',    \&read);
    $me->set_callback('timeout', \&timeout);
}

sub timeout {
    my $me = shift;

    debug("connection timed out");
    $me->shut();
}

sub read {

lib/AC/MrGamoo/Server.pm  view on Meta::CPAN

    debug("http get $base");
    my $f = $HTTP{$base};
    $f ||= \&http_notfound;
    my( $content, $code, $text ) = $f->($url);
    $code ||= 200;
    $text ||= 'OK';

    my $res = "HTTP/1.0 $code $text\r\n"
      . "Server: AC/MrGamoo\r\n"
      . "Connection: close\r\n"
      . "Content-Type: text/plain; charset=UTF-8\r\n"
      . "Content-Length: " . length($content) . "\r\n"
      . "\r\n"
      . $content ;

    $me->write_and_shut($res);
}

################################################################

sub http_notfound {

lib/AC/MrGamoo/Submit/Compile.pm  view on Meta::CPAN


    if( $d->{multi} ){
        # merge
        @{ $me->{content}{$tag} }{ keys %$cfg } = values %$cfg;
    }else{
        $me->_die("redefinition of '$tag' section") if $me->{content}{$tag};
        $me->{content}{$tag} = $cfg;
    }
}

sub set_initres {
    my $me = shift;
    my $ir = shift;

    $me->{initres} = $ir;
}

sub set_config {
    my $me  = shift;
    my $cfg = shift;

    $me->{content}{config} = $cfg;
}

sub get_config_param {
    my $me = shift;
    my $k  = shift;

    return $me->{content}{config}{$k};
}

sub set_config_param {
    my $me = shift;
    my $k  = shift;
    my $v  = shift;

    return $me->{content}{config}{$k} = $v;
}

sub _check {
    my $me = shift;

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

        return $REGISTRY{$task};
    }

    $me->{options} = decode_json( $me->{request}{options} ) if $me->{request}{options};
    $me->{initres} = from_json( $me->{request}{initres}, {allow_nonref => 1} ) if $me->{request}{initres};

    # compile
    eval {
        my $mr = AC::MrGamoo::Submit::Compile->new( text => $me->{request}{jobsrc} );
        # merge job config + opts.
        $mr->set_config($me->{options});
        $mr->set_initres($me->{initres});
        $me->{R} = AC::MrGamoo::Submit::Request->new( $mr );
        $me->{R}{config}{jobid}  = $me->{request}{jobid};
        $me->{R}{config}{taskid} = $me->{request}{taskid};
        $me->{mr} = $mr;
    };
    if(my $e = $@){
        problem("cannot compile task: $e");
        return;
    }

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN


    debug("start $me->{request}{phase} task $me->{request}{jobid}/$me->{request}{taskid}");

    my $io = AC::DC::IO::Forked->new(
        \&AC::MrGamoo::Task::Running::_start_task, [ $me ],
        info	=> "task $me->{request}{jobid}/$me->{request}{taskid}",
       );

    $me->{io} = $io;
    $io->timeout_rel($TIMEOUT);
    $io->set_callback('timeout',  \&_timeout);
    $io->set_callback('read',     \&_read,     $me);
    $io->set_callback('shutdown', \&_shutdown, $me);

    $io->start();
}

sub abort {
    my $me = _find(shift, @_);

    return unless $me;
    debug("abort task $me->{request}{taskid}");
    $me->{io}->shut() if $me->{io};

lib/AC/MrGamoo/Task.pm  view on Meta::CPAN

    }, {
        jobid		=> $me->{request}{jobid},
        taskid		=> $me->{request}{taskid},
        phase		=> $me->{status}{phase},
        progress	=> $me->{status}{amt},
    } );

    return unless $x;

    $me->{_status_underway} ++;
    $x->set_callback('shutdown', \&_send_status_done, $me);

    $x->start();

}

sub attr {
    my $me = shift;
    my $bk = shift;
    my $p  = shift;

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

my $MAXRUN     = 3600;			# override with %attr maxrun
my $SORTPROG   = '/usr/bin/sort';	# override with %attr sortprog or config file
my $GZPROG     = '/usr/bin/gzcat';	# override with %attr gzprog or config file

# in child process
sub _start_task {
    my $me = shift;

    debug("start child task");
    $^T = time();
    _setup_stdio_etal();
    _setup_console( $me );
    _update_status( 'STARTING', 0 );

    # send STDOUT + STDERR to end-user console session
    $me->{R}{eu_print_stderr} = sub { eu_print_stderr( $me, @_ )  };
    $me->{R}{eu_print_stdout} = sub { eu_print_stdout( $me, @_ ) };
    $me->{R}->redirect_io();

    my $n = $me->{request}{outfile} ? @{$me->{request}{outfile}} : 0;
    $me->{R}{func_output}   = sub{ _output_partition($me, $n, @_) };
    $me->{R}{func_progress} = sub{ _maybe_update_status($me, 'RUNNING', @_) };

    eval {
        _setup_outfiles( $me );

        if( $me->{request}{phase}     eq 'map' ){
            _do_map( $me );
        }elsif( $me->{request}{phase} eq 'final' ){
            _do_final( $me );
        }elsif( $me->{request}{phase} =~ /^reduce/ ){
            _do_reduce( $me );
        }else{
            die "unknown map/reduce phase '$me->{request}{phase}'\n";
        }

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

        _send_eumsg($me, 'stderr', "ERROR: $myid - $e");
        _update_status( 'FAILED', 0 );
    }

    _close_outfiles( $me );
    _update_status( 'FINISHED', 0 );
    debug("finish child task");
    exit(0);
}

sub _setup_stdio_etal {

    # move socket to parent from STDOUT -> STATUS
    # so user code doesn't trample

    open( STATUS, ">&STDOUT" );
    close STDOUT; open( STDOUT, ">/dev/null");
    close STDIN;  open( STDIN,  "/dev/null");
    select STATUS; $| = 1; select STDOUT;
    $SIG{CHLD} = sub{};
    $SIG{ALRM} = sub{ die "timeout\n" };
    openlog('mrgamoo', 'ndelay, pid', (conf_value('syslog') || 'local4'));

    alarm( $MAXRUN );
}

sub _setup_console {
    my $me = shift;

    debug("setup console: $me->{request}{jobid}, $me->{request}{console}");
    $me->{euconsole} = AC::MrGamoo::EUConsole->new( $me->{request}{jobid}, $me->{request}{console} );
}

sub _send_eumsg {
    my $me   = shift;
    my $type = shift;
    my $msg  = shift;

    return unless $me->{euconsole};
    $me->{euconsole}->send_msg($type, $msg);

lib/AC/MrGamoo/Task/Running.pm  view on Meta::CPAN

sub _maybe_update_status {
    my $me = shift;

    $^T = time();

    return if $^T < ($me->{status_time} + $STATUSTIME);
    $me->{status_time} = $^T;
    _update_status( @_ );
}

sub _setup_outfiles {
    my $me = shift;
    my @out;

    my $gz = $me->attr(undef, 'compress');
    for my $file ( @{$me->{request}{outfile}} ){
        my $f = conf_value('basedir') . '/' . $file;
        my($dir) = $f =~ m|^(.+)/[^/]+$|;

        eval{ mkpath($dir, undef, 0777) };
        push @out, AC::MrGamoo::OutFile->new( $f, $gz );

lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN

    $me->_start();

}

sub _start {
    my $me = shift;

    debug("start xfer $me->{request}{copyid}");

    $me->timeout_rel($TIMEOUT);
    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('shutdown', \&shutdown);

    $me->SUPER::start();
}

sub _run_child {
    my $srcname = shift;	# on remote system
    my $dstname = shift;	# on local system
    my $tmpfile = shift;
    my $loc     = shift;
    my $req     = shift;



( run in 0.847 second using v1.01-cache-2.11-cpan-49f99fa48dc )