POE-Component-ElasticSearch-Indexer
view release on metacpan or search on metacpan
lib/POE/Component/ElasticSearch/Indexer.pm view on Meta::CPAN
package POE::Component::ElasticSearch::Indexer;
# ABSTRACT: POE session to index data to ElasticSearch
use strict;
use warnings;
use version;
our $VERSION = '0.017'; # VERSION
use Const::Fast;
use Digest::MD5 qw(md5_hex);
use Digest::SHA1 qw(sha1_hex);
use Fcntl qw(:flock);
use HTTP::Request;
use JSON::MaybeXS;
use List::Util qw(shuffle);
use Log::Log4perl qw(:easy);
use Path::Tiny;
use POSIX qw(strftime);
use Ref::Util qw(is_ref is_arrayref is_blessed_ref is_hashref is_coderef);
use Time::HiRes qw(time);
use URI;
use POE qw(
Component::SSLify
Component::Client::HTTP
Component::Client::Keepalive
);
sub spawn {
my %params = @_;
# Setup Logging
my $loggingConfig = exists $params{LoggingConfig} && -f $params{LoggingConfig} ? $params{LoggingConfig}
: \q{
log4perl.logger = DEBUG, Sync
log4perl.appender.File = Log::Log4perl::Appender::File
log4perl.appender.File.layout = PatternLayout
log4perl.appender.File.layout.ConversionPattern = %d [%P] %p - %m%n
log4perl.appender.File.filename = es_indexer.log
log4perl.appender.File.mode = truncate
log4perl.appender.Sync = Log::Log4perl::Appender::Synchronized
log4perl.appender.Sync.appender = File
};
Log::Log4perl->init($loggingConfig) unless Log::Log4perl->initialized;
# Build Configuration
my %CONFIG = (
Alias => 'es',
Servers => [qw(localhost)],
Timeout => 5,
FlushInterval => 30,
FlushSize => 1_000,
DefaultIndex => 'logs-%Y.%m.%d',
DefaultType => '_doc',
BatchDir => '/tmp/es_index_backlog',
StatsInterval => 60,
BacklogInterval => 60,
CleanupInterval => 60,
PoolConnections => 1,
KeepAliveTimeout => 2,
MaxConnsPerServer => 3,
MaxPendingRequests => 5,
MaxRecoveryBatches => 10,
MaxFailedRatio => 0.8,
AuthUsername => $ENV{USER},
Protocol => 'http',
%params,
);
if( $CONFIG{BatchDiskSpace} ) {
# Convert a human-readable size such as "500mb" or "1.5 G" into a plain number, using decimal (x1000) multipliers per unit (k, m, g, t)
if( my ($size,$unit) = ($CONFIG{BatchDiskSpace} =~ /(\d+(?:\.\d+)?)\s*([kmgt])b?/i) ) {
$unit = lc $unit;
$CONFIG{BatchDiskSpace} = $unit eq 'k' ? $size * 1_000
: $unit eq 'm' ? $size * 1_000_000
: $unit eq 'g' ? $size * 1_000_000_000
: $unit eq 't' ? $size * 1_000_000_000_000
: $size;
}
( run in 1.400 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )