Net-Statsd-Server
view release on metacpan or search on metacpan
lib/Net/Statsd/Server/Backend/Elasticsearch.pm view on Meta::CPAN
$payload .= $innerPayload . '}' . "\n";
$key++;
}
$key = 0;
for (@{ $listTimerData }) {
$payload .= '{"index":{"_index":"' . $statsdIndex . '","_type":"' . $elasticTimerType . '_stats"}}' . "\n";
$payload += '{';
my $innerPayload = '';
for my $statKey (keys %{ $listTimerData->[$key] }) {
if ($innerPayload) { $innerPayload .= ','; }
$innerPayload .= '"' . $statKey . '":"' . $listTimerData->[$key]->{$statKey} . '"';
}
$payload .= $innerPayload . '}' . "\n";
$key++;
}
return $payload;
}
# Sends an already-built bulk request body to Elasticsearch over HTTP.
#
# Arguments:
#   $payload - request body to POST (flush() passes the string returned
#              by bulk_insert())
#
# The request goes to http://<host>:<port><path><index>/_bulk, built from
# the file-level $elasticHost, $elasticPort, $elasticPath and $statsdIndex
# settings. Port 80 is used when $elasticPort is false.
#
# Returns whatever LWP::UserAgent->request() returns for the request.
sub post_stats {
    my ($self, $payload) = @_;

    # An HTTP body is a sequence of octets. If the payload is a character
    # string, encode it to UTF-8 first so that Content-Length counts bytes
    # and the request object is never handed wide characters.
    utf8::encode($payload) if utf8::is_utf8($payload);

    my $port = $elasticPort || 80;
    my $uri  = "http://" . $elasticHost . ":" . $port
             . $elasticPath . $statsdIndex . '/_bulk';

    my $req = HTTP::Request->new('POST', $uri);
    $req->header('Content-Type'   => 'application/json');
    $req->header('Content-Length' => length($payload));
    $req->content($payload);

    # Dumping every payload is diagnostic output: only do it in debug
    # mode, like the other diagnostics in flush().
    if ($debug) {
        warn "-------------- Sending to Elasticsearch:\n$payload\n\n";
    }

    my $lwp = LWP::UserAgent->new();
    $lwp->agent("Net::Statsd::Server::Backend::ElasticSearch/$VERSION");

    return $lwp->request($req);
}
# Flush handler: turns one set of statsd metrics into Elasticsearch
# documents and posts them.
#
# Arguments:
#   $timestamp - flush time; it is multiplied by 1000 before being stored
#                in each document's '@timestamp' field
#   $metrics   - hash ref; the 'counters', 'timers' and 'timer_data'
#                entries are read. The structure is not modified.
#
# Metric names that do not match $elasticFilter (when it is defined) are
# skipped. The three document lists are handed to bulk_insert(), and the
# resulting payload is passed to post_stats() when at least one counter
# or timer_data entry was collected and the payload is non-empty.
#
# No meaningful return value.
sub flush {
    my ($self, $timestamp, $metrics) = @_;

    my $numStats = 0;
    my @counts;
    my @timers;
    my @timer_data;

    $timestamp *= 1000;

    # "|| {}" keeps a missing section from being autovivified inside the
    # caller's $metrics hash.
    my $counters = $metrics->{counters} || {};
    for my $key (keys %{ $counters }) {
        next if defined $elasticFilter && $key !~ $elasticFilter;

        push @counts, {
            key          => $key,
            counter      => $counters->{$key},
            '@timestamp' => $timestamp,
        };
        $numStats++;
    }

    my $timer_series = $metrics->{timers} || {};
    for my $key (keys %{ $timer_series }) {
        next if defined $elasticFilter && $key !~ $elasticFilter;

        my $series = $timer_series->{$key};

        # One document per sample. The hash form is what this code has
        # always dereferenced; a plain list of samples is accepted too.
        # NOTE(review): presumably the server delivers timers as array
        # refs -- confirm against the metrics producer.
        my @samples = ref($series) eq 'ARRAY'
                    ? @{ $series }
                    : values %{ $series || {} };

        for my $value (@samples) {
            push @timers, {
                key          => $key,
                timer        => $value,
                '@timestamp' => $timestamp,
            };
        }
    }

    my $timer_stats = $metrics->{timer_data} || {};
    for my $key (keys %{ $timer_stats }) {
        next if defined $elasticFilter && $key !~ $elasticFilter;

        # Work on a shallow copy: the same $metrics structure may be
        # handed to other consumers, so it must not be decorated or have
        # its histogram removed here.
        my %doc = %{ $timer_stats->{$key} || {} };
        $doc{'@timestamp'} = $timestamp;
        $doc{key}          = $key;

        # Flatten histogram bins into top-level fields of the document.
        if (defined $doc{histogram}) {
            my $histogram = $doc{histogram};
            for my $bin (keys %{ $histogram }) {
                $doc{$bin} = $histogram->{$bin};
            }
            delete $doc{histogram};
        }

        push @timer_data, \%doc;
        $numStats++;
    }

    my $es_payload = $self->bulk_insert(\@counts, \@timers, \@timer_data);

    if ($numStats > 0 && $es_payload) {
        $self->post_stats($es_payload);
        if ($debug) {
            warn "flushed ${numStats} stats to ElasticSearch\n";
        }
    }

    return;
}
#var elastic_backend_status = function graphite_status(writeCb) {
# for (stat in elasticStats) {
# writeCb(null, 'elastic', stat, elasticStats[stat]);
# }
#};
# Backend initialisation: copies the relevant configuration values into
# the file-level settings used by the other subs in this file.
#
# Arguments:
#   $startup_time - stored as the initial 'last_flush' and
#                   'last_exception' values in $elasticStats
#   $config       - hash ref; reads 'debug', 'flushInterval' and the
#                   'elasticsearch' sub-hash (host, port, path,
#                   indexPrefix, statsFilter, countType, timerType,
#                   timerDataType)
#   $events       - accepted but not used here
#
# Returns 1.
sub init {
    my ($self, $startup_time, $config, $events) = @_;

    $debug = $config->{debug};

    my $configEs = $config->{elasticsearch} || {};

    $elasticHost  = $configEs->{host}        || 'localhost';
    $elasticPort  = $configEs->{port}        || 9200;
    $elasticPath  = $configEs->{path}        || '/';
    $elasticIndex = $configEs->{indexPrefix} || 'statsd';

    # Only stats whose name matches 'statsFilter' are sent; flush() applies
    # the value with a pattern match (!~).
    $elasticFilter = $configEs->{statsFilter} || 'vstatd';

    $elasticCountType = $configEs->{countType} || 'counter';

    # One variable carries the timer document type. 'timerDataType' takes
    # precedence when set and the default remains 'timer_data', exactly as
    # before. 'timerType' used to be assigned and then unconditionally
    # overwritten, so it was silently ignored; it is now honoured when
    # 'timerDataType' is not given.
    $elasticTimerType = $configEs->{timerDataType}
                     || $configEs->{timerType}
                     || 'timer_data';

    $flushInterval = $config->{flushInterval};

    $elasticStats->{last_flush}     = $startup_time;
    $elasticStats->{last_exception} = $startup_time;

    #events.on('flush', flush_stats);
    #events.on('status', elastic_backend_status);

    return 1;
}
1;
( run in 0.996 second using v1.01-cache-2.11-cpan-71847e10f99 )