App-ElasticSearch-Utilities

 view release on metacpan or  search on metacpan

scripts/es-search.pl  view on Meta::CPAN


use App::ElasticSearch::Utilities qw(:all);
use App::ElasticSearch::Utilities::Query;
use App::ElasticSearch::Utilities::QueryString;
use Carp;
use CLI::Helpers qw(:all);
use Getopt::Long qw(:config no_ignore_case no_ignore_case_always);
use JSON::MaybeXS qw(:legacy);
use Pod::Usage;
use POSIX qw(strftime);
use Ref::Util qw(is_ref is_arrayref is_hashref);
use Time::HiRes qw(sleep time);
use YAML::XS;
local $YAML::XS::Boolean = "JSON::PP";

#------------------------------------------------------------------------#
# Argument Parsing
# Parsed options are collected into %OPT, keyed by the first name of each
# spec (e.g. "size|n|limit=i" is read back below as $OPT{size}).  Specs
# ending in "=s@" may be given more than once and accumulate into an array
# reference; "=s" and "=i" take a single string or integer value.
my %OPT;
GetOptions(\%OPT, qw(
    all
    asc
    bases
    bg-filter=s
    by=s
    desc
    exists=s@
    fields
    filter
    format=s
    help|h
    json|jq
    manual|m
    match-all
    max-batch-size=i
    missing=s@
    no-decorators|no-header
    no-implications|no-imply
    precision=i
    prefix=s@
    pretty
    show=s@
    size|n|limit=i
    sort=s
    tail
    timestamp=s
    top=s
    interval=s
    with=s@
    with-missing
    or
));

# Whatever remains in @ARGV is the search string.  Clauses are added in the
# 'filter' context when --filter is given, otherwise in 'must'.
my $context = 'must';
$context = 'filter' if $OPT{filter};

my $qs = App::ElasticSearch::Utilities::QueryString->new(
    ( $context eq 'filter' ? (context => 'filter') : () ),
    default_join => ( $OPT{or} ? 'OR' : 'AND' ),
);

# --match-all bypasses the query string parser entirely
my $q;
if( exists $OPT{'match-all'} && $OPT{'match-all'} ) {
    $q = App::ElasticSearch::Utilities::Query->new($context => { match_all => {} });
}
else {
    $q = $qs->expand_query_string(@ARGV);
}

# Request timeout and scroll settings applied to every query
$q->set_timeout('10s');
$q->set_scroll('30s');

# Add one prefix query per --prefix "field:value" option, in the active
# context.  Entries that lack either a field or a value are skipped.
if( exists $OPT{prefix} ){
    foreach my $prefix (@{ $OPT{prefix} }) {
        # Limit the split to 2 so the value itself may contain colons
        my ($f,$v) = split /:/, $prefix, 2;
        # Test for presence rather than truth, so a value such as "0" is
        # still honored instead of being silently dropped.
        next unless defined $f && length $f && defined $v && length $v;
        $q->add_bool( $context => { prefix => { $f => $v } } );
    }
}

#------------------------------------------------------------------------#
# Documentation
if( $OPT{help} ) {
    pod2usage({-sections => 'SYNOPSIS'});
}
if( $OPT{manual} ) {
    pod2usage(-exitval => 0, -verbose => 2);
}
# Anything left in @ARGV that still looks like a long option was not recognized
my $unknown_options = join ', ', grep { /^--/ } @ARGV;
if( $unknown_options ) {
    pod2usage({-exitval => 1, -sections => 'SYNOPSIS', -msg =>"Unknown option(s): $unknown_options"});
}

#--------------------------------------------------------------------------#
# Information Gathering Routines
# --bases only lists the available bases and then exits successfully
if( $OPT{bases} ) {
    show_bases();
    exit 0;
}
#--------------------------------------------------------------------------#
# App Config
# Start from the defaults, then layer the command line options on top.
my %CONFIG = (
    size             => 20,
    format           => 'yaml',
    'max-batch-size' => $OPT{'max-batch-size'} || 50,
    precision        => $OPT{precision} || 3,
);
# Only a positive --size overrides the default
$CONFIG{size} = int($OPT{size}) if $OPT{size} && $OPT{size} > 0;
# Output format precedence: --json, then --format (lowercased), then YAML
if( $OPT{json} ) {
    $CONFIG{format} = 'json';
}
elsif( $OPT{format} ) {
    $CONFIG{format} = lc $OPT{format};
}
$CONFIG{timestamp} = $OPT{timestamp} if $OPT{timestamp};
# JSON output never carries decorators
$OPT{'no-decorators'} = 1 if $CONFIG{format} eq 'json';
# Pretty printing is enabled by --pretty or by a format name containing "pretty"
$CONFIG{pretty} = ( $OPT{pretty} || $CONFIG{format} =~ /pretty/ ) ? 1 : 0;
my $JSON = JSON->new->utf8->canonical;
#------------------------------------------------------------------------#
# Handle Indices
# Sort direction: descending unless --asc is set; --tail always forces ascending
my $ORDER = exists $OPT{asc} && $OPT{asc} ? 'asc' : 'desc';
$ORDER = 'asc' if exists $OPT{tail};
my %by_age = ();
# Map every index name to its age in days, using 0 when no age is reported
my %indices = map { $_ => (es_index_days_old($_) || 0) } es_indices();
# Nothing to search: stop here with a message
die "# Failed to retrieve any indices using your parameters." unless keys %indices;
my %FIELDS = ();
my $TimeStampCheck=0;
foreach my $index (sort by_index_age keys %indices) {
    my $age = $indices{$index};
    $by_age{$age} ||= [];
    push @{ $by_age{$age} }, $index;
    my $fields = es_index_fields($index);
    foreach my $k ( keys %{ $fields } ) {
        $FIELDS{$k} = $fields->{$k}
            unless $FIELDS{$k};
    }
    # Lookup the Index in our local YAML

scripts/es-search.pl  view on Meta::CPAN

    # Sub aggregations attached to the top aggregation, keyed by their id
    my %sub_agg = ();
    if( $OPT{by}) {
        # --by <type>:<field>; limit the split so the field may contain colons
        my ($type,$field) = split /:/, $OPT{by}, 2;
        if( !exists $SUPPORTED_AGGREGATIONS{$type} ) {
            output({color=>'red'}, "Aggregation '$type' is not currently supported, ignoring.");
        }
        elsif( !defined $field || !length $field ) {
            # Without a field the aggregation would be sent with an undefined
            # field; report it and skip instead.
            output({color=>'red'}, "Aggregation '$type' requires a field (--by <type>:<field>), ignoring.");
        }
        else {
            $sub_agg{by} = { $type => {field => $field} };
        }
    }
    # Build one sub aggregation per --with spec.  A spec is split on ':' into
    # at most three parts: an optional aggregation type, the field, and an
    # optional trailing argument.
    if( $OPT{with} ) {
        my @with = is_arrayref($OPT{with}) ? @{ $OPT{with} } : ( $OPT{with} );
        foreach my $with ( @with )  {
            my @attrs = split /:/, $with, 3;
            # Process Args from Right to Left
            # The argument is the third part when present, otherwise a last
            # part that starts with a digit; else there is no argument.
            my $arg   = @attrs == 3 ? pop @attrs
                      : $attrs[-1] =~ /^\d/ ? pop @attrs
                      : '';
            # Interpret the argument for each aggregation that consumes one:
            # a percentile list, a terms bucket count, or a histogram interval.
            # Each falls back to its default when the argument does not fit.
            my $pcts  = $arg =~ /^\d{1,2}(?:\.\d+)?(?:,\d{1,2}(?:\.\d+)?)*$/ ? $arg : '25,50,75,90,95,99';
            my $size  = $arg =~ /^\d+$/ ? $arg : 3;
            my $hi    = $arg || 0.1;
            # The field is only accepted when it is a key of %FIELDS
            my $field = exists $FIELDS{$attrs[-1]} ? pop @attrs : undef;
            # Whatever is left over names the aggregation type
            my $type  = @attrs ? pop @attrs : 'terms';
            # Skip invalid elements
            next unless defined $field and defined $size and $size > 0;

            my %params = ();
            my $id = "$type-$field";
            # If a term agg and we haven't used this field name, simplify it
            if( $type =~ /terms$/ && !$sub_agg{$field} ) {
                $id = $field;
                $params{size} = $size;
                $params{missing} = 'MISSING' if $OPT{'with-missing'};
            }

            # Statistical aggregations add an exists clause on the field
            # unless --no-implications was given.
            if( $type =~ /histogram|stats|percentiles/ && !$OPT{'no-implications'} ) {
                output({color=>'magenta',sticky=>1}, "* Using a statistical aggregation implies an exists filter on $field, use --no-implications to disable this");
                $q->add_bool( must => { exists => { field => $field } } );
            }

            # percents and interval are only included for their own
            # aggregation type; %params carries the terms-only settings.
            $sub_agg{$id} = {
                $type => {
                    field => $field,
                    $type eq 'percentiles' ? ( percents => [split /,/, $pcts] ) : (),
                    $type eq 'histogram'   ? ( interval => $hi ) :  (),
                    %params,
                }
            };
        }
    }

    # Extra parameters for the top aggregation; only a terms aggregation
    # receives the MISSING bucket.
    my %params = ();
    if( $OPT{'with-missing'} and $top_agg eq 'terms' ) {
        $params{missing} = 'MISSING';
    }

    # The top aggregation runs on the first requested field
    my $field = shift @agg_fields;
    $agg_header = "count\tpct\t$field";
    $agg{$top_agg} = { field => $field, %params };

    # A background filter only applies to significant_terms
    if( $OPT{'bg-filter'} && $top_agg eq 'significant_terms' ) {
        my $bgf = App::ElasticSearch::Utilities::QueryString->new();
        my $bgq = $bgf->expand_query_string($OPT{'bg-filter'});
        $agg{$top_agg}->{background_filter} = $bgq->query;
    }

    # With --by, order on that sub aggregation first and on count second
    if( exists $sub_agg{by} ) {
        $agg_header = "$OPT{by}\t$agg_header";
        $agg{$top_agg}->{order} = [ { by => $ORDER }, { "_count" => "desc" } ];
    }
    if( keys %sub_agg ) {
        $agg{aggregations} = \%sub_agg;
    }

    # --all lifts the bucket limit to a fixed ceiling, otherwise use --size
    verbose({color=>'cyan'}, "# Aggregations with --all are limited to returning 1,000,000 results.")
        if exists $OPT{all};
    $agg{$top_agg}->{size} = exists $OPT{all} ? 1_000_000 : $CONFIG{size};

    $q->add_aggregations( top => \%agg );
    $q->add_aggregations( out_of => { cardinality => { field => $field  } } );

    # --interval wraps the aggregations in a date_histogram on the timestamp field
    $q->wrap_aggregations( step => {
        date_histogram => {
            field    => $CONFIG{timestamp},
            interval => $OPT{interval},
        }
    }) if $OPT{interval};
}
elsif(exists $OPT{tail}) {
    # --tail: request the maximum batch size and keep only the last entry of @AGES
    $q->set_size($CONFIG{'max-batch-size'});
    @AGES = ($AGES[-1]);
}
elsif( $OPT{all} ) {
    # --all: request the maximum batch size
    $q->set_size( $CONFIG{'max-batch-size'} );
}
else {
    # Otherwise request the smaller of the configured size and the batch size
    $q->set_size( $CONFIG{size} < $CONFIG{'max-batch-size'} ? $CONFIG{size} : $CONFIG{'max-batch-size'} );
}

# State initialized before the AGES loop starts; every counter begins at
# zero and every container begins empty.
# NOTE(review): presumably all of these are updated inside the AGES loop -- confirm.
my %displayed_indices = ();
my $TOTAL_HITS        = 0;
my $OUT_OF            = 0;
my $last_hit_ts       = undef;
my $duration          = 0;
my $displayed         = 0;
my $header            = 0;
my $age               = undef;   # assigned at the top of each AGES loop pass
my %last_batch_id     = ();
my %AGGS_TOTALS       = ();
my %AGES_SEEN         = ();
# Handle CTRL+C During the Loop
# The handler only raises a flag; the AGES loop tests $DONE before each pass.
my $DONE              = 0;
local $SIG{INT}       = sub { $DONE=1 };

verbose({color=>'green',level=>1}, "= Query setup complete, beginning request.");
AGES: while( !$DONE && @AGES ) {
    # With --tail, we don't want to deplete @AGES
    $age = $OPT{tail} ? $AGES[0] : shift @AGES;

scripts/es-search.pl  view on Meta::CPAN

this mode enforces that only the most recent indices are searched.  Also, given the output is continuous, you must
specify --show with this option.

=item B<top>

Perform an aggregation returning the top field.  Limited to a single field at this time.
This option is not available when using --tail.

    --top src_ip

You can override the default of the C<terms> bucket aggregation by prefixing
the parameter with the required bucket aggregation, i.e.:

    --top significant_terms:src_ip

=item B<by>

Perform a sub aggregation on the top terms aggregation and order by the result of this aggregation.
Aggregation syntax is as follows:

    --by <type>:<field>

A full example might look like this:

    $ es-search.pl --base access dst:www.example.com --top src_ip --by cardinality:acct

This will show the top source IP's ordered by the cardinality (count of the distinct values) of accounts logging
in as each source IP, instead of the source IP with the most records.

Supported sub aggregations and formats:

    cardinality:<field>
    min:<field>
    max:<field>
    avg:<field>
    sum:<field>

=item B<with>

Perform a sub aggregation on the top terms and report that sub aggregation's details in the output.  The format is:

    --with <aggregation>:<field>:<size>

The default B<size> is 3.
The default B<aggregation> is 'terms'.

B<field> is the only required element.

e.g.

    $ es-search.pl --base logstash error --top program --size 2 --by cardinality:host --with host:5

This will show the top 2 programs with log messages containing the word error by the cardinality (count
distinct host) of hosts showing the top 5 hosts

Without the --with, the results might look like this:

    112314 0.151 sshd
    21224  0.016 ntp

The B<--with> option would expand that output to look like this:

    112314   0.151 host   bastion-804   12431  0.111 sshd
    112314   0.151 host   bastion-803   10009  0.089 sshd
    112314   0.151 host   bastion-805   9768   0.087 sshd
    112314   0.151 host   bastion-801   8789   0.078 sshd
    112314   0.151 host   bastion-802   4121   0.037 sshd
    21224    0.016 host   webapp-324    21223  0.999 ntp
    21224    0.016 host   mail-42       1      0.000 ntp

This may be specified multiple times, the result is more I<rows>, not more I<columns>, e.g.

    $ es-search.pl --base logstash error --top program --size 2 --by cardinality:host --with host:5 --with dc:2

Produces:

    112314 0.151  dc     arlington     112314 1.000 sshd
    112314 0.151  host   bastion-804   12431  0.111 sshd
    112314 0.151  host   bastion-803   10009  0.089 sshd
    112314 0.151  host   bastion-805   9768   0.087 sshd
    112314 0.151  host   bastion-801   8789   0.078 sshd
    112314 0.151  host   bastion-802   4121   0.037 sshd
    21224  0.016  dc     amsterdam     21223  0.999 ntp
    21224  0.016  dc     la            1      0.000 ntp
    21224  0.016  host   webapp-324    21223  0.999 ntp
    21224  0.016  host   mail-42       1      0.000 ntp

You may sub aggregate using any L<bucket aggregation|https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-bucket.html>
as long as the aggregation provides a B<key> element.  Additionally, doc_count, score, and bg_count will be reported in the output.

Other examples:

    --with significant_terms:crime
    --with cardinality:accts
    --with min:out_bytes
    --with max:out_bytes
    --with avg:out_bytes
    --with sum:out_bytes
    --with stats:out_bytes
    --with extended_stats:out_bytes
    --with percentiles:out_bytes
    --with percentiles:out_bytes:50,95,99
    --with histogram:out_bytes:1024

=item B<with-missing>

For terms aggregations, adds a C<MISSING> bucket.

=item B<bg-filter>

Only used if the C<--top> aggregation is C<significant_terms>.  Sets the
background filter for the C<significant_terms> aggregation.

    es-search.pl --top significant_terms:src_ip method:POST file:\/get\/sensitive_data --bg-filter method:POST

=item B<interval>

When performing aggregations, wrap those aggregations in a date_histogram of this interval.  This
helps flush out "what changed in the last hour."

=item B<match-all>

Apply the ElasticSearch "match_all" search operator to query on all documents
in the index.  This is the default with no search parameters.

=item B<prefix>

Takes a "field:string" combination.  May be specified multiple times; multiple --prefix options will be "AND"'d.

Example:

    --prefix useragent:'Go '

Will search for documents where the useragent field matches a prefix search on the string 'Go '

JSON Equivalent is:

    { "prefix": { "useragent": "Go " } }

=item B<exists>

Filter results to those containing a valid, not null field

    --exists referer

Only show records with a referer field in the document.

=item B<missing>

Filter results to those not containing a valid, not null field

    --missing referer

Only show records without a referer field in the document.

=item B<bases>

Display a list of bases that can be used with the --base option.

Use with --verbose to show age information on the indexes in each base.

=item B<fields>

Display a list of searchable fields

=item B<index>

Search only this index for data, may also be a comma separated list

=item B<days>

The number of days back to search, the default is 5

=item B<base>

Index base name, will be expanded using the days back parameter.  The default
is 'logstash' which will expand to 'logstash-YYYY.MM.DD'

=item B<timestamp>

The field in your documents that we'll treat as a "date" type in our queries.

May also be specified in the C<~/.es-utils.yaml> file per index, or index base:

    ---
    host: es-readonly-01
    port: 9200
    meta:
      bro:
        timestamp: 'record_ts'
      mayans-2012.12.21:
        timestamp: 'end_of_the_world'

Then running:

    # timestamp is set to '@timestamp', the default
    es-search.pl --base logstash --match-all

    # timestamp is set to 'record_ts', from ~/.es-utils.yaml
    es-search.pl --base bro --match-all

    # timestamp is set to '@timestamp', the default
    es-search.pl --base mayans --match-all

    # timestamp is set to 'end_of_the_world', from ~/.es-utils.yaml
    es-search.pl --index mayans-2012.12.21 --match-all

=item B<size>

The number of results to show, default is 20.

=item B<max-batch-size>

When building result sets, this tool uses scroll searches.  This parameter
controls how many docs are in each scroll.  It defaults to 50, but will be
scaled down lower if C<size> is smaller.

=item B<all>

If specified, ignore the --size parameter and show me everything within the date range I specified.
In the case of --top, this limits the result set to 1,000,000 results.

=back

=head1 Extended Syntax

The search string is pre-analyzed before being sent to ElasticSearch.  The following plugins
work to manipulate the query string and provide richer, more complete syntax for CLI applications.

=head2 App::ElasticSearch::Utilities::QueryString::Barewords

The following barewords are transformed:

    or => OR
    and => AND
    not => NOT

scripts/es-search.pl  view on Meta::CPAN

=head3 Wildcard Query via '*'

Provide an '*' prefix to a query string parameter to promote that parameter to a C<wildcard> filter.

This uses the wildcard match for text fields to make matching more intuitive.

E.g.:

    *user_agent:"Mozilla*"

Is translated into:

    { wildcard => { user_agent => "Mozilla*" } }

=head3 Regexp Query via '/'

Provide an '/' prefix to a query string parameter to promote that parameter to a C<regexp> filter.

If you want to use regexp matching for finding data, you can use:

    /message:'\\bden(ial|ied|y)'

Is translated into:

    { regexp => { message => "\\bden(ial|ied|y)" } }

=head3 Fuzzy Matching via '~'

Provide an '~' prefix to a query string parameter to promote that parameter to a C<fuzzy> filter.

    ~message:deny

Is translated into:

    { fuzzy => { message => "deny" } }

=head3 Phrase Matching via '+'

Provide an '+' prefix to a query string parameter to promote that parameter to a C<match_phrase> filter.

    +message:"login denied"

Is translated into:

    { match_phrase => { message => "login denied" } }

=head3 Automatic Match Queries for Text Fields

If the field meta data is provided and the field is a C<text> type, the query
will automatically be mapped to a C<match> query.

    # message field is text
    message:"foo"

Is translated into:

    { match => { message => "foo" } }

=head2 App::ElasticSearch::Utilities::QueryString::IP

If a field is an IP address using CIDR notation, it's expanded to a range query.

    src_ip:10.0/8 => src_ip:[10.0.0.0 TO 10.255.255.255]

=head2 App::ElasticSearch::Utilities::QueryString::Ranges

This plugin translates some special comparison operators so you don't need to
remember them anymore.

Example:

    price:<100

Will translate into a:

    { range: { price: { lt: 100 } } }

And:

    price:>50,<100

Will translate to:

    { range: { price: { gt: 50, lt: 100 } } }

=head3 Supported Operators

B<gt> via E<gt>, B<gte> via E<gt>=, B<lt> via E<lt>, B<lte> via E<lt>=

=head2 App::ElasticSearch::Utilities::QueryString::Underscored

This plugin translates some special underscore surrounded tokens into
the Elasticsearch Query DSL.

Implemented:

=head3 _prefix_

Example query string:

    _prefix_:useragent:'Go '

Translates into:

    { prefix => { useragent => 'Go ' } }

=head2 App::ElasticSearch::Utilities::QueryString::FileExpansion

If the match ends in .dat, .txt, .csv, or .json then we attempt to read a file with that name and OR the condition:

    $ cat test.dat
    50  1.2.3.4
    40  1.2.3.5
    30  1.2.3.6
    20  1.2.3.7

Or

    $ cat test.csv
    50,1.2.3.4
    40,1.2.3.5
    30,1.2.3.6
    20,1.2.3.7

Or

    $ cat test.txt
    1.2.3.4
    1.2.3.5
    1.2.3.6
    1.2.3.7

Or

    $ cat test.json
    { "ip": "1.2.3.4" }
    { "ip": "1.2.3.5" }
    { "ip": "1.2.3.6" }
    { "ip": "1.2.3.7" }

We can source that file:

    src_ip:test.dat      => src_ip:(1.2.3.4 1.2.3.5 1.2.3.6 1.2.3.7)
    src_ip:test.json[ip] => src_ip:(1.2.3.4 1.2.3.5 1.2.3.6 1.2.3.7)

This makes it simple to use the --data-file output options and build queries
based off previous queries. For .txt and .dat files, the delimiter for columns
in the file must be either a tab or a null.  For files ending in
.csv, Text::CSV_XS is used for accurate parsing of the file format.  Files
ending in .json are considered to be newline-delimited JSON.

You can also specify the column of the data file to use, the default being the last column or (-1).  Columns are
B<zero-based> indexing. This means the first column is index 0, second is 1, ..  The previous example can be rewritten
as:

    src_ip:test.dat[1]

or:

    src_ip:test.dat[-1]

For newline delimited JSON files, you need to specify the key path you want to extract from the file.  If we have a
JSON source file with:

    { "first": { "second": { "third": [ "bob", "alice" ] } } }
    { "first": { "second": { "third": "ginger" } } }
    { "first": { "second": { "nope":  "fred" } } }

We could search using:

    actor:test.json[first.second.third]

Which would expand to:

    { "terms": { "actor": [ "alice", "bob", "ginger" ] } }

This option will iterate through the whole file and unique the elements of the list.  They will then be transformed into
an appropriate L<terms query|http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/query-dsl-terms-query.html>.

=head3 Wildcards

We can also have a group of wildcard or regexp in a file:

    $ cat wildcards.dat
    *@gmail.com
    *@yahoo.com

To enable wildcard parsing, prefix the filename with a C<*>.

    es-search.pl to_address:*wildcards.dat

Which expands the query to:

    {
      "bool": {
        "minimum_should_match":1,
        "should": [
           {"wildcard":{"to_address":{"value":"*@gmail.com"}}},
           {"wildcard":{"to_address":{"value":"*@yahoo.com"}}}
        ]
      }
    }

No attempt is made to verify or validate the wildcard patterns.

=head3 Regular Expressions

If you'd like to specify a file full of regexp, you can do that as well:

    $ cat regexp.dat
    .*google\.com$
    .*yahoo\.com$

To enable regexp parsing, prefix the filename with a C<~>.

    es-search.pl to_address:~regexp.dat

Which expands the query to:

    {
      "bool": {
        "minimum_should_match":1,
        "should": [
          {"regexp":{"to_address":{"value":".*google\\.com$"}}},
          {"regexp":{"to_address":{"value":".*yahoo\\.com$"}}}
        ]
      }
    }

No attempt is made to verify or validate the regexp expressions.

=head2 App::ElasticSearch::Utilities::QueryString::Nested

Implement the proposed nested query syntax early.  Example:

    nested_path:"field:match AND string"

=head1 Meta-Queries

Helpful in building queries are the --bases and --fields options, which list the index bases and fields:

    es-search.pl --bases

    es-search.pl --fields

    es-search.pl --base access --fields

=head1 AUTHOR

Brad Lhotsky <brad@divisionbyzero.net>

=head1 COPYRIGHT AND LICENSE

This software is Copyright (c) 2026 by Brad Lhotsky.

This is free software, licensed under:

  The (three-clause) BSD License

=cut



( run in 0.327 second using v1.01-cache-2.11-cpan-5623c5533a1 )