RocksDB
view release on metacpan or search on metacpan
vendor/rocksdb/build_tools/gnu_parallel view on Meta::CPAN
# Main flow: start the jobs, wait for all of them, tidy up and exit.
init_run_jobs();
# A semaphore is only acquired when $Global::semaphore is set;
# otherwise $sem stays undefined and is never used.
my $sem = $Global::semaphore ? acquire_semaphore() : undef;
# From here on SIGTERM is handled by start_no_new_jobs
$SIG{TERM} = \&start_no_new_jobs;
start_more_jobs();
# --pipe (but not --pipepart): feed stdin to the jobs
if($opt::pipe and not $opt::pipepart) {
    spreadstdin();
}
::debug("init", "Start draining\n");
drain_job_queue();
::debug("init", "Done draining\n");
reaper();
::debug("init", "Done reaping\n");
if($opt::pipe and @opt::a) {
    # For every tee job: remove the file named for fd 2, blank that
    # name, print the job, then remove the file named for fd 1.
    for my $tee_job (@Global::tee_jobs) {
        unlink $tee_job->fh(2,"name");
        $tee_job->set_fh(2,"name","");
        $tee_job->print();
        unlink $tee_job->fh(1,"name");
    }
}
::debug("init", "Cleaning\n");
cleanup();
$sem->release() if $Global::semaphore;
# Send TERM to every pid that is a key of %Global::sshmaster
kill "TERM", $_ for keys %Global::sshmaster;
::debug("init", "Halt\n");
# With $opt::halt_on_error the recorded halt status is used;
# otherwise the global exit status (undef counts as 0), capped at 254.
wait_and_exit($opt::halt_on_error
              ? $Global::halt_on_error_exitstatus
              : min(undef_as_zero($Global::exitstatus),254));
# Empty sub: it has no body and nothing in the visible code calls it.
# Presumably a named marker heading the pipe-mode subs below - TODO confirm.
sub __PIPE_MODE__ {}
sub pipe_part_files {
    # Build one cat_partial command per chunk of $file.
    # Input:
    #   $file = the file to read
    # Uses:
    #   $opt::blocksize
    # Returns:
    #   @commands that will cat_partial each part
    my ($file) = @_;
    # find_header() fills $scratch while it searches; only the header
    # it returns is used here.
    my $scratch = "";
    my $header = find_header(\$scratch,open_or_exit($file));
    # Chunk boundaries, computed with the header length passed in
    my @boundaries = find_split_positions($file,$opt::blocksize,length $header);
    # Each pair of neighbouring boundaries becomes one cat_partial command
    my @cat_partials;
    for my $idx (0 .. $#boundaries - 1) {
        push @cat_partials,
            cat_partial($file, 0, length($header),
                        $boundaries[$idx], $boundaries[$idx+1]);
    }
    # Remote exec should look like:
    #  ssh -oLogLevel=quiet lo 'eval `echo $SHELL | grep "/t\{0,1\}csh" > /dev/null && echo setenv PARALLEL_SEQ '$PARALLEL_SEQ'\; setenv PARALLEL_PID '$PARALLEL_PID' || echo PARALLEL_SEQ='$PARALLEL_SEQ'\;export PARALLEL_SEQ\; PARALLEL_PID='$PARA...
    # ssh -tt not allowed. Remote will die due to broken pipe anyway.
    # TODO test remote with --fifo / --cat
    return @cat_partials;
}
sub find_header {
    # Read blocks from $fh into the buffer until the --header regexp
    # matches at the start of the buffer, then cut the match off the
    # buffer and return it.
    # Input:
    #   $buf_ref = reference to read-in buffer
    #   $fh = filehandle to read from
    # Uses:
    #   $opt::header (rewritten in place into a regexp)
    #   $opt::blocksize
    # Returns:
    #   $header string ("" if --header is unset or never matches)
    my ($buf_ref, $fh) = @_;
    # No --header: nothing is read from $fh
    return "" unless $opt::header;
    # ':' is shorthand for a single line
    if($opt::header eq ":") {
        $opt::header = "(.*\n)";
    }
    # A plain number N means N lines
    $opt::header =~ s/^(\d+)$/"(.*\n)"x$1/e;
    # Each read appends up to $opt::blocksize at the end of the buffer
    while(read($fh,substr($$buf_ref,length $$buf_ref,0),$opt::blocksize)) {
        if($$buf_ref =~ s/^($opt::header)//) {
            # Copy $1 before leaving the sub
            my $matched = $1;
            return $matched;
        }
    }
    # Input exhausted without a match
    return "";
}
sub find_split_positions {
# Input:
# $file = the file to read
# $block = (minimal) --block-size of each chunk
# $headerlen = length of header to be skipped
# Uses:
# $opt::recstart
# $opt::recend
# Returns:
# @positions of block start/end
my($file, $block, $headerlen) = @_;
my $size = -s $file;
$block = int $block;
# The optimal dd blocksize for mint, redhat, solaris, openbsd = 2^17..2^20
# The optimal dd blocksize for freebsd = 2^15..2^17
my $dd_block_size = 131072; # 2^17
my @pos;
my ($recstart,$recend) = recstartrecend();
my $recendrecstart = $recend.$recstart;
my $fh = ::open_or_exit($file);
push(@pos,$headerlen);
for(my $pos = $block+$headerlen; $pos < $size; $pos += $block) {
my $buf;
seek($fh, $pos, 0) || die;
while(read($fh,substr($buf,length $buf,0),$dd_block_size)) {
if($opt::regexp) {
# If match /$recend$recstart/ => Record position
if($buf =~ /(.*$recend)$recstart/os) {
my $i = length($1);
push(@pos,$pos+$i);
vendor/rocksdb/build_tools/gnu_parallel view on Meta::CPAN
# myfunc=() {...
# Post-shellshock style bash function:
# BASH_FUNC_myfunc()=() {...
#
# Uses:
# $Global::envvar = eval string that will set variables in both bash and csh
# $Global::envwarn = If functions are used: Give warning in csh
# $Global::envvarlen = length of $Global::envvar
# @opt::env
# $Global::shell
# %ENV
# Returns: N/A
$Global::envvar = "";
$Global::envwarn = "";
my @vars = ('parallel_bash_environment');
for my $varstring (@opt::env) {
# Split up --env VAR1,VAR2
push @vars, split /,/, $varstring;
}
if(grep { /^_$/ } @vars) {
# --env _
# Include all vars that are not in a clean environment
if(open(my $vars_fh, "<", $ENV{'HOME'} . "/.parallel/ignored_vars")) {
my @ignore = <$vars_fh>;
chomp @ignore;
my %ignore;
@ignore{@ignore} = @ignore;
close $vars_fh;
push @vars, grep { not defined $ignore{$_} } keys %ENV;
@vars = grep { not /^_$/ } @vars;
} else {
::error("Run '$Global::progname --record-env' in a clean environment first.\n");
::wait_and_exit(255);
}
}
# Duplicate vars as BASH functions to include post-shellshock functions.
# So --env myfunc should also look for BASH_FUNC_myfunc()
@vars = map { $_, "BASH_FUNC_$_()" } @vars;
# Keep only defined variables
@vars = grep { defined($ENV{$_}) } @vars;
# Pre-shellshock style bash function:
# myfunc=() { echo myfunc
# }
# Post-shellshock style bash function:
# BASH_FUNC_myfunc()=() { echo myfunc
# }
my @bash_functions = grep { substr($ENV{$_},0,4) eq "() {" } @vars;
my @non_functions = grep { substr($ENV{$_},0,4) ne "() {" } @vars;
if(@bash_functions) {
# Functions are not supported for all shells
if($Global::shell !~ m:/(bash|rbash|zsh|rzsh|dash|ksh):) {
::warning("Shell functions may not be supported in $Global::shell\n");
}
}
# Pre-shellshock names are without ()
my @bash_pre_shellshock = grep { not /\(\)/ } @bash_functions;
# Post-shellshock names are with ()
my @bash_post_shellshock = grep { /\(\)/ } @bash_functions;
my @qcsh = (map { my $a=$_; "setenv $a " . env_quote($ENV{$a}) }
grep { not /^parallel_bash_environment$/ } @non_functions);
my @qbash = (map { my $a=$_; "export $a=" . env_quote($ENV{$a}) }
@non_functions, @bash_pre_shellshock);
push @qbash, map { my $a=$_; "eval $a\"\$$a\"" } @bash_pre_shellshock;
push @qbash, map { /BASH_FUNC_(.*)\(\)/; "$1 $ENV{$_}" } @bash_post_shellshock;
#ssh -tt -oLogLevel=quiet lo 'eval `echo PARALLEL_SEQ='$PARALLEL_SEQ'\;export PARALLEL_SEQ\; PARALLEL_PID='$PARALLEL_PID'\;export PARALLEL_PID` ;' tty\ \>/dev/null\ \&\&\ stty\ isig\ -onlcr\ -echo\;echo\ \$SHELL\ \|\ grep\ \"/t\\\{0,1\\\}csh\"\...
#'\"\\\}\ \|\|\ myfunc\(\)\ \{\ \ echo\ a'
#'\}\ \;myfunc\ 1;
# Check if any variables contain \n
if(my @v = map { s/BASH_FUNC_(.*)\(\)/$1/; $_ } grep { $ENV{$_}=~/\n/ } @vars) {
# \n is bad for csh and will cause it to fail.
$Global::envwarn = ::shell_quote_scalar(q{echo $SHELL | egrep "/t?csh" > /dev/null && echo CSH/TCSH DO NOT SUPPORT newlines IN VARIABLES/FUNCTIONS. Unset }."@v".q{ && exec false;}."\n\n") . $Global::envwarn;
}
if(not @qcsh) { push @qcsh, "true"; }
if(not @qbash) { push @qbash, "true"; }
# Create lines like:
# echo $SHELL | grep "/t\\{0,1\\}csh" >/dev/null && setenv V1 val1 && setenv V2 val2 || export V1=val1 && export V2=val2 ; echo "$V1$V2"
if(@vars) {
$Global::envvar .=
join"",
(q{echo $SHELL | grep "/t\\{0,1\\}csh" > /dev/null && }
. join(" && ", @qcsh)
. q{ || }
. join(" && ", @qbash)
.q{;});
if($ENV{'parallel_bash_environment'}) {
$Global::envvar .= 'eval "$parallel_bash_environment";'."\n";
}
}
$Global::envvarlen = length $Global::envvar;
}
sub open_joblog {
# Open joblog as specified by --joblog
# Uses:
# $opt::resume
# $opt::resume_failed
# $opt::joblog
# $opt::results
# $Global::job_already_run
# %Global::fd
my $append = 0;
if(($opt::resume or $opt::resume_failed)
and
not ($opt::joblog or $opt::results)) {
::error("--resume and --resume-failed require --joblog or --results.\n");
::wait_and_exit(255);
}
if($opt::joblog) {
if($opt::resume || $opt::resume_failed) {
if(open(my $joblog_fh, "<", $opt::joblog)) {
# Read the joblog
$append = <$joblog_fh>; # If there is a header: Open as append later
my $joblog_regexp;
if($opt::resume_failed) {
# Make a regexp that only matches commands with exit+signal=0
# 4 host 1360490623.067 3.445 1023 1222 0 0 command
$joblog_regexp='^(\d+)(?:\t[^\t]+){5}\t0\t0\t';
} else {
# Just match the job number
$joblog_regexp='^(\d+)';
}
while(<$joblog_fh>) {
if(/$joblog_regexp/o) {
# This is 30% faster than set_job_already_run($1);
vec($Global::job_already_run,($1||0),1) = 1;
} elsif(not /\d+\s+[^\s]+\s+([0-9.]+\s+){6}/) {
::error("Format of '$opt::joblog' is wrong: $_");
::wait_and_exit(255);
}
}
close $joblog_fh;
}
}
if($append) {
# Append to joblog
if(not open($Global::joblog, ">>", $opt::joblog)) {
vendor/rocksdb/build_tools/gnu_parallel view on Meta::CPAN
my ($dummy_fh, $tmpfile) = ::tmpfile(SUFFIX => ".chr");
# Unlink to avoid leaving files if --dry-run or --sshlogin
unlink $tmpfile;
$command =
# Exit value:
# empty input = true
# some input = exit val from command
qq{ sh -c 'dd bs=1 count=1 of=$tmpfile 2>/dev/null'; }.
qq{ test \! -s "$tmpfile" && rm -f "$tmpfile" && exec true; }.
qq{ (cat $tmpfile; rm $tmpfile; cat - ) | }.
"($command);";
}
if($opt::tmux) {
# Wrap command with 'tmux'
$command = $self->tmux_wrap($command);
}
$self->{'wrapped'} = $command;
}
return $self->{'wrapped'};
}
sub set_sshlogin {
    # Store a new sshlogin on the object and drop the cached wrapped
    # commands, which were built for the previous sshlogin.
    # Input:
    #   $sshlogin = value to store under 'sshlogin'
    my ($self, $sshlogin) = @_;
    $self->{'sshlogin'} = $sshlogin;
    # Both cached entries are stale once the sshlogin changes
    delete $self->{'sshlogin_wrap'};
    delete $self->{'wrapped'};
}
sub sshlogin {
    # Returns:
    #   the value stored under 'sshlogin' on the object
    my ($self) = @_;
    return $self->{'sshlogin'};
}
sub sshlogin_wrap {
# Wrap the command with the commands needed to run remotely
# Returns:
# $self->{'sshlogin_wrap'} = command wrapped with ssh+transfer commands
my $self = shift;
my $command = shift;
if(not defined $self->{'sshlogin_wrap'}) {
my $sshlogin = $self->sshlogin();
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my ($pre,$post,$cleanup)=("","","");
if($serverlogin eq ":") {
# No transfer needed
$self->{'sshlogin_wrap'} = $command;
} else {
# --transfer
$pre .= $self->sshtransfer();
# --return
$post .= $self->sshreturn();
# --cleanup
$post .= $self->sshcleanup();
if($post) {
# We need to save the exit status of the job
$post = '_EXIT_status=$?; ' . $post . ' exit $_EXIT_status;';
}
# If the remote login shell is (t)csh then use 'setenv'
# otherwise use 'export'
# We cannot use parse_env_var(), as PARALLEL_SEQ changes
# for each command
my $parallel_env =
($Global::envwarn
. q{ 'eval `echo $SHELL | grep "/t\\{0,1\\}csh" > /dev/null }
. q{ && echo setenv PARALLEL_SEQ '$PARALLEL_SEQ'\; }
. q{ setenv PARALLEL_PID '$PARALLEL_PID' }
. q{ || echo PARALLEL_SEQ='$PARALLEL_SEQ'\;export PARALLEL_SEQ\; }
. q{ PARALLEL_PID='$PARALLEL_PID'\;export PARALLEL_PID` ;' });
my $remote_pre = "";
my $ssh_options = "";
if(($opt::pipe or $opt::pipepart) and $opt::ctrlc
or
not ($opt::pipe or $opt::pipepart) and not $opt::noctrlc) {
# TODO Determine if this is needed
# Propagating CTRL-C to kill remote jobs requires
# remote jobs to be run with a terminal.
$ssh_options = "-tt -oLogLevel=quiet";
# $ssh_options = "";
# tty - check if we have a tty.
# stty:
# -onlcr - make output 8-bit clean
# isig - pass CTRL-C as signal
# -echo - do not echo input
$remote_pre .= ::shell_quote_scalar('tty >/dev/null && stty isig -onlcr -echo;');
}
if($opt::workdir) {
my $wd = ::shell_quote_file($self->workdir());
$remote_pre .= ::shell_quote_scalar("mkdir -p ") . $wd .
::shell_quote_scalar("; cd ") . $wd .
# exit 255 (instead of exec false) would be the correct thing,
# but that fails on tcsh
::shell_quote_scalar(qq{ || exec false;});
}
# This script is to solve the problem of
# * not mixing STDERR and STDOUT
# * terminating with ctrl-c
# It works on Linux but not Solaris
# Finishes on Solaris, but wrong exit code:
# $SIG{CHLD} = sub {exit ($?&127 ? 128+($?&127) : 1+$?>>8)};
# Hangs on Solaris, but correct exit code on Linux:
# $SIG{CHLD} = sub { $done = 1 };
# $p->poll;
my $signal_script = "perl -e '".
q{
use IO::Poll;
$SIG{CHLD} = sub { $done = 1 };
$p = IO::Poll->new;
$p->mask(STDOUT, POLLHUP);
$pid=fork; unless($pid) {setpgrp; exec $ENV{SHELL}, "-c", @ARGV; die "exec: $!\n"}
$p->poll;
kill SIGHUP, -${pid} unless $done;
wait; exit ($?&127 ? 128+($?&127) : 1+$?>>8)
} . "' ";
$signal_script =~ s/\s+/ /g;
$self->{'sshlogin_wrap'} =
($pre
. "$sshcmd $ssh_options $serverlogin $parallel_env "
. $remote_pre
# . ::shell_quote_scalar($signal_script . ::shell_quote_scalar($command))
. ::shell_quote_scalar($command)
. ";"
. $post);
}
}
return $self->{'sshlogin_wrap'};
( run in 0.628 second using v1.01-cache-2.11-cpan-39bf76dae61 )