Fluent-AgentLite
view release on metacpan or search on metacpan
Ping message tag/data and emit interval specification. Without PING\_TAG, fluent-agent-lite doesn't emit ping\_messages.
Actual 'data' field of ping message is 'PING_DATA PATH\_OF\_INPUT\_FILE'.
### DRAIN_LOG_TAG
Tag name of drain\_log (message counts per drain/send to server), which is emitted to the configured fluentd server as a fluentd message. Default is none (drain\_log is not sent).
### KEEPALIVE_TIME
Connection keepalive time in seconds. 0 means infinity (Default: 1800, minimum: 120)
### LOG_PATH
Log file path for 'fluent-agent-lite' (Default: /tmp/fluent-agent.log).
### LOG_VERBOSE
If you specify 'LOG_VERBOSE="yes"', 'fluent-agent-lite' writes logs with level info/debug (Default: warn/crit only).
## Run
bin/fluent-agent-lite view on Meta::CPAN
-s LIST_PATH secondary servers list (server[:port] per line, random selected one server)
-b BUF_SIZE log tailing buffer size (DEFAULT: 1MB)
-n NICE tail process nice (DEFAULT: 0)
-t TAIL_PATH tail path (DEFAULT: /usr/bin/tail)
-i SECONDS tail -F sleep interval (GNU tail ONLY, DEFAULT: tail default)
-l LOG_PATH log file path (DEFAULT: /tmp/fluent-agent.log)
-P TAG:DATA send a ping message per minute with specified TAG and DATA (DEFAULT: not to send)
(see also: fluent-plugin-ping-message)
-S SECONDS ping message interval seconds (DEFAULT: 60)
-d DRAIN_LOG_TAG emits drain log to fluentd: messages per drain/send (DEFAULT: not to emits)
-k KEEPALIVE_TIME connection keepalive time in seconds. 0 means infinity (DEFAULT: 1800, minimum: 120)
-j use JSON for message structure in transfering (highly experimental)
-v output logs of level debug and info (DEFAULT: warn/crit only)
-F force start even if input file is not found
-h print this message
EOF
exit 0;
}
if ($commandline_options{h}) { HELP_MESSAGE; }
$Log::Minimal::AUTODUMP = 1;
bin/fluent-agent-lite view on Meta::CPAN
tag => $1,
data => $2 . ' ' . $input_file ,
interval => ($commandline_options{S} or 60),
};
}
# -d: tag used for drain-log messages; stays undef when the option is
# absent or false.
my $drain_log_tag = $commandline_options{d} ? $commandline_options{d} : undef;

# -k: keepalive time; taken only when the option is defined and
# non-negative, otherwise left undef.
my $keepalive_time =
    (defined $commandline_options{k} && $commandline_options{k} >= 0)
    ? $commandline_options{k}
    : undef;

# -j: switch the output format to 'json'; undef means the default
# (MessagePack).
my $output_format = $commandline_options{j} ? 'json' : undef;
my $force_start = undef;
if ($commandline_options{F}) {
$force_start = 1;
bin/fluent-agent-lite view on Meta::CPAN
}
# Construct the agent. Positional arguments are the output tag, a reference
# to the primary server list and a reference to the secondary server list,
# followed by a hash reference of configuration values.
my $agent = Fluent::AgentLite->new(
$output_tag,
\@primary_servers,
\@secondary_servers,
{
buffer_size => $buffer_size,
ping_message => $ping_message,
# undef unless -d was given
drain_log_tag => $drain_log_tag,
# undef unless -k was given with a value >= 0
keepalive_time => $keepalive_time,
# 'json' when -j was given, otherwise undef
output_format => $output_format,
},
);
$agent->execute( {
fieldname => $fieldname,
tailfd => $tailfd,
checker => {
term => $checker_terminated,
reconnect => $checker_reconnect,
},
lib/Fluent/AgentLite.pm view on Meta::CPAN
# Connection keepalive, reconnect-wait and retry settings, declared in one
# hash-form `use constant` statement.
use constant {
    CONNECTION_KEEPALIVE_TIME       => 1800,    # 30min
    CONNECTION_KEEPALIVE_MIN        => 120,     # min 2min
    CONNECTION_KEEPALIVE_MARGIN_MAX => 30,      # max 30sec
    RECONNECT_WAIT_MIN              => 0.5,     # 0.5sec
    RECONNECT_WAIT_MAX              => 3600,    # 60min
    RECONNECT_WAIT_INCR_RATE        => 1.5,
    SEND_RETRY_MAX                  => 4,
};
# Return the given keepalive time shifted by a random whole-number offset.
# The offset is int(2 * MARGIN_MAX * rand()) - MARGIN_MAX, so the result
# lies between $keepalive_time - MARGIN_MAX and
# $keepalive_time + MARGIN_MAX - 1 (MARGIN_MAX being
# CONNECTION_KEEPALIVE_MARGIN_MAX).
sub connection_keepalive_time {
    my $base = shift;
    my $offset = int(CONNECTION_KEEPALIVE_MARGIN_MAX * 2 * rand());
    return $base + $offset - CONNECTION_KEEPALIVE_MARGIN_MAX;
}
sub new {
my $this = shift;
my ($tag, $primary_servers, $secondary_servers, $configuration) = @_;
my $self = {
tag => $tag,
servers => {
primary => $primary_servers,
secondary => $secondary_servers,
},
buffer_size => $configuration->{buffer_size},
ping_message => $configuration->{ping_message},
drain_log_tag => $configuration->{drain_log_tag},
keepalive_time => $configuration->{keepalive_time},
output_format => $configuration->{output_format},
};
if (defined $self->{output_format} and $self->{output_format} eq 'json') {
*pack = *pack_json;
*pack_drainlog = *pack_drainlog_json;
}
srand (time ^ $PID ^ unpack("%L*", `ps axww | gzip`));
lib/Fluent/AgentLite.pm view on Meta::CPAN
my $check_reconnect = ($args->{checker} || {})->{reconnect} || sub { 0 };
my $packer = Data::MessagePack->new();
my $reconnect_wait = RECONNECT_WAIT_MIN;
my $last_ping_message = time;
if ($self->{ping_message}) {
$last_ping_message = time - $self->{ping_message}->{interval} * 2;
}
my $keepalive_time = CONNECTION_KEEPALIVE_TIME;
if (defined $self->{keepalive_time}) {
$keepalive_time = $self->{keepalive_time};
if ($keepalive_time < CONNECTION_KEEPALIVE_MIN and $keepalive_time != CONNECTION_KEEPALIVE_INFINITY) {
warnf 'Keepalive time setting is too short. Set minimum value %s', CONNECTION_KEEPALIVE_MIN;
$keepalive_time = CONNECTION_KEEPALIVE_MIN;
}
}
my $pending_packed;
my $continuous_line;
my $disconnected_primary = 0;
my $expiration_enable = $keepalive_time != CONNECTION_KEEPALIVE_INFINITY;
while(not $check_terminated->()) {
# at here, connection initialized (after retry wait if required)
# connect to servers
my $primary = $self->choose($self->{servers}->{primary});
my $secondary;
my $sock = $self->connect($primary) unless $disconnected_primary;
if (not $sock and $self->{servers}->{secondary}) {
lib/Fluent/AgentLite.pm view on Meta::CPAN
# failed to connect both of primary / secondary
warnf 'failed to connect servers, primary: %s, secondary: %s', $primary, ($secondary || 'none');
warnf 'waiting %s seconds to reconnect', $reconnect_wait;
Time::HiRes::sleep($reconnect_wait);
$reconnect_wait *= RECONNECT_WAIT_INCR_RATE;
$reconnect_wait = RECONNECT_WAIT_MAX if $reconnect_wait > RECONNECT_WAIT_MAX;
next;
}
# succeed to connect. set keepalive disconnect time
my $connecting = $secondary || $primary;
my $expired = time + connection_keepalive_time($keepalive_time) if $expiration_enable;
$reconnect_wait = RECONNECT_WAIT_MIN;
while(not $check_reconnect->()) {
# connection keepalive expired
if ($expiration_enable and time > $expired) {
infof "connection keepalive expired.";
last;
}
# ping message (if enabled)
my $ping_packed = undef;
if ($self->{ping_message} and time >= $last_ping_message + $self->{ping_message}->{interval}) {
$ping_packed = $self->pack_ping_message($packer, $self->{ping_message}->{tag}, $self->{ping_message}->{data});
$last_ping_message = time;
}
( run in 1.300 second using v1.01-cache-2.11-cpan-39bf76dae61 )