Fluent-AgentLite
view release on metacpan or search on metacpan
lib/Fluent/AgentLite.pm view on Meta::CPAN
package Fluent::AgentLite;
use strict;
use warnings;
use English;
use Carp;
use POSIX qw(:errno_h);
use Time::HiRes;
use Time::Piece;
use Log::Minimal;
use IO::Socket::INET;
use Data::MessagePack;
our $VERSION = '1.0';
# Tunables for the read loop and the server connection lifecycle.
# Every duration below is expressed in seconds.
use constant {
    # pause between polls when there is nothing to send
    READ_WAIT                      => 0.1,    # 0.1sec
    SOCKET_TIMEOUT                 => 5,      # 5sec

    # a keepalive time equal to this value disables connection expiry
    CONNECTION_KEEPALIVE_INFINITY  => 0,
    # keepalive used when the configuration does not give one
    CONNECTION_KEEPALIVE_TIME      => 1800,   # 30min
    # shorter (non-infinite) settings are raised to this floor
    CONNECTION_KEEPALIVE_MIN       => 120,    # min 2min
    # bound of the random offset applied to the keepalive time
    CONNECTION_KEEPALIVE_MARGIN_MAX => 30,    # max 30sec

    # reconnect back-off: start value, ceiling and growth factor
    RECONNECT_WAIT_MIN             => 0.5,    # 0.5sec
    RECONNECT_WAIT_MAX             => 3600,   # 60min
    RECONNECT_WAIT_INCR_RATE       => 1.5,

    SEND_RETRY_MAX                 => 4,
};
# Return a keepalive interval with a random offset applied.
#
# Plain function (not a method). Takes the base keepalive time in seconds
# and returns it shifted by a whole number of seconds drawn from
# -CONNECTION_KEEPALIVE_MARGIN_MAX .. CONNECTION_KEEPALIVE_MARGIN_MAX - 1.
sub connection_keepalive_time {
    my $base = shift;

    my $spread = CONNECTION_KEEPALIVE_MARGIN_MAX * 2;
    my $offset = int($spread * rand()) - CONNECTION_KEEPALIVE_MARGIN_MAX;

    return $base + $offset;
}
# Constructor.
#
# Positional arguments:
#   $tag               - stored under {tag}
#   $primary_servers   - stored under {servers}{primary}
#   $secondary_servers - stored under {servers}{secondary}
#   $configuration     - hashref; the keys buffer_size, ping_message,
#                        drain_log_tag, keepalive_time and output_format
#                        are copied onto the object
#
# Side effects:
#   * When output_format is 'json', the package symbols `pack` and
#     `pack_drainlog` are aliased to `pack_json` / `pack_drainlog_json`.
#     This is a package-level change, not a per-object one.
#   * The random number generator is reseeded on every call.
#
# Returns the blessed object.
sub new {
    my ($class, $tag, $primary_servers, $secondary_servers, $configuration) = @_;

    my %servers = (
        primary   => $primary_servers,
        secondary => $secondary_servers,
    );
    my $self = { tag => $tag, servers => \%servers };

    my @copied_keys = qw(buffer_size ping_message drain_log_tag keepalive_time output_format);
    for my $key (@copied_keys) {
        $self->{$key} = $configuration->{$key};
    }

    my $format = $self->{output_format};
    if (defined $format and $format eq 'json') {
        *pack = *pack_json;
        *pack_drainlog = *pack_drainlog_json;
    }

    # Seed from the clock, the process id and a checksum of the compressed
    # process listing.
    my $seed = time ^ $PID ^ unpack("%L*", `ps axww | gzip`);
    srand($seed);

    return bless $self, $class;
}
# Run the forwarding loop until the `term` checker reports true.
#
# Arguments:
#   $args - hashref with
#     fieldname - passed through to $self->pack
#     tailfd    - handle passed to $self->drain
#     checker   - optional hashref of coderefs:
#                   term      - polled with no arguments; true ends the loop
#                   reconnect - polled with no arguments; true drops the
#                               current connection. Called with 1 afterwards
#                               to clear the flag.
#                 Missing checkers default to "never".
#
# Outer loop: pick a server (primary first, then secondary), connect, and on
# failure sleep with a growing back-off. Inner loop: drain input, pack it,
# optionally append drain-log and ping records, and send. A buffer that could
# not be sent is kept and retried on the next connection.
#
# No explicit return value.
sub execute {
    my $self = shift;
    my $args = shift;

    my $fieldname = $args->{fieldname};
    my $tailfd = $args->{tailfd};
    my $check_terminated = ($args->{checker} || {})->{term} || sub { 0 };
    my $check_reconnect = ($args->{checker} || {})->{reconnect} || sub { 0 };

    my $packer = Data::MessagePack->new();

    my $reconnect_wait = RECONNECT_WAIT_MIN;

    # With ping enabled, back-date the last ping by two intervals so that the
    # first ping is due as soon as the send loop starts.
    my $last_ping_message = time;
    if ($self->{ping_message}) {
        $last_ping_message = time - $self->{ping_message}->{interval} * 2;
    }

    my $keepalive_time = CONNECTION_KEEPALIVE_TIME;
    if (defined $self->{keepalive_time}) {
        $keepalive_time = $self->{keepalive_time};
        if ($keepalive_time < CONNECTION_KEEPALIVE_MIN and $keepalive_time != CONNECTION_KEEPALIVE_INFINITY) {
            warnf 'Keepalive time setting is too short. Set minimum value %s', CONNECTION_KEEPALIVE_MIN;
            $keepalive_time = CONNECTION_KEEPALIVE_MIN;
        }
    }

    my $pending_packed;            # packed data not yet sent successfully
    my $continuous_line;           # carried between drain calls
    my $disconnected_primary = 0;  # true right after a send to the primary failed
    my $expiration_enable = $keepalive_time != CONNECTION_KEEPALIVE_INFINITY;

    while (not $check_terminated->()) {
        # at here, connection initialized (after retry wait if required)

        # connect to servers
        my $primary = $self->choose($self->{servers}->{primary});
        my $secondary;

        # Declared on its own line: combining `my` with a statement modifier
        # has undefined behaviour in Perl.
        my $sock;
        $sock = $self->connect($primary) unless $disconnected_primary;

        if (not $sock and $self->{servers}->{secondary}) {
            # Connect to the very server recorded in $secondary, so that the
            # failure log below names the server that was actually tried.
            $secondary = $self->choose($self->{servers}->{secondary});
            $sock = $self->connect($secondary);
        }
        $disconnected_primary = 0;

        unless ($sock) {
            # failed to connect both of primary / secondary
            warnf 'failed to connect servers, primary: %s, secondary: %s', $primary, ($secondary || 'none');
            warnf 'waiting %s seconds to reconnect', $reconnect_wait;
            Time::HiRes::sleep($reconnect_wait);
            $reconnect_wait *= RECONNECT_WAIT_INCR_RATE;
            $reconnect_wait = RECONNECT_WAIT_MAX if $reconnect_wait > RECONNECT_WAIT_MAX;
            next;
        }

        # succeed to connect. set keepalive disconnect time
        my $expired;
        $expired = time + connection_keepalive_time($keepalive_time) if $expiration_enable;

        $reconnect_wait = RECONNECT_WAIT_MIN;

        while (not $check_reconnect->()) {
            # connection keepalive expired
            if ($expiration_enable and time > $expired) {
                infof "connection keepalive expired.";
                last;
            }

            if (not $pending_packed) {
                # ping message (if enabled). Built only when nothing is
                # waiting to be resent, so a due ping is never generated and
                # then thrown away while a retry buffer is pending.
                my $ping_packed = undef;
                if ($self->{ping_message} and time >= $last_ping_message + $self->{ping_message}->{interval}) {
                    $ping_packed = $self->pack_ping_message($packer, $self->{ping_message}->{tag}, $self->{ping_message}->{data});
                    $last_ping_message = time;
                }

                # drain (sysread)
                my $lines = 0;
                my $buffered_lines;
                ($buffered_lines, $continuous_line, $lines) = $self->drain($tailfd, $continuous_line);

                if ($buffered_lines) {
                    $pending_packed = $self->pack($packer, $fieldname, $buffered_lines);
                    if ($self->{drain_log_tag}) {
                        $pending_packed .= $self->pack_drainlog($packer, $self->{drain_log_tag}, $lines);
                    }
                }
                if ($ping_packed) {
                    $pending_packed ||= '';
                    $pending_packed .= $ping_packed;
                }

                # nothing to send: wait a moment and poll again
                unless ($pending_packed) {
                    Time::HiRes::sleep(READ_WAIT);
                    next;
                }
            }

            # send
            my $written = $self->send($sock, $pending_packed);
            unless ($written) { # failed to write (socket error).
                # Remember that the primary just failed so the next outer
                # iteration skips it; $secondary is only set when the
                # current connection is to a secondary server.
                $disconnected_primary = 1 unless $secondary;
                last;
            }
            $pending_packed = undef;
        }

        if ($check_reconnect->()) {
            infof "SIGHUP (or SIGTERM) received";
            $disconnected_primary = 0;
            $check_reconnect->(1); # clear SIGHUP signal
        }

        infof "disconnecting to current server";
        if ($sock) {
            $sock->close;
            $sock = undef;
        }
        infof "disconnected.";
    }

    if ($check_terminated->()) {
        warnf "SIGTERM received";
    }
    infof "process exit";
}
sub drain {
# if broken child process (undefined return value of $fd->sysread())
( run in 1.332 second using v1.01-cache-2.11-cpan-39bf76dae61 )