InfluxDB-Writer
view release on metacpan or search on metacpan
lib/InfluxDB/Writer/FileTailer.pm view on Meta::CPAN
package InfluxDB::Writer::FileTailer;
# ABSTRACT: Tail files and send lines to InfluxDB
our $VERSION = '1.003'; # VERSION
use strict;
use warnings;
use feature 'say';
use Moose;
use IO::Async::File;
use IO::Async::FileStream;
use IO::Async::Loop;
use Hijk ();
use Carp qw(croak);
use InfluxDB::LineProtocol qw(line2data data2line);
use Log::Any qw($log);
use File::Spec::Functions;
use Cwd 'abs_path';
# Compose the role InfluxDB::Writer::AuthHeaderRole (defined outside this file).
with 'InfluxDB::Writer::AuthHeaderRole';

# Directory to work on. run() croaks unless it passes -d, and watch_dir()
# scans it for entries ending in ".stats".
has dir => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

# InfluxDB target: host, port (defaults to 8086) and database name.
# NOTE(review): the code consuming these is outside the visible part of the
# file; presumably they are used to build the write request -- confirm.
has influx_host => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

has influx_port => (
    is       => 'ro',
    isa      => 'Int',
    required => 1,
    default  => 8086,
);

has influx_db => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

# Integer, defaults to 1000.
# NOTE(review): presumably the number of buffered lines that triggers a
# flush; the code reading it is not visible here -- confirm.
has flush_size => (
    is       => 'ro',
    isa      => 'Int',
    default  => 1000,
    required => 1,
);

# Interval handed to the periodic timer in run(); on every tick run() calls
# $self->send. Defaults to 30.
has flush_interval => (
    is       => 'ro',
    isa      => 'Int',
    default  => 30,
    required => 1,
);

# Optional hash reference; has_tags tells whether a value was supplied.
has tags => (
    is        => 'ro',
    isa       => 'HashRef',
    predicate => 'has_tags',
);

# Per-instance hash reference, starting out empty.
# NOTE(review): presumably bookkeeping for the files being watched; the code
# using it is not visible here -- confirm.
has _files => (
    is      => 'ro',
    isa     => 'HashRef',
    default => sub { return {} },
);

# The IO::Async::Loop driving all watchers and timers. lazy_build wires this
# attribute to _build__loop, so the loop is only created on first access.
has _loop => (
    is         => 'ro',
    isa        => 'IO::Async::Loop',
    lazy_build => 1,
);

# Array of strings, starting out empty, exposed through delegated helper
# methods from the native Array trait.
has buffer => (
    is      => 'ro',
    isa     => 'ArrayRef[Str]',
    traits  => ['Array'],
    default => sub { return [] },
    handles => {
        buffer_is_empty => 'is_empty',
        buffer_size     => 'count',
        buffer_all      => 'elements',
        buffer_push     => 'push',
        buffer_splice   => 'splice',
    },
);
# Builder for the lazily built '_loop' attribute: constructs the
# IO::Async::Loop instance and returns it.
sub _build__loop {
    my $loop = IO::Async::Loop->new;
    return $loop;
}
# Main entry point: set up all watchers and run the event loop.
#
# Steps:
#   1. croak with "Not a directory: <dir>" unless the configured dir exists
#      and is a directory;
#   2. call watch_dir once for an initial scan;
#   3. watch the directory itself and call watch_dir again whenever its
#      mtime changes;
#   4. install a periodic timer that calls $self->send every
#      flush_interval;
#   5. run the loop.
#
# Returns whatever $self->_loop->run returns.
sub run {
    my $self = shift;

    unless ( -d $self->dir ) {
        croak "Not a directory: " . $self->dir;
    }

    # IO::Async::Timer::Periodic is used below but is not among this file's
    # `use` lines. Load it explicitly so this method does not depend on some
    # other module happening to have pulled it in.
    require IO::Async::Timer::Periodic;

    $log->infof( "Starting %s in directory %s", __PACKAGE__, $self->dir );

    # Initial scan of the directory.
    $self->watch_dir;

    # Re-scan whenever the directory's mtime changes.
    my $dir = IO::Async::File->new(
        filename         => $self->dir,
        on_mtime_changed => sub {
            $self->watch_dir;
        },
    );
    $self->_loop->add($dir);

    # Time-based flush (original author's note: this could be a Countdown
    # timer instead of a Periodic one).
    my $timer = IO::Async::Timer::Periodic->new(
        interval => $self->flush_interval,
        on_tick  => sub {
            $self->send;
        },
    );
    $timer->start;
    $self->_loop->add($timer);

    $self->_loop->run;
}
# Hook with an empty implementation: does nothing and returns nothing.
# NOTE(review): presumably a customisation point called elsewhere in this
# module; the call sites are not visible here -- confirm.
sub cleanup_hook {
    return;
}
# Hook with an empty implementation: does nothing and returns nothing.
# NOTE(review): presumably a customisation point called elsewhere in this
# module; the call sites are not visible here -- confirm.
sub archive_hook {
    return;
}
sub watch_dir {
my ($self) = @_;
$log->infof( "Checking for new files to watch in %s", $self->dir );
opendir( my $dh, $self->dir );
while ( my $f = readdir($dh) ) {
next unless $f =~ /\.stats$/;
if ( my $watcher =
$self->setup_file_watcher( catfile( $self->dir, $f ) ) ) {
$self->_loop->add($watcher);
( run in 0.697 second using v1.01-cache-2.11-cpan-fe3c2283af0 )