InfluxDB-Writer

 view release on metacpan or  search on metacpan

lib/InfluxDB/Writer/CompactFiles.pm  view on Meta::CPAN

package InfluxDB::Writer::CompactFiles;

# ABSTRACT: Collect and compact files containing InfluxDB lines
our $VERSION = '1.003'; # VERSION

use strict;
use warnings;
use feature 'say';

use Moose;
use Carp qw(croak);
use Log::Any qw($log);
use File::Spec::Functions;
use Sys::Hostname qw(hostname);
use InfluxDB::LineProtocol qw(line2data data2line);
use Time::Moment;

has 'dir'    => ( is => 'ro', isa => 'Str',     required  => 1 );
has 'tags'   => ( is => 'ro', isa => 'HashRef', predicate => 'has_tags' );
has 'delete' => ( is => 'ro', isa => 'Bool',    default   => 1 );

sub run {
    my $self = shift;

    unless ( -d $self->dir ) {
        croak "Not a directory: " . $self->dir;
    }

    my $now = Time::Moment->now->to_string;
    my $outfile = join('_',hostname(),'stats',$now) . '.compacted';

    my $target = catfile( $self->dir, $outfile );
    open( my $out, ">>", $target ) || die $!;
    $log->infof( "Starting %s of directory %s into %s", __PACKAGE__,
        $self->dir, $target );

    opendir( my $dh, $self->dir );
    while ( my $file = readdir($dh) ) {
        next unless $file =~ /\.stats$/;
        $file =~ /(\d+)\.stats/;
        my $pid = $1;
        my $is_running = kill 0, $pid;
        if ($is_running) {
            $log->debugf( "Skip file %s because pid %i is still running",
                $file, $pid );
            next;
        }
        else {
            $log->infof( "Append file %s to %s", $file, $outfile );
            my $source = catfile( $self->dir, $file );
            open( my $fh, "<", $source );
            while ( my $line = <$fh> ) {
                if ( $self->has_tags ) {
                    $line = $self->add_tags_to_line($line);
                }
                say $out $line;
            }
            if ( $self->delete ) {
                unlink($source) || die "$!";
            }
        }
    }
    system('gzip', $target);
}

sub add_tags_to_line {
    my ( $self, $line ) = @_;

    my ( $measurement, $values, $tags, $timestamp ) = line2data($line);
    my $combined_tags;
    if ($tags) {
        $combined_tags = { %$tags, %{ $self->tags } };
    }
    else {
        $combined_tags = $tags;
    }
    return data2line( $measurement, $values, $combined_tags, $timestamp );
}



( run in 2.029 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )