App-ElasticSearch-Utilities
view release on metacpan or search on metacpan
scripts/es-search.pl view on Meta::CPAN
# App Config
#
# Normalizes the parsed command-line options in %OPT into %CONFIG:
#   size            - record limit, a positive integer, defaults to 20
#   format          - 'json' when --json is set, otherwise the lower-cased
#                     --format value, otherwise 'yaml'
#   max-batch-size  - defaults to 50
#   precision       - digits after the decimal point, defaults to 3
#   timestamp       - only present when --timestamp was supplied
#   pretty          - 1 when --pretty is set or the format contains "pretty"
#   decimal_format  - sprintf() template built from the precision
my %CONFIG = (
    size   => ($OPT{size} && $OPT{size} > 0) ? int($OPT{size}) : 20,
    format => $OPT{json}   ? 'json'
            : $OPT{format} ? lc($OPT{format})
            :                'yaml',
    'max-batch-size' => $OPT{'max-batch-size'} || 50,
    # A precision of 0 is a valid request for whole numbers, so it must not
    # be defaulted with ||. Anything that is not a non-negative integer falls
    # back to 3, which keeps the sprintf() template below well-formed.
    precision => ( defined $OPT{precision} && $OPT{precision} =~ /\A[0-9]+\z/ )
               ? int($OPT{precision})
               : 3,
    $OPT{timestamp} ? ( timestamp => $OPT{timestamp} ) : (),
);
# JSON output has to stay machine readable, so decorators are switched off
$OPT{'no-decorators'} = 1 if $CONFIG{format} eq 'json';
$CONFIG{pretty} = $OPT{pretty}               ? 1
                : $CONFIG{format} =~ /pretty/ ? 1
                :                               0;
$CONFIG{decimal_format} = "%0.$CONFIG{precision}f";
#------------------------------------------------------------------------#
# Handle Indices
#
# Work out the sort direction, group the selected indices by their age in
# days and merge the field metadata of every index into %FIELDS.
my $ORDER = 'desc';
$ORDER = 'asc' if exists $OPT{asc} && $OPT{asc};
$ORDER = 'asc' if exists $OPT{tail};

my %by_age  = ();
my %indices = ();
foreach my $name ( es_indices() ) {
    $indices{$name} = es_index_days_old($name) || 0;
}
die "# Failed to retrieve any indices using your paramaters." unless keys %indices;

my %FIELDS = ();
my $TimeStampCheck=0;
foreach my $index (sort by_index_age keys %indices) {
    # File the index under its age
    push @{ $by_age{ $indices{$index} } }, $index;

    # The first definition seen for a field is the one that is kept
    my $fields = es_index_fields($index);
    foreach my $k ( keys %{ $fields } ) {
        $FIELDS{$k} ||= $fields->{$k};
    }

    # Lookup the Index in our local YAML, only done for the first index
    next if $TimeStampCheck;
    $TimeStampCheck++;
    $CONFIG{timestamp} ||= es_local_index_meta(timestamp => $index);
}
# Set fields so we know how to construct complex aggs
$q->fields_meta( \%FIELDS );
#------------------------------------------------------------------------#
# Figure out the timestamp
# Anything set so far wins, then the global setting, then '@timestamp'
$CONFIG{timestamp} ||= es_globals('timestamp') || '@timestamp';
debug_var(\%by_age);

# Order the age buckets: largest age first for ascending output,
# smallest age first otherwise
my @AGES;
if ( $ORDER eq 'asc' ) {
    @AGES = sort { $b <=> $a } keys %by_age;
}
else {
    @AGES = sort { $a <=> $b } keys %by_age;
}

# Figure out if we summarize: more than one age bucket, --top in use and
# none of --by, --with or --interval
$CONFIG{summary} = @AGES > 1 && $OPT{top} && !( $OPT{by} || $OPT{with} || $OPT{interval} );

debug({color=>"cyan"}, "Fields discovered.");

# --fields only lists the known fields and stops
if( $OPT{fields} ) {
    show_fields();
    exit 0;
}
# Attempt date autodiscovery
#
# When the configured timestamp field is missing from the merged field
# metadata, look for fields whose type is 'date':
#   - none found:  fatal error, exit 1
#   - exactly one: warn and use it as the timestamp field
#   - several:     fatal error listing them, with a --timestamp hint, exit 1
if( !exists $FIELDS{$CONFIG{timestamp}} ) {
    # Walk the fields in sorted order so the listed candidates and the
    # suggested example agree and are stable between runs. Fields without a
    # type are skipped instead of being compared as undef.
    my @dates = grep { defined $FIELDS{$_}->{type} && $FIELDS{$_}->{type} eq 'date' }
                sort keys %FIELDS;
    if( @dates == 0 ) {
        output({color=>'red',stderr=>1},"FATAL: No date fields found in the indices specified" );
        exit 1;
    }
    elsif( @dates == 1 ) {
        output({color=>'yellow',stderr=>1}, "WARNING: Timestamp field '$CONFIG{timestamp}' not found, using '$dates[0]' instead");
        $CONFIG{timestamp} = $dates[0];
    }
    else {
        output({color=>'red',stderr=>1},
            sprintf("FATAL: Timestamp field '%s' not found and discovered multiple date fields: %s",
                $CONFIG{timestamp},
                join(', ', @dates)
            )
        );
        # The hint belongs with the error on STDERR so it cannot end up in
        # output that is piped from STDOUT.
        output({color=>'yellow',indent=>1,stderr=>1}, "Try again with '--timestamp $dates[0]' for example.");
        exit 1;
    }
}
# Which fields to show
# --show may be given several times and each value may be a comma
# separated list; empty entries are dropped
my @SHOW = ();
if ( exists $OPT{show} && scalar @{ $OPT{show} } ) {
    @SHOW = grep { defined && length }
            map  { split /,/, $_ }
            @{ $OPT{show} };
    $q->set_fields([$CONFIG{timestamp},@SHOW]);
}

# How to sort
# Default is the timestamp in the selected order; --sort replaces it with a
# comma separated list in which "field:direction" becomes a hash reference
# and a bare field is passed through as a string
my $SORT = [ { $CONFIG{timestamp} => $ORDER } ];
if( exists $OPT{sort} && length $OPT{sort} ) {
    my @clauses = ();
    foreach my $clause ( split /,/, $OPT{sort} ) {
        if ( $clause =~ /:/ ) {
            push @clauses, { split /:/, $clause };
        }
        else {
            push @clauses, $clause;
        }
    }
    $SORT = \@clauses;
}
$q->set_sort($SORT);
# Improper Usage
# The checks run in order; the first one that applies is reported through
# pod2usage() with exit value 1.
{
    my $problem
        = !keys(%{ $q->query })                           ? 'No search string specified'
        : (exists $OPT{tail} && $OPT{top})                ? 'Cannot use --tail and --top together'
        : (exists $OPT{tail} && $OPT{sort})               ? 'Cannot use --tail and --sort together'
        : ($OPT{sort} && ($OPT{asc} || $OPT{desc}))       ? 'Cannot use --sort along with --asc or --desc'
        : (exists $OPT{tail} && !( @SHOW || $OPT{json}))  ? 'Please specify --show or --jq with --tail'
        :                                                   undef;

    pod2usage({-exitval=>1, -verbose=>0, -sections=>'SYNOPSIS', -msg=>$problem})
        if defined $problem;
}
# Process extra parameters
# --exists adds one "must" exists clause per field, --missing one "must_not"
# clause. Values may be a single string or an array reference, and each
# string may list several fields separated by ',' or ':'.
foreach my $presence ( qw( exists missing ) ) {
    next unless exists $OPT{$presence};

    my $context = $presence eq 'exists' ? 'must' : 'must_not';
    my @values  = is_arrayref($OPT{$presence}) ? @{ $OPT{$presence} } : ($OPT{$presence});

    foreach my $value (@values) {
        foreach my $field ( split /[,:]/, $value ) {
            $q->add_bool( $context => { exists => { field => $field } } );
        }
    }
}

# Aggregation names mapped to the kind of result they produce
my %SUPPORTED_AGGREGATIONS = ();
$SUPPORTED_AGGREGATIONS{$_} = 'simple_value'
    for qw(cardinality sum min max avg value_count);
my $agg_header = '';
if( exists $OPT{top} ) {
my @top = split /:/, $OPT{top};
my $top_field = pop @top;
my $top_agg = @top ? shift @top : 'terms';
my @agg_fields = grep { exists $FIELDS{$_} } split /\s*,\s*/, $top_field;
croak(sprintf("Option --top takes a field, found %d fields: %s\n", scalar(@agg_fields),join(',',@agg_fields)))
scripts/es-search.pl view on Meta::CPAN
# State shared by the search loop and the closing summary
# Lookup tables: indices that were queried, document ids already printed,
# aggregation totals and the age buckets a header was printed for
my (%displayed_indices, %last_batch_id, %AGGS_TOTALS, %AGES_SEEN);
# Counters and flags, all starting at zero
my ($TOTAL_HITS, $OUT_OF, $duration, $displayed, $header) = (0) x 5;
# Timestamp of the last hit printed and the age bucket being queried,
# both undefined until the loop sets them
my ($last_hit_ts, $age);

# Handle CTRL+C During the Loop
# The handler only raises a flag, the loop checks it and winds down
my $DONE = 0;
local $SIG{INT} = sub { $DONE=1 };

verbose({color=>'green',level=>1}, "= Query setup complete, beginning request.");
# Main search loop. Walks the age buckets in @AGES, runs the query against
# the indices of each bucket and prints the hits, or hands aggregations to
# display_aggregations(). It ends when @AGES is exhausted, when
# all_records_displayed() reports true, when the cluster returns an error or
# when CTRL+C sets $DONE.
AGES: while( !$DONE && @AGES ) {
# With --tail, we don't want to deplete @AGES
$age = $OPT{tail} ? $AGES[0] : shift @AGES;
# Pause for 200ms if we're tailing
# NOTE(review): the built-in sleep() truncates 0.2 to 0; this only pauses if
# a sub-second sleep (e.g. from Time::HiRes) is imported outside this view.
# TODO confirm.
sleep(0.2) if exists $OPT{tail} && $last_hit_ts;
my $start=time();
# Without a previous hit, start 30 seconds before now
$last_hit_ts ||= strftime('%Y-%m-%dT%H:%M:%S%z',localtime($start-30));
# If we're tailing, add a timestamp range filter starting at the last hit
$q->stash( filter => {range => { $CONFIG{timestamp} => {gte => $last_hit_ts}}} ) if $OPT{tail};
# Header, printed the first time an age bucket is queried
if( !exists $AGES_SEEN{$age} ) {
output({color=>'yellow'}, "= Querying Indexes: " . join(',', @{ $by_age{$age} })) unless $OPT{'no-decorators'};
$AGES_SEEN{$age}=1;
# A new bucket gets a fresh column header
$header=0;
}
debug("== Request Parameters");
debug_var($q->uri_params);
debug("== Query");
debug(to_json $q->request_body,{allow_nonref=>1,canonical=>1,pretty=>1});
# Execute the query
my $result = $q->execute( $by_age{$age} );
debug({clear=>1},"== Results");
debug_var($result);
$duration += time() - $start;
# Advance if we don't have a result
next unless defined $result;
# An error in the response ends the whole search
if ( $result->{error} ) {
my $simple_error;
# Prefer the nested caused_by reason. When that lookup dies or yields a
# false value, try to extract a QueryParsingException from the error text.
eval {
$simple_error = $result->{error}{caused_by}{caused_by}{reason};
} or do {
($simple_error) = $result->{error} =~ m/(QueryParsingException\[\[[^\]]+\][^\]]+\]\]);/;
};
$simple_error ||= '';
output({stderr=>1,color=>'red'},
"# Received an error from the cluster. $simple_error"
);
last;
}
# Remember which indices were actually searched for the closing summary
$displayed_indices{$_} = 1 for @{ $by_age{$age} };
# NOTE(review): hits.total is added as a number; presumably a plain count,
# verify against the response format of the cluster version in use.
$TOTAL_HITS += $result->{hits}{total} if $result->{hits}{total};
# Columns that are always shown in front of the --show fields
my @always = ();
push @always, $CONFIG{timestamp} unless $OPT{'no-decorators'};
# Column header, only with --show and only once per age bucket
if(!$header && @SHOW) {
output({color=>'cyan'}, join("\t", @always, @SHOW));
$header++;
}
# Print the current batch, then keep scrolling until no more hits come back
while( $result && !$DONE ) {
my $hits = is_arrayref($result->{hits}{hits}) ? $result->{hits}{hits} : [];
# Handle Aggregations, which replace the per-hit output for this bucket
if( my $aggregations = $result->{aggregations} ) {
display_aggregations($aggregations, $result->{hits}{total});
next AGES;
}
# Reset the last batch ID if we have new data
%last_batch_id = () if @{$hits} > 0 && $last_hit_ts ne $hits->[-1]->{_source}{$CONFIG{timestamp}};
debug({color=>'magenta'}, "+ ID cache is now empty.") unless keys %last_batch_id;
foreach my $hit (@{ $hits }) {
# Skip if we've seen this record
next if exists $last_batch_id{$hit->{_id}};
$last_hit_ts = $hit->{_source}{$CONFIG{timestamp}};
$last_batch_id{$hit->{_id}}=1;
my $record = {};
# Add the _id field to the source so that it is listed
# when showing full records and can be used in @SHOW
$hit->{_source}->{_id} = $hit->{_id} unless defined($hit->{_source}->{_id});
if( @SHOW ) {
# Only the requested fields are copied into the record
my $flat = es_flatten_hash( $hit->{_source} );
debug_var($flat);
foreach my $f (@always) {
$record->{$f} = $flat->{$f};
}
# Resolve each field: exact flattened key first, then a key of that name
# nested in the document, then path matching for dotted names
foreach my $f (@SHOW) {
my $value = undef;
if( exists $flat->{$f} ) {
$value = $flat->{$f};
}
elsif( my $v = document_lookdown($hit->{_source},$f) ) {
$value = $v;
}
elsif(index($f, '.') > 0) {
# Try path matching the key
my @values = ();
foreach my $k (keys %{ $flat }) {
# The flattened key starts with the requested path
if( index($k,$f) == 0 ) {
push @values, $flat->{$k};
}
# The key contains a numeric path element: drop all purely numeric
# elements and compare what is left with the requested path
elsif( $k =~ /\.\d+\./ ) {
my $flatter = join '.', grep { !/^\d+$/ } split /\./, $k;
if ( $flatter eq $f ) {
push @values, $flat->{$k};
}
}
}
# One match is stored as a scalar, several as an array reference
$value = @values ? @values == 1 ? $values[0] : \@values : undef;
}
$record->{$f} = $value;
}
}
else {
# Without --show the whole source document is the record
$record = $hit->{_source};
}
# Determine how this record is output
my $output = undef;
if( @SHOW ) {
# Tab separated columns, '-' stands in for missing or undefined values
my @cols=();
foreach my $f (@always,@SHOW) {
my $v = '-';
if( exists $record->{$f} && defined $record->{$f} ) {
# Single-element arrays are unwrapped, other references become JSON
$v = is_arrayref($record->{$f}) && @{ $record->{$f} } == 1 ? $record->{$f}[0]
: is_ref($record->{$f}) ? to_json($record->{$f},{allow_nonref=>1,canonical=>1})
: $record->{$f};
}
push @cols,$v;
}
$output = join("\t",@cols);
}
else {
# Full record: JSON when the format starts with "json", otherwise Dump()
$output = $CONFIG{format} =~ /^json/? to_json($record,{allow_nonref=>1,canonical=>1,pretty=>$CONFIG{pretty}})
: Dump $record;
}
output({data=>1}, $output);
$displayed++;
last if all_records_displayed();
}
last if all_records_displayed();
# Scroll forward, counting the time spent towards the reported duration
$start = time;
$result = $q->scroll_results();
$duration += time - $start;
# Check if we need to keep going
last unless defined $result;
last unless $result->{hits} && $result->{hits}{hits} && @{ $result->{hits}{hits} } > 0
}
last if all_records_displayed();
}
# Closing summary on STDERR: the query, how many records were displayed and
# which indices were searched. Skipped entirely with --no-decorators.
unless ( $OPT{'no-decorators'} ) {
    my @query_lines = map { "# $_" }
                      split /\r?\n/, to_json($q->query,{allow_nonref=>1,canonical=>1,pretty=>$CONFIG{pretty}});
    my $out_of   = $OUT_OF || $TOTAL_HITS;
    my $in_docs  = $OUT_OF ? " in $TOTAL_HITS documents" : '';
    my @searched = sort keys %displayed_indices;

    output({stderr=>1,color=>'yellow'},
        "# Search Parameters:",
        @query_lines,
        sprintf("# Displaying %d of %d results%s took %0.2f seconds.",
            $displayed, $out_of, $in_docs, $duration),
        sprintf("# Indexes (%d of %d) searched: %s\n",
            scalar(@searched), scalar(keys %indices), join(',', @searched)),
    );
}
# Totals across all age buckets. Only shown in summary mode and only when
# aggregation totals were collected. Each row holds the total, its share of
# $TOTAL_HITS formatted with $CONFIG{decimal_format}, and the key.
if($CONFIG{summary} && keys %AGGS_TOTALS) {
    unless ( $OPT{'no-decorators'} ) {
        output({color=>'yellow'}, '#', '# Totals across batch', '#');
        output({color=>'cyan'},$agg_header);
    }
    # Largest total first; equal totals are ordered by key so the output is
    # the same on every run.
    my @keys = sort { $AGGS_TOTALS{$b} <=> $AGGS_TOTALS{$a} or $a cmp $b } keys %AGGS_TOTALS;
    foreach my $k (@keys) {
        # $TOTAL_HITS is still 0 when no response reported a hit count.
        # Report a share of 0 instead of dying with a division by zero.
        my $share = $TOTAL_HITS ? $AGGS_TOTALS{$k} / $TOTAL_HITS : 0;
        output({data=>1,color=>'green'}, join "\t",
            $AGGS_TOTALS{$k}, sprintf($CONFIG{decimal_format}, $share), $k
        );
    }
}
# Tells the search loop whether it should stop printing records.
#
# Returns 1 when CTRL+C was caught ($DONE), or when neither --tail nor --all
# is in effect and at least $CONFIG{size} records were displayed.
# Returns 0 otherwise.
sub all_records_displayed {
    # An interrupt always ends the output
    return 1 if $DONE;

    # --tail and --all are never limited by the record count
    return 0 if exists $OPT{tail} || exists $OPT{all};

    return $displayed >= $CONFIG{size} ? 1 : 0;
}
# Search a document and the hashes nested inside it for a key named $field.
#
# Parameters:
#   $href  - hash reference to search, e.g. the _source of a hit
#   $field - name of the key to look for
#
# Returns the value stored under the first matching key. The top level of
# $href is checked first, then every nested hash is searched depth-first in
# sorted key order. Returns nothing (undef in scalar context, the empty list
# in list context) when no hash in the document contains the key.
sub document_lookdown {
    my ($href,$field) = @_;

    return $href->{$field} if exists $href->{$field};

    # Sorted so the result is stable when several nested hashes hold the field
    foreach my $k (sort keys %{ $href }) {
        next unless is_hashref($href->{$k});

        # Only stop when this branch really contains the field; otherwise
        # the remaining sibling hashes still have to be searched. The call is
        # made in list context because a miss returns the empty list while a
        # hit returns exactly one element, even if that element is undef.
        my @found = document_lookdown($href->{$k},$field);
        return $found[0] if @found;
    }

    return;
}
sub show_fields {
output({color=>'cyan'}, 'Fields available for search:' );
my $total = 0;
my %types = ();
foreach my $field (sort keys %FIELDS) {
$total++;
my $type = $FIELDS{$field}->{type};
$types{$type} ||= 0;
$types{$type}++;
my $color = $type eq 'ip' ? 'magenta'
: $type eq 'text' ? 'red'
: $type =~ /float|integer|short|byte|double/ ? 'cyan'
( run in 1.200 second using v1.01-cache-2.11-cpan-39bf76dae61 )