App-ElasticSearch-Utilities

 view release on metacpan or  search on metacpan

scripts/es-search.pl  view on Meta::CPAN

# App Config
# Normalize the command-line options in %OPT into runtime settings.
my %CONFIG = (
    size      => ($OPT{size} && $OPT{size} > 0) ? int($OPT{size}) : 20,
    format    => $OPT{json}   ? 'json'
               : $OPT{format} ? lc $OPT{format}
               : 'yaml',
    'max-batch-size' => $OPT{'max-batch-size'} || 50,
    # An explicit 0 (no decimal places) is honoured; fall back to 3 only
    # when the option is absent or negative.
    precision => (defined $OPT{precision} && $OPT{precision} >= 0) ? int($OPT{precision}) : 3,
    $OPT{timestamp} ? ( timestamp => $OPT{timestamp} ) : (),
);
# Plain JSON output is meant to be machine-read, so turn decorations off.
$OPT{'no-decorators'} = 1 if $CONFIG{format} eq 'json';
$CONFIG{pretty} = $OPT{pretty} ? 1
                : $CONFIG{format} =~ /pretty/ ? 1
                : 0;
# sprintf pattern used when printing fractional values (e.g. summary ratios).
$CONFIG{decimal_format} = "%0.$CONFIG{precision}f";
#------------------------------------------------------------------------#
# Handle Indices
# Results are sorted on the timestamp 'desc' (newest first) unless --asc is
# set; --tail always reads oldest-first.
my $ORDER = exists $OPT{asc} && $OPT{asc} ? 'asc' : 'desc';
$ORDER = 'asc' if exists $OPT{tail};
my %by_age = ();
# Index name => age in days (0 when es_index_days_old returns a false value).
my %indices = map { $_ => (es_index_days_old($_) || 0) } es_indices();
die "# Failed to retrieve any indices using your parameters." unless keys %indices;
my %FIELDS = ();
my $TimeStampCheck=0;
foreach my $index (sort by_index_age keys %indices) {
    my $age = $indices{$index};
    # Group indices sharing an age; each group is later queried as one batch.
    push @{ $by_age{$age} }, $index;
    # Merge field metadata across indices: the first index (in by_index_age
    # order) that defines a field wins.
    my $fields = es_index_fields($index);
    foreach my $k ( keys %{ $fields } ) {
        $FIELDS{$k} ||= $fields->{$k};
    }
    # Lookup the Index in our local YAML, but only for the first index and
    # only when no timestamp field was already configured.
    if( !$TimeStampCheck ) {
        $TimeStampCheck++;
        $CONFIG{timestamp} ||= es_local_index_meta(timestamp => $index);
    }
}

# Hand the merged field metadata to the query object; it is consulted when
# constructing complex aggregations.
$q->fields_meta( \%FIELDS );

#------------------------------------------------------------------------#
# Figure out the timestamp: keep any value chosen so far, otherwise use the
# global setting, otherwise '@timestamp'.
$CONFIG{timestamp} ||= es_globals('timestamp') || '@timestamp';
debug_var(\%by_age);

# Visit age buckets oldest (largest age) first for 'asc', newest first otherwise.
my @AGES = $ORDER eq 'asc'
    ? ( sort { $b <=> $a } keys %by_age )
    : ( sort { $a <=> $b } keys %by_age );

# Summarize --top only across several age buckets and when no
# --by / --with / --interval breakdown was requested.
$CONFIG{summary} = @AGES > 1 && $OPT{top} && !( $OPT{by} || $OPT{with} || $OPT{interval} );
debug({color=>"cyan"}, "Fields discovered.");
if( $OPT{fields} ) {
    show_fields();
    exit 0;
}
# Attempt date autodiscovery: when the configured timestamp field is absent
# from the merged field metadata, fall back to the single date-typed field,
# or stop with an error if there are none or several.
if( !exists $FIELDS{$CONFIG{timestamp}} ) {
    # Sorted once so the error listing and the suggested --timestamp agree and
    # are stable between runs; fields without a type are skipped quietly.
    my @dates = sort { $a cmp $b }
                grep { defined $FIELDS{$_}->{type} && $FIELDS{$_}->{type} eq 'date' }
                keys %FIELDS;
    if( @dates == 0 ) {
        output({color=>'red',stderr=>1},"FATAL: No date fields found in the indices specified" );
        exit 1;
    }
    elsif( @dates == 1 ) {
        output({color=>'yellow',stderr=>1}, "WARNING: Timestamp field '$CONFIG{timestamp}' not found, using '$dates[0]' instead");
        $CONFIG{timestamp} = $dates[0];
    }
    else {
        output({color=>'red',stderr=>1},
            sprintf "FATAL: Timestamp field '%s' not found and discovered multiple date fields: %s",
                $CONFIG{timestamp},
                join(', ', @dates)
        );
        # Send the hint to STDERR along with the error it explains, rather
        # than the regular output stream.
        output({color=>'yellow',indent=>1,stderr=>1}, "Try again with '--timestamp $dates[0]' for example.");
        exit 1;
    }
}

# Which fields to show: each --show value may itself be a comma-separated
# list; empty entries are discarded.
my @SHOW = ();
if ( exists $OPT{show} && scalar @{ $OPT{show} } ) {
    @SHOW = grep { defined && length } map { split(/,/, $_) } @{ $OPT{show} };
    # The timestamp column is always requested alongside the chosen fields.
    $q->set_fields([$CONFIG{timestamp},@SHOW]);
}
# How to sort: by default the timestamp in $ORDER. --sort takes a comma list
# where an entry containing ':' is split on ':' into a hash ({ field => dir })
# and any other entry is passed through unchanged.
my $SORT;
if( exists $OPT{sort} && length $OPT{sort} ) {
    my @sort_specs = split /,/, $OPT{sort};
    $SORT = [ map { /:/ ? +{ split /:/ } : $_ } @sort_specs ];
}
else {
    $SORT = [ { $CONFIG{timestamp} => $ORDER } ];
}
$q->set_sort($SORT);

# Improper Usage: conditions are tested top to bottom and pod2usage (which
# exits with status 1) reports the first one that applies.
{
    my $usage_error =
        !keys %{ $q->query }                          ? 'No search string specified'
      : exists $OPT{tail} && $OPT{top}                ? 'Cannot use --tail and --top together'
      : exists $OPT{tail} && $OPT{sort}               ? 'Cannot use --tail and --sort together'
      : $OPT{sort} && ($OPT{asc} || $OPT{desc})       ? 'Cannot use --sort along with --asc or --desc'
      : exists $OPT{tail} && !( @SHOW || $OPT{json})  ? 'Please specify --show or --jq with --tail'
      :                                                 undef;
    pod2usage({-exitval=>1, -verbose=>0, -sections=>'SYNOPSIS', -msg=>$usage_error})
        if defined $usage_error;
}

# Process extra parameters: --exists adds a 'must' and --missing a 'must_not'
# field-presence clause. Each option may be a single value or a list, and
# each value may name several fields separated by ',' or ':'.
foreach my $presence ( qw( exists missing ) ) {
    next unless exists $OPT{$presence};
    my $requested = $OPT{$presence};
    my $occur     = $presence eq 'exists' ? 'must' : 'must_not';
    my @names     = map { split(/[,:]/, $_) } is_arrayref($requested) ? @{ $requested } : $requested;
    $q->add_bool( $occur => { exists => { field => $_ } } ) for @names;
}

# Aggregation names tagged with the 'simple_value' category; presumably
# consumed by the --top handling that follows (outside this view).
my %SUPPORTED_AGGREGATIONS = map { ($_ => 'simple_value') } qw(cardinality sum min max avg value_count);
my $agg_header = '';
if( exists $OPT{top} ) {
    my @top = split /:/, $OPT{top};
    my $top_field = pop @top;
    my $top_agg   = @top ? shift @top : 'terms';

    my @agg_fields = grep { exists $FIELDS{$_} } split /\s*,\s*/, $top_field;
    croak(sprintf("Option --top takes a field, found %d fields: %s\n", scalar(@agg_fields),join(',',@agg_fields)))

scripts/es-search.pl  view on Meta::CPAN


my %displayed_indices = ();
my $TOTAL_HITS        = 0;
my $OUT_OF            = 0;
my $last_hit_ts       = undef;
my $duration          = 0;
my $displayed         = 0;
my $header            = 0;
my $age               = undef;
my %last_batch_id     = ();
my %AGGS_TOTALS       = ();
my %AGES_SEEN         = ();
# Handle CTRL+C During the Loop: the handler only raises a flag. The loop
# conditions and all_records_displayed() check it, so an interrupt ends the
# search and execution continues to the summary output after the loop.
my $DONE              = 0;
local $SIG{INT}       = sub { $DONE=1 };

verbose({color=>'green',level=>1}, "= Query setup complete, beginning request.");
# Each iteration queries one age bucket (the indices grouped in %by_age).
AGES: while( !$DONE && @AGES ) {
    # With --tail, we don't want to deplete @AGES
    $age = $OPT{tail} ? $AGES[0] : shift @AGES;

    # Pause for 200ms if we're tailing.
    # NOTE(review): core sleep() truncates to whole seconds; this only pauses
    # 200ms if Time::HiRes' sleep is imported at the top of the file — confirm.
    sleep(0.2) if exists $OPT{tail} && $last_hit_ts;

    my $start=time();
    # Seed the high-water mark 30 seconds in the past on the first pass;
    # --tail filters on it below.
    $last_hit_ts ||= strftime('%Y-%m-%dT%H:%M:%S%z',localtime($start-30));

    # If we're tailing, bump the @query with a timestamp range
    $q->stash( filter => {range => { $CONFIG{timestamp} => {gte => $last_hit_ts}}} ) if $OPT{tail};

    # Header: announce each age bucket once and re-arm the column header.
    if( !exists $AGES_SEEN{$age} ) {
        output({color=>'yellow'}, "= Querying Indexes: " . join(',', @{ $by_age{$age} })) unless $OPT{'no-decorators'};
        $AGES_SEEN{$age}=1;
        $header=0;
    }

    debug("== Request Parameters");
    debug_var($q->uri_params);
    debug("== Query");
    debug(to_json $q->request_body,{allow_nonref=>1,canonical=>1,pretty=>1});

    # Execute the query
    my $result = $q->execute( $by_age{$age} );

    debug({clear=>1},"== Results");
    debug_var($result);
    $duration += time() - $start;

    # Advance if we don't have a result
    next unless defined $result;

    if ( my $error = $result->{error} ) {
        # Reduce the cluster error to a one-line reason. For structured errors,
        # prefer caused_by.caused_by.reason, then caused_by.reason, then the
        # top-level reason, without autovivifying the response. For string
        # errors, extract a QueryParsingException if present.
        my $simple_error;
        if( is_hashref($error) ) {
            my $cause = is_hashref($error->{caused_by}) ? $error->{caused_by} : {};
            my $root  = is_hashref($cause->{caused_by}) ? $cause->{caused_by} : {};
            ($simple_error) = grep { defined && length } $root->{reason}, $cause->{reason}, $error->{reason};
        }
        else {
            ($simple_error) = $error =~ m/(QueryParsingException\[\[[^\]]+\][^\]]+\]\]);/;
        }
        $simple_error ||= '';
        output({stderr=>1,color=>'red'},
            "# Received an error from the cluster. $simple_error"
        );
        last;
    }
    $displayed_indices{$_} = 1 for @{ $by_age{$age} };
    # Elasticsearch 7+ reports hits.total as { value => N, relation => ... }
    # rather than a bare number; accept either shape.
    my $hits_total = $result->{hits}{total};
    $hits_total = $hits_total->{value} if is_hashref($hits_total);
    $TOTAL_HITS += $hits_total if $hits_total;

    my @always = ();
    push @always, $CONFIG{timestamp} unless $OPT{'no-decorators'};
    if(!$header && @SHOW) {
        output({color=>'cyan'}, join("\t", @always, @SHOW));
        $header++;
    }

    # Walk this bucket's results: the first response, then scroll pages.
    while( $result && !$DONE ) {
        my $hits = is_arrayref($result->{hits}{hits}) ? $result->{hits}{hits} : [];

        # Handle Aggregations: print them and move on to the next age bucket
        # without scrolling.
        if( my $aggregations = $result->{aggregations} ) {
            display_aggregations($aggregations, $result->{hits}{total});
            next AGES;
        }

        # Reset the last batch ID if we have new data
        %last_batch_id = () if @{$hits} > 0 && $last_hit_ts ne $hits->[-1]->{_source}{$CONFIG{timestamp}};
        debug({color=>'magenta'}, "+ ID cache is now empty.") unless keys %last_batch_id;

        foreach my $hit (@{ $hits }) {
            # Skip if we've seen this record (the --tail range filter is
            # 'gte', so documents at $last_hit_ts come back again).
            next if exists $last_batch_id{$hit->{_id}};

            $last_hit_ts = $hit->{_source}{$CONFIG{timestamp}};
            $last_batch_id{$hit->{_id}}=1;
            my $record = {};

            # Add the _id field to the source so that it is listed
            # when showing full records and can be used in @SHOW
            $hit->{_source}->{_id} = $hit->{_id} unless defined($hit->{_source}->{_id});

            if( @SHOW ) {
                my $flat = es_flatten_hash( $hit->{_source} );
                debug_var($flat);
                foreach my $f (@always) {
                    $record->{$f} = $flat->{$f};
                }
                foreach my $f (@SHOW) {
                    my $value = undef;
                    if( exists $flat->{$f} ) {
                        $value = $flat->{$f};
                    }
                    elsif( defined( my $v = document_lookdown($hit->{_source},$f) ) ) {
                        # Check definedness so legitimate false values
                        # (0, '') found under a nested key are kept.
                        $value = $v;
                    }
                    elsif(index($f, '.') > 0) {
                        # Try path matching the key: gather flattened keys
                        # nested below "$f." and keys that equal $f once their
                        # purely numeric (array index) segments are removed.
                        my @values = ();
                        # Sorted so multi-value output is stable between runs.
                        foreach my $k (sort keys %{ $flat }) {
                            # The trailing '.' keeps "a.b" from also matching
                            # a sibling key such as "a.bc".
                            if( index($k, "$f.") == 0 ) {
                                push @values, $flat->{$k};
                            }
                            elsif( $k =~ /\.\d+\./ ) {
                                my $flatter =  join '.', grep { !/^\d+$/ } split /\./, $k;
                                if ( $flatter eq $f ) {
                                    push @values, $flat->{$k};
                                }
                            }
                        }
                        $value = @values ? @values == 1 ? $values[0] : \@values : undef;
                    }
                    $record->{$f} = $value;
                }
            }
            else {
                $record = $hit->{_source};
            }
            # Determine how this record is output: tab-separated columns for
            # --show, otherwise the whole document as JSON or YAML.
            my $output = undef;
            if( @SHOW ) {
                my @cols=();
                foreach my $f (@always,@SHOW) {
                    my $v = '-';
                    if( exists $record->{$f} && defined $record->{$f} ) {
                        $v = is_arrayref($record->{$f}) && @{ $record->{$f} } == 1 ? $record->{$f}[0]
                           : is_ref($record->{$f}) ? to_json($record->{$f},{allow_nonref=>1,canonical=>1})
                           : $record->{$f};
                    }
                    push @cols,$v;
                }
                $output = join("\t",@cols);
            }
            else {
                $output = $CONFIG{format} =~ /^json/? to_json($record,{allow_nonref=>1,canonical=>1,pretty=>$CONFIG{pretty}})
                        : Dump $record;
            }

            output({data=>1}, $output);
            $displayed++;
            last if all_records_displayed();
        }
        last if all_records_displayed();

        # Scroll forward
        $start = time;
        $result = $q->scroll_results();
        $duration += time - $start;
        # Check if we need to keep going
        last unless defined $result;
        last unless $result->{hits} && $result->{hits}{hits} && @{ $result->{hits}{hits} } > 0
    }
    last if all_records_displayed();
}

# Report the query, hit counts, timing and the indexes searched on STDERR,
# unless $OPT{'no-decorators'} is set.
output({stderr=>1,color=>'yellow'},
    "# Search Parameters:",
    (map { "#    $_" } split /\r?\n/, to_json($q->query,{allow_nonref=>1,canonical=>1,pretty=>$CONFIG{pretty}})),
    sprintf("# Displaying %d of %d results%s took %0.2f seconds.",
        $displayed,
        $OUT_OF || $TOTAL_HITS,
        $OUT_OF ? " in $TOTAL_HITS documents" : '',
        $duration,
    ),
    sprintf("# Indexes (%d of %d) searched: %s\n",
            scalar(keys %displayed_indices),
            scalar(keys %indices),
            join(',', sort keys %displayed_indices)
    ),
) unless $OPT{'no-decorators'};

# When summarizing, print each accumulated aggregation key with its count and
# its count as a fraction of all hits, largest count first.
if($CONFIG{summary} && keys %AGGS_TOTALS) {
    unless ( $OPT{'no-decorators'} ) {
        output({color=>'yellow'}, '#', '# Totals across batch', '#');
        output({color=>'cyan'},$agg_header);
    }
    # Ties are broken by key name so the listing is stable between runs.
    foreach my $k (sort { $AGGS_TOTALS{$b} <=> $AGGS_TOTALS{$a} || $a cmp $b } keys %AGGS_TOTALS) {
        # Guard against "Illegal division by zero" when no hits were counted.
        my $ratio = $TOTAL_HITS ? $AGGS_TOTALS{$k} / $TOTAL_HITS : 0;
        output({data=>1,color=>'green'}, join "\t",
                $AGGS_TOTALS{$k}, sprintf($CONFIG{decimal_format}, $ratio), $k
        );
    }
}

# Tell the result loops whether to stop emitting records.
# Returns 1 once $DONE is set (the SIGINT handler sets it) or once
# $displayed reaches $CONFIG{size}; --tail and --all are never capped by
# count, so they return 0 until interrupted.
sub all_records_displayed {
    return 1 if $DONE;
    return 0 if exists $OPT{tail} || exists $OPT{all};
    return $displayed >= $CONFIG{size} ? 1 : 0;
}

# Depth-first search of a nested document for a key.
#   $href  - hash reference to search
#   $field - key name, matched literally at every level (no dotted paths)
# Returns the value under the first matching key, or an empty return (undef
# in scalar context) when no hash level contains it. Nested hashes are
# visited in sorted key order, and a branch without a match does not stop
# the search of its siblings.
sub document_lookdown {
    my ($href,$field) = @_;

    return $href->{$field} if exists $href->{$field};

    foreach my $k (sort keys %{ $href }) {
        next unless is_hashref($href->{$k});
        my $found = document_lookdown($href->{$k},$field);
        return $found if defined $found;
    }
    return;
}

sub show_fields {
    output({color=>'cyan'}, 'Fields available for search:' );
    my $total = 0;
    my %types = ();
    foreach my $field (sort keys %FIELDS) {
        $total++;
        my $type = $FIELDS{$field}->{type};
        $types{$type} ||= 0;
        $types{$type}++;
        my $color = $type eq 'ip' ? 'magenta'
                  : $type eq 'text' ? 'red'
                  : $type =~ /float|integer|short|byte|double/ ? 'cyan'



( run in 1.200 second using v1.01-cache-2.11-cpan-39bf76dae61 )