InfluxDB-Writer

 view release on metacpan or  search on metacpan

lib/InfluxDB/Writer/FileTailer.pm  view on Meta::CPAN

package InfluxDB::Writer::FileTailer;

# ABSTRACT: Tail files and send lines to InfluxDB
our $VERSION = '1.003'; # VERSION

use strict;
use warnings;
use feature 'say';

use Moose;
use IO::Async::File;
use IO::Async::FileStream;
use IO::Async::Loop;
use Hijk ();
use Carp qw(croak);
use InfluxDB::LineProtocol qw(line2data data2line);
use Log::Any qw($log);
use File::Spec::Functions;
use Cwd 'abs_path';

with qw(InfluxDB::Writer::AuthHeaderRole);

# Directory that watch_dir scans for files ending in ".stats"; run()
# croaks unless it is an existing directory.
has 'dir' => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

# InfluxDB endpoint settings. The code that consumes them is not visible
# in this part of the file -- presumably host, port and database name of
# the server that receives the lines (TODO confirm against the sender).
has 'influx_host' => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);
has 'influx_port' => (
    is       => 'ro',
    isa      => 'Int',
    required => 1,
    default  => 8086,
);
has 'influx_db' => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

# Flush tuning. flush_interval is handed to the periodic timer in run()
# as its tick interval. flush_size is not used in the visible code --
# presumably a buffered-line threshold (TODO confirm).
has 'flush_size' => (
    is       => 'ro',
    isa      => 'Int',
    required => 1,
    default  => 1000,
);
has 'flush_interval' => (
    is       => 'ro',
    isa      => 'Int',
    required => 1,
    default  => 30,
);

# Optional hash reference of tags; has_tags reports whether it was set.
has 'tags' => (
    is        => 'ro',
    isa       => 'HashRef',
    predicate => 'has_tags',
);

# Private bookkeeping hash, starting out empty for every instance.
has '_files' => (
    is      => 'ro',
    isa     => 'HashRef',
    default => sub { {} },
);

# Event loop, created on first access through _build__loop.
has '_loop' => (
    is         => 'ro',
    isa        => 'IO::Async::Loop',
    lazy_build => 1,
);

# Array of strings with delegated helper methods (buffer_push,
# buffer_all, buffer_size, buffer_splice, buffer_is_empty).
has 'buffer' => (
    is      => 'ro',
    isa     => 'ArrayRef[Str]',
    traits  => ['Array'],
    default => sub { [] },
    handles => {
        buffer_push     => 'push',
        buffer_all      => 'elements',
        buffer_size     => 'count',
        buffer_splice   => 'splice',
        buffer_is_empty => 'is_empty',
    },
);

# Builder for the lazily built '_loop' attribute: hands back a new
# IO::Async::Loop object.
sub _build__loop {
    my $event_loop = IO::Async::Loop->new;
    return $event_loop;
}

# Main entry point: start watching the configured directory and run the
# event loop.
#
# Steps:
#   1. croak unless $self->dir is an existing directory.
#   2. Call watch_dir once for the files that are already present.
#   3. Register an IO::Async::File on the directory so that watch_dir is
#      called again whenever the directory's mtime changes.
#   4. Register a periodic timer that calls $self->send every
#      flush_interval.
#   5. Run the event loop; this call only returns when the loop does.
#
# Returns whatever $self->_loop->run returns.
# Dies (via croak) when dir is not a directory.
sub run {
    my $self = shift;

    unless ( -d $self->dir ) {
        croak "Not a directory: " . $self->dir;
    }

    # Load the timer class explicitly. The file's "use" lines do not
    # mention it, so without this the constructor call below only works
    # if some other module happens to have loaded it already.
    require IO::Async::Timer::Periodic;

    $log->infof( "Starting %s in directory %s", __PACKAGE__, $self->dir );

    # Initial scan for files that exist before the loop starts.
    $self->watch_dir;

    # Re-scan whenever the directory itself is modified.
    my $dir = IO::Async::File->new(
        filename         => $self->dir,
        on_mtime_changed => sub {
            $self->watch_dir;
        },
    );

    $self->_loop->add($dir);

    # Periodic flush, independent of how much has been buffered.
    my $timer = IO::Async::Timer::Periodic->new(    # could be Countdown
        interval => $self->flush_interval,
        on_tick  => sub {
            $self->send;
        },
    );
    $timer->start;
    $self->_loop->add($timer);

    $self->_loop->run;
}


# Intentional no-op: ignores its arguments and returns nothing.
# NOTE(review): the call site is not visible here -- presumably an
# extension point for subclasses or roles; confirm.
sub cleanup_hook {
    return;
}
# Intentional no-op: ignores its arguments and returns nothing.
# NOTE(review): the call site is not visible here -- presumably an
# extension point for subclasses or roles; confirm.
sub archive_hook {
    return;
}

sub watch_dir {
    my ($self) = @_;

    $log->infof( "Checking for new files to watch in %s", $self->dir );
    opendir( my $dh, $self->dir );
    while ( my $f = readdir($dh) ) {
        next unless $f =~ /\.stats$/;
        if ( my $watcher =
            $self->setup_file_watcher( catfile( $self->dir, $f ) ) ) {
            $self->_loop->add($watcher);



( run in 0.697 second using v1.01-cache-2.11-cpan-fe3c2283af0 )