Fluent-AgentLite
view release on metacpan or search on metacpan
bin/fluent-agent-lite view on Meta::CPAN
#!/usr/bin/env perl
use strict;
use warnings;
use English;
use Fcntl;
use FindBin;
use lib "$FindBin::Bin/../lib";
use lib "$FindBin::Bin/../extlib/lib/perl5";
use Log::Minimal;
local $Log::Minimal::PRINT = sub {
my ( $time, $type, $message, $trace,$raw_message) = @_;
warn "$time [$type] ($PID) $message at $trace\n";
};
use Fluent::AgentLite;
my $HUPPED = undef;
$SIG{ HUP } = sub { $HUPPED = 1; }; # to reconnect
my $TERMINATED = undef;
$SIG{ INT } = $SIG{ TERM } = sub { $TERMINATED = 1; }; # to terminate
my $checker_terminated = sub { $TERMINATED; };
my $checker_reconnect = sub {
if (shift) {
$HUPPED = undef;
} else {
$HUPPED or $TERMINATED;
}
};
use Getopt::Std qw//;
my %commandline_options;
Getopt::Std::getopts('f:p:s:b:n:t:i:l:P:S:d:k:jvFh', \%commandline_options);
sub HELP_MESSAGE {
print <<EOF;
Usage: fluent-agent-lite [options] TAG TARGET_FILE PRIMARY_SERVER[:PORT] [SECONDARY_SERVER[:PORT]]
fluent-agent-lite -p SERVER_LIST_FILE [-s SERVER_LIST_FILE] [options] TAG TARGET_FILE
port default: 24224
Options:
-f FIELDNAME fieldname of fluentd log message attribute (DEFAULT: message)
-p LIST_PATH primary servers list (server[:port] per line, random selected one server)
-s LIST_PATH secondary servers list (server[:port] per line, random selected one server)
-b BUF_SIZE log tailing buffer size (DEFAULT: 1MB)
-n NICE tail process nice (DEFAULT: 0)
-t TAIL_PATH tail path (DEFAULT: /usr/bin/tail)
-i SECONDS tail -F sleep interval (GNU tail ONLY, DEFAULT: tail default)
-l LOG_PATH log file path (DEFAULT: /tmp/fluent-agent.log)
-P TAG:DATA send a ping message per minute with specified TAG and DATA (DEFAULT: not to send)
(see also: fluent-plugin-ping-message)
-S SECONDS ping message interval seconds (DEFAULT: 60)
-d DRAIN_LOG_TAG emits drain log to fluentd: messages per drain/send (DEFAULT: not to emits)
-k KEEPALIVE_TIME connection keepalive time in seconds. 0 means infinity (DEFAULT: 1800, minimum: 120)
-j use JSON for message structure in transfering (highly experimental)
-v output logs of level debug and info (DEFAULT: warn/crit only)
-F force start even if input file is not found
-h print this message
EOF
exit 0;
}
if ($commandline_options{h}) { HELP_MESSAGE; }
$Log::Minimal::AUTODUMP = 1;
if ($commandline_options{v}) {
$ENV{LM_DEBUG} = 1;
$Log::Minimal::LOG_LEVEL = 'DEBUG';
} else {
$Log::Minimal::LOG_LEVEL = "WARN";
}
sub server_entry {
my $arg = shift;
my ($s,$p) = split(/:/, $arg);
$p = 24224 if not defined $p;
return [$s, $p];
}
sub load_server_list {
my $path = shift;
my @list;
open(my $fd, $path);
while(my $line = <$fd>) {
chomp $line;
push @list, server_entry($line) if length($line) > 0;
}
close($fd);
return @list;
}
my $output_tag = shift @ARGV;
my $input_file = shift @ARGV;
my $fieldname = 'message';
if ($commandline_options{f}) {
$fieldname = $commandline_options{f};
}
my @primary_servers;
my @secondary_servers;
if (scalar(@ARGV) > 0) {
push @primary_servers, server_entry(shift @ARGV);
push @secondary_servers, server_entry(shift @ARGV) if scalar(@ARGV) > 0;
}
if ($commandline_options{p}) {
die "primary server list file not exists" unless -f $commandline_options{p};
push @primary_servers, load_server_list($commandline_options{p});
}
if ($commandline_options{s}) {
die "secondary server list file not exists" unless -f $commandline_options{s};
push @secondary_servers, load_server_list($commandline_options{s});
}
my $buffer_size = 1024 * 1024;
if ($commandline_options{b} and $commandline_options{b} > 0) {
$buffer_size = $commandline_options{b};
}
my $nice = undef;
if (defined $commandline_options{n}) {
$nice = $commandline_options{n};
}
my $tail_path = '/usr/bin/tail';
if ($commandline_options{t}) {
die "not executable file specified as tail" unless -x $commandline_options{t};
$tail_path = $commandline_options{t};
}
my $sleep_interval = undef;
if ($commandline_options{i}) {
my $tail_interval_valid = (system('tail -s 1 /dev/null > /dev/null 2>&1') == 0);
die "tail interval specified, but your tail doesn't have -s option" unless $tail_interval_valid;
$sleep_interval = $commandline_options{i};
}
my $logpath = '/tmp/fluent-agent.log';
if ($commandline_options{l}) {
$logpath = $commandline_options{l};
}
my $ping_message = undef;
if ($commandline_options{P}) {
$commandline_options{P} =~ /^([^:]+):(.+)$/;
$ping_message = +{
tag => $1,
data => $2 . ' ' . $input_file ,
interval => ($commandline_options{S} or 60),
};
}
my $drain_log_tag = undef;
if ($commandline_options{d}) {
$drain_log_tag = $commandline_options{d};
}
my $keepalive_time = undef;
if (defined $commandline_options{k} and $commandline_options{k} >= 0) {
$keepalive_time = $commandline_options{k};
}
my $output_format = undef; # default: MessagePack
if ($commandline_options{j}) {
$output_format = 'json';
}
my $force_start = undef;
if ($commandline_options{F}) {
$force_start = 1;
}
open(STDOUT, ">> $logpath") or die "failed to reopen STDOUT to $logpath";
open(STDERR, ">> $logpath") or die "failed to reopen STDERR to $logpath";
use IO::Handle;
autoflush STDOUT 1;
autoflush STDERR 1;
sub build_tail_command {
my @command;
if ($nice) {
push @command, 'nice', '-n', $nice;
}
push @command, $tail_path;
if ($sleep_interval) {
push @command, '-s', $sleep_interval;
}
push @command, '-n', '0', '-F', $input_file;
infof 'tail command line: %s', \@command;
return @command;
}
sub open_tail {
my $pid = open(my $tailfd, '-|', build_tail_command())
or die "failed to exec tail";
my $io = IO::Handle->new();
$io->fdopen($tailfd, "r");
$io->blocking(0); # set nonblock
return ($io, $pid);
}
sub open_stdin {
my $io = IO::Handle->new();
$io->fdopen(fileno(STDIN), "r");
$io->blocking(0);
return ($io, undef);
}
sub close_fd {
my $fd = shift;
eval {
$fd->blocking(1);
close($fd);
};
# throw away errors
1;
}
sub main {
my ($tailfd, $tailpid);
if ( $input_file eq "-" ) {
($tailfd, $tailpid) = open_stdin();
} elsif ( -f $input_file || $force_start ) {
close(STDIN) or die "failed to close STDIN";
($tailfd, $tailpid) = open_tail();
} else {
die 'cannot find input file ', $input_file;
}
my $agent = Fluent::AgentLite->new(
$output_tag,
\@primary_servers,
\@secondary_servers,
{
buffer_size => $buffer_size,
ping_message => $ping_message,
drain_log_tag => $drain_log_tag,
keepalive_time => $keepalive_time,
output_format => $output_format,
},
);
$agent->execute( {
fieldname => $fieldname,
tailfd => $tailfd,
checker => {
term => $checker_terminated,
reconnect => $checker_reconnect,
},
} );
if (defined $tailpid) {
infof "Sending TERM to pid %s", $tailpid;
kill(TERM => $tailpid);
}
# agent exits (killed, or on error), and close pipe
close_fd($tailfd);
}
infof "starting to forward logs.";
main();
infof "process exit.";
exit 0;
( run in 2.245 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )