POE-Component-ElasticSearch-Indexer

 view release on metacpan or search on metacpan

lib/POE/Component/ElasticSearch/Indexer.pm  view on Meta::CPAN

package POE::Component::ElasticSearch::Indexer;
# ABSTRACT: POE session to index data to ElasticSearch

use strict;
use warnings;
use version;

our $VERSION = '0.017'; # VERSION

use Const::Fast;
use Digest::MD5 qw(md5_hex);
use Digest::SHA1 qw(sha1_hex);
use Fcntl qw(:flock);
use HTTP::Request;
use JSON::MaybeXS;
use List::Util qw(shuffle);
use Log::Log4perl qw(:easy);
use Path::Tiny;
use POSIX qw(strftime);
use Ref::Util qw(is_ref is_arrayref is_blessed_ref is_hashref is_coderef);
use Time::HiRes qw(time);
use URI;

use POE qw(
    Component::SSLify
    Component::Client::HTTP
    Component::Client::Keepalive
);


sub spawn {
    my %params = @_;

    # Setup Logging
    my $loggingConfig = exists $params{LoggingConfig} && -f $params{LoggingConfig} ? $params{LoggingConfig}
                      : \q{
                            log4perl.logger = DEBUG, Sync
                            log4perl.appender.File = Log::Log4perl::Appender::File
                            log4perl.appender.File.layout   = PatternLayout
                            log4perl.appender.File.layout.ConversionPattern = %d [%P] %p - %m%n
                            log4perl.appender.File.filename = es_indexer.log
                            log4perl.appender.File.mode = truncate
                            log4perl.appender.Sync = Log::Log4perl::Appender::Synchronized
                            log4perl.appender.Sync.appender = File
                        };
    Log::Log4perl->init($loggingConfig) unless Log::Log4perl->initialized;

    # Build Configuration
    my %CONFIG = (
        Alias              => 'es',
        Servers            => [qw(localhost)],
        Timeout            => 5,
        FlushInterval      => 30,
        FlushSize          => 1_000,
        DefaultIndex       => 'logs-%Y.%m.%d',
        DefaultType        => '_doc',
        BatchDir           => '/tmp/es_index_backlog',
        StatsInterval      => 60,
        BacklogInterval    => 60,
        CleanupInterval    => 60,
        PoolConnections    => 1,
        KeepAliveTimeout   => 2,
        MaxConnsPerServer  => 3,
        MaxPendingRequests => 5,
        MaxRecoveryBatches => 10,
        MaxFailedRatio     => 0.8,
        AuthUsername       => $ENV{USER},
        Protocol           => 'http',
        %params,
    );
    if( $CONFIG{BatchDiskSpace} ) {
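        # Convert a human-readable size such as "500 MB" or "1.5g" into a plain
        # number, using decimal multipliers (k = 1e3, m = 1e6, g = 1e9, t = 1e12).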
        if( my ($size,$unit) = ($CONFIG{BatchDiskSpace} =~ /(\d+(?:\.\d+)?)\s*([kmgt])b?/i) ) {
            $unit = lc $unit;
            $CONFIG{BatchDiskSpace} = $unit eq 'k' ? $size * 1_000
                                    : $unit eq 'm' ? $size * 1_000_000
                                    : $unit eq 'g' ? $size * 1_000_000_000
                                    : $unit eq 't' ? $size * 1_000_000_000_000
                                    : $size;
        }



( run in 1.400 seconds using v1.01-cache-2.11-cpan-75ffa21a3d4 )