Fluent-AgentLite

 view release on metacpan or  search on metacpan

README.md  view on Meta::CPAN

Ping message tag/data and emit interval specification. Without PING\_TAG, fluent-agent-lite doesn't emit ping\_messages.

Actual 'data' field of ping message is 'PING_DATA PATH\_OF\_INPUT\_FILE'.

### DRAIN_LOG_TAG

Tag name of drain\_log (message count per drain/send to server), which is emitted to the configured fluentd server as a fluentd message. Default is none (drain\_log is not sent).

### KEEPALIVE_TIME

Connection keepalive time in seconds. 0 means infinity (Default: 1800, minimum: 120)

### LOG_PATH

Log file path for 'fluent-agent-lite' (Default: /tmp/fluent-agent.log).

### LOG_VERBOSE

If you specify 'LOG_VERBOSE="yes"', 'fluent-agent-lite' writes logs with level info/debug (Default: warn/crit only).

## Run

bin/fluent-agent-lite  view on Meta::CPAN

     -s LIST_PATH       secondary servers list (server[:port] per line, random selected one server)
     -b BUF_SIZE        log tailing buffer size (DEFAULT: 1MB)
     -n NICE            tail process nice (DEFAULT: 0)
     -t TAIL_PATH       tail path (DEFAULT: /usr/bin/tail)
     -i SECONDS         tail -F sleep interval (GNU tail ONLY, DEFAULT: tail default)
     -l LOG_PATH        log file path (DEFAULT: /tmp/fluent-agent.log)
     -P TAG:DATA        send a ping message per minute with specified TAG and DATA (DEFAULT: not to send)
                          (see also: fluent-plugin-ping-message)
     -S SECONDS         ping message interval seconds (DEFAULT: 60)
     -d DRAIN_LOG_TAG   emits drain log to fluentd: messages per drain/send (DEFAULT: not to emits)
     -k KEEPALIVE_TIME  connection keepalive time in seconds. 0 means infinity (DEFAULT: 1800, minimum: 120)
     -j                 use JSON for message structure in transfering (highly experimental)
     -v                 output logs of level debug and info (DEFAULT: warn/crit only)
     -F                 force start even if input file is not found
     -h                 print this message
EOF
    exit 0;
}
# Print the usage text (HELP_MESSAGE exits the process) when -h was given.
HELP_MESSAGE() if $commandline_options{h};

# Turn on Log::Minimal's AUTODUMP option for all subsequent log calls.
$Log::Minimal::AUTODUMP = 1;

bin/fluent-agent-lite  view on Meta::CPAN

        tag => $1,
        data => $2 . ' ' . $input_file ,
        interval => ($commandline_options{S} or 60),
    };
}

# Tag for drain log messages (-d); stays undef when the option is not given.
my $drain_log_tag = $commandline_options{d} ? $commandline_options{d} : undef;

# Connection keepalive seconds (-k); only a defined, non-negative value is
# taken, anything else leaves it undef.
my $keepalive_time
    = (defined $commandline_options{k} and $commandline_options{k} >= 0)
    ? $commandline_options{k}
    : undef;

# Output format (-j selects JSON); undef means the default, MessagePack.
my $output_format = $commandline_options{j} ? 'json' : undef;

my $force_start = undef;
if ($commandline_options{F}) {
    $force_start = 1;

bin/fluent-agent-lite  view on Meta::CPAN

    }

    my $agent = Fluent::AgentLite->new(
        $output_tag,
        \@primary_servers,
        \@secondary_servers,
        {
            buffer_size => $buffer_size,
            ping_message => $ping_message,
            drain_log_tag => $drain_log_tag,
            keepalive_time => $keepalive_time,
            output_format => $output_format,
        },
    );
    $agent->execute( {
        fieldname => $fieldname,
        tailfd => $tailfd,
        checker => {
            term => $checker_terminated,
            reconnect => $checker_reconnect,
        },

lib/Fluent/AgentLite.pm  view on Meta::CPAN

# Connection keepalive settings, in seconds.
use constant CONNECTION_KEEPALIVE_TIME => 1800; # default keepalive: 30min
use constant CONNECTION_KEEPALIVE_MIN => 120; # shortest accepted keepalive: 2min
use constant CONNECTION_KEEPALIVE_MARGIN_MAX => 30; # bound of the random margin applied to the keepalive: 30sec

# Wait between reconnect attempts, in seconds.
use constant RECONNECT_WAIT_MIN => 0.5;  # initial wait: 0.5sec
use constant RECONNECT_WAIT_MAX => 3600; # upper cap of the wait: 60min
use constant RECONNECT_WAIT_INCR_RATE => 1.5; # the wait is multiplied by this after each failed connect

# NOTE(review): presumably the maximum number of send retries; usage is not visible here -- confirm.
use constant SEND_RETRY_MAX => 4;

# Return $keepalive_time shifted by a random whole-second margin.
#
# The margin is int(CONNECTION_KEEPALIVE_MARGIN_MAX * 2 * rand()) minus
# CONNECTION_KEEPALIVE_MARGIN_MAX, i.e. an integer in the range
# [-CONNECTION_KEEPALIVE_MARGIN_MAX, +CONNECTION_KEEPALIVE_MARGIN_MAX).
#
# Params:  $keepalive_time - base keepalive time in seconds.
# Returns: the base time plus the random margin.
sub connection_keepalive_time {
    my ($keepalive_time) = @_;

    my $random_offset = int(CONNECTION_KEEPALIVE_MARGIN_MAX * 2 * rand());

    return $keepalive_time + $random_offset - CONNECTION_KEEPALIVE_MARGIN_MAX;
}

sub new {
    my $this = shift;
    my ($tag, $primary_servers, $secondary_servers, $configuration) = @_;
    my $self = {
        tag => $tag,
        servers => {
            primary => $primary_servers,
            secondary => $secondary_servers,
        },
        buffer_size => $configuration->{buffer_size},
        ping_message => $configuration->{ping_message},
        drain_log_tag => $configuration->{drain_log_tag},
        keepalive_time => $configuration->{keepalive_time},
        output_format => $configuration->{output_format},
    };

    if (defined $self->{output_format} and $self->{output_format} eq 'json') {
        *pack = *pack_json;
        *pack_drainlog = *pack_drainlog_json;
    }

    srand (time ^ $PID ^ unpack("%L*", `ps axww | gzip`));

lib/Fluent/AgentLite.pm  view on Meta::CPAN

    my $check_reconnect = ($args->{checker} || {})->{reconnect} || sub { 0 };

    my $packer = Data::MessagePack->new();

    my $reconnect_wait = RECONNECT_WAIT_MIN;

    my $last_ping_message = time;
    if ($self->{ping_message}) {
        $last_ping_message = time - $self->{ping_message}->{interval} * 2;
    }
    my $keepalive_time = CONNECTION_KEEPALIVE_TIME;
    if (defined $self->{keepalive_time}) {
        $keepalive_time = $self->{keepalive_time};
        if ($keepalive_time < CONNECTION_KEEPALIVE_MIN and $keepalive_time != CONNECTION_KEEPALIVE_INFINITY) {
            warnf 'Keepalive time setting is too short. Set minimum value %s', CONNECTION_KEEPALIVE_MIN;
            $keepalive_time = CONNECTION_KEEPALIVE_MIN;
        }
    }

    my $pending_packed;
    my $continuous_line;
    my $disconnected_primary = 0;
    my $expiration_enable = $keepalive_time != CONNECTION_KEEPALIVE_INFINITY;

    while(not $check_terminated->()) {
        # at here, connection initialized (after retry wait if required)

        # connect to servers
        my $primary = $self->choose($self->{servers}->{primary});
        my $secondary;

        my $sock = $self->connect($primary) unless $disconnected_primary;
        if (not $sock and $self->{servers}->{secondary}) {

lib/Fluent/AgentLite.pm  view on Meta::CPAN

            # failed to connect both of primary / secondary
            warnf 'failed to connect servers, primary: %s, secondary: %s', $primary, ($secondary || 'none');
            warnf 'waiting %s seconds to reconnect', $reconnect_wait;

            Time::HiRes::sleep($reconnect_wait);
            $reconnect_wait *= RECONNECT_WAIT_INCR_RATE;
            $reconnect_wait = RECONNECT_WAIT_MAX if $reconnect_wait > RECONNECT_WAIT_MAX;
            next;
        }

        # succeed to connect. set keepalive disconnect time
        my $connecting = $secondary || $primary;

        my $expired = time + connection_keepalive_time($keepalive_time) if $expiration_enable;
        $reconnect_wait = RECONNECT_WAIT_MIN;

        while(not $check_reconnect->()) {
            # connection keepalive expired
            if ($expiration_enable and time > $expired) {
                infof "connection keepalive expired.";
                last;
            }

            # ping message (if enabled)
            my $ping_packed = undef;
            if ($self->{ping_message} and time >= $last_ping_message + $self->{ping_message}->{interval}) {
                $ping_packed = $self->pack_ping_message($packer, $self->{ping_message}->{tag}, $self->{ping_message}->{data});
                $last_ping_message = time;
            }



( run in 1.300 second using v1.01-cache-2.11-cpan-39bf76dae61 )