App-nrun
view release on metacpan or search on metacpan
lib/NRun/Worker.pm view on Meta::CPAN
#
# nrun is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with nrun. If not, see <http://www.gnu.org/licenses/>.
#
# Program: Worker.pm
# Author: Timo Benk <benk@b1-systems.de>
# Date: Wed Jul 17 19:44:13 2013 +0200
# Ident: e81f2ed28d3a5b52045231c0700113b9349472fe
# Branch: HEAD, v1.1.2, origin/master, origin/HEAD, master
#
# Changelog:--reverse --grep '^tags.*relevant':-1:%an : %ai : %s
#
# Timo Benk : 2013-04-28 17:27:31 +0200 : initial checkin
# Timo Benk : 2013-04-29 18:53:21 +0200 : introducing ncopy
# Timo Benk : 2013-05-03 13:52:25 +0200 : no output was returned on timeout
# Timo Benk : 2013-05-03 19:16:11 +0200 : no output was returned on SIGINT
# Timo Benk : 2013-05-08 10:05:39 +0200 : better signal handling implemented
# Timo Benk : 2013-05-08 13:46:36 +0200 : skip empty output when signaled USR1/USR2
# Timo Benk : 2013-05-09 07:31:52 +0200 : fix race condition in semaphore cleanup code
# Timo Benk : 2013-05-21 18:47:43 +0200 : parameter --async added
# Timo Benk : 2013-05-22 13:09:13 +0200 : option --no-logfile was broken
# Timo Benk : 2013-05-22 13:20:36 +0200 : --skip-ping-check and --skip-ns-check enabled
# Timo Benk : 2013-05-24 08:03:19 +0200 : generic mode added
# Timo Benk : 2013-06-13 13:59:01 +0200 : process output handling refined
# Timo Benk : 2013-06-14 12:38:19 +0200 : open3 will never return undef
# Timo Benk : 2013-06-20 19:33:33 +0200 : raise condition in signal handling code fixed
# Timo Benk : 2013-07-11 14:02:09 +0200 : better cleanup handling in signal handlers
#
###
# this is the base module for all worker implementations and
# it is responsible for loading the available implementations
# at runtime.
#
# a worker implements a a single remote access mechanism like ssh
# which will be used to execute commands on the remote host,
# delete files on the remote host and to copy files to the remote host.
#
# derived modules must implement the following subs's
#
# - init($cfg)
# - execute($cmd, $args)
# - delete($file)
# - copy($source, $target)
# - rcopy($source, $target)
#
# a derived module must call register() in BEGIN{}, otherwise it will not
# be available.
#
# a derived module must always write to $_self->{E} (STDERR) and
# $_self->{O} (STDOUT).
#
# all output produced by the derived worker modules must match the
# following format:
#
# HOSTNAME;[stdout|stderr];TSTAMP;PID;PID(CHILD);[debug|error|exit|output|end];"OUTPUT"
#
# this is the string which will be passed to the logger/filter implementations.
###
package NRun::Worker;
use strict;
use warnings;
use File::Basename;
use IPC::Open3;
###
# automagically load all available modules
INIT {
my $basedir = dirname($INC{"NRun/Worker.pm"}) . "/Workers";
opendir(DIR, $basedir) or die("$basedir: $!");
while (my $module = readdir(DIR)) {
if ($module =~ /\.pm$/i) {
require "$basedir/$module";
}
}
close DIR;
}
###
# all available workers will be registered here
my $workers = {};
###
# will be called by the worker modules on INIT.
#
# $_cfg - parameter hash where
# {
# 'MODE' - mode name
# 'DESC' - mode description
# 'NAME' - module name
# }
sub register {
my $_cfg = shift;
$workers->{$_cfg->{MODE}} = $_cfg;
}
###
# return all available worker modules
sub workers {
return $workers;
}
###
# create a new object.
#
# <- the new object
lib/NRun/Worker.pm view on Meta::CPAN
$_self->{handler_term} = NRun::Signal::register('TERM', \&handler_term, [ \$_self ], $$);
$_self->{O} = \*STDOUT;
$_self->{E} = \*STDERR;
}
###
# kill the currently running command
sub kill {
my $_self = shift;
if ($_self->{pid} ne "n/a") {
kill(KILL => $_self->{pid});
}
}
###
# SIGTERM signal handler.
sub handler_term {
my $_self = shift;
$$_self->kill();
print {$$_self->{O}} "$$_self->{hostname};stdout;" . time() . ";$$;$$_self->{pid};exit;\"exit code $NRun::Constants::CODE_SIGTERM;\"\n";
print {$$_self->{E}} "$$_self->{hostname};stderr;" . time() . ";$$;$$_self->{pid};error;\"SIGTERM received\"\n";
}
###
# SIGINT signal handler.
sub handler_int {
my $_self = shift;
$$_self->kill();
print {$$_self->{O}} "$$_self->{hostname};stdout;" . time() . ";$$;$$_self->{pid};exit;\"exit code $NRun::Constants::CODE_SIGINT;\"\n";
print {$$_self->{E}} "$$_self->{hostname};stderr;" . time() . ";$$;$$_self->{pid};error;\"SIGINT received\"\n";
}
###
# SIGALRM signal handler.
sub handler_alrm {
my $_self = shift;
$$_self->kill();
print {$$_self->{O}} "$$_self->{hostname};stdout;" . time() . ";$$;$$_self->{pid};exit;\"exit code $NRun::Constants::CODE_SIGALRM;\"\n";
print {$$_self->{E}} "$$_self->{hostname};stderr;" . time() . ";$$;$$_self->{pid};error;\"SIGALRM received\"\n";
}
###
# execute $_cmd.
#
# command output will be formatted the following way, line by line:
#
# HOSTNAME;[stdout|stderr];TSTAMP;PID;PID(CHILD);[debug|error|exit|output|end];"OUTPUT"
#
# $_cmd - the command to be executed
# <- the return code
sub do {
my $_self = shift;
my $_cmd = shift;
chomp($_cmd);
eval{
$_self->{pid} = open3(\*CMDIN, \*CMDOUT, \*CMDERR, "$_cmd");
};
if ($@) {
print {$_self->{E}} "$_self->{hostname};stderr;" . time() . ";$$;n/a;debug;\"exec $_cmd\"\n";
print {$_self->{E}} "$_self->{hostname};stderr;" . time() . ";$$;n/a;error;\"$@\"\n";
print {$_self->{O}} "$_self->{hostname};stdout;" . time() . ";$$;n/a;exit;\"exit code $NRun::Constants::EXECUTION_FAILED\"\n";
return $NRun::Constants::EXECUTION_FAILED;
}
print {$_self->{E}} "$_self->{hostname};stderr;" . time() . ";$$;$_self->{pid};debug;\"exec $_cmd\"\n";
my $selector = IO::Select->new();
$selector->add(\*CMDOUT, \*CMDERR);
while (my @ready = $selector->can_read()) {
foreach my $fh (@ready) {
if (fileno($fh) == fileno(CMDOUT)) {
while (my $line = <$fh>) {
chomp($line);
print {$_self->{O}} "$_self->{hostname};stdout;" . time() . ";$$;$_self->{pid};output;\"$line\"\n";
}
} elsif (fileno($fh) == fileno(CMDERR)) {
while (my $line = <$fh>) {
chomp($line);
print {$_self->{E}} "$_self->{hostname};stderr;" . time() . ";$$;$_self->{pid};output;\"$line\"\n";
}
}
$selector->remove($fh) if eof($fh);
}
}
close(CMDIN);
close(CMDOUT);
close(CMDERR);
waitpid($_self->{pid}, 0);
print {$_self->{O}} "$_self->{hostname};stdout;" . time() . ";$$;$_self->{pid};exit;\"exit code " . ($? >> 8) . "\"\n";
$_self->{pid} = "n/a";
( run in 1.029 second using v1.01-cache-2.11-cpan-d8267643d1d )