App-ElasticSearch-Utilities

 view release on metacpan or  search on metacpan

lib/App/ElasticSearch/Utilities.pm  view on Meta::CPAN


    $host ||= $DEF{HOST};

    # Return the results if we've done this already
    return @{ $_auth_cache{$host} }{qw(username password)}
        if exists $_auth_cache{$host};

    # Set the cached element
    my %auth = ();

    # Lookup the details netrc
    my $netrc = Net::Netrc->lookup($host);
    if( $DEF{HOST} eq $host ) {
        %auth = map { lc($_) => $DEF{$_} } qw(USERNAME);
    }
    my %meta = ();
    foreach my $k (qw( http-username password-exec )) {
        foreach my $name ( $DEF{INDEX}, $DEF{BASE} ) {
            next unless $name;
            if( my $v = es_local_index_meta($k, $name) ) {
                $meta{$k} = $v;
                last;
            }
        }
    }

    # Get the Username
    $auth{username} ||= $meta{'http-username'} ? $meta{'http-username'}
                      : defined $DEF{USERNAME} ? $DEF{USERNAME}
                      : defined $netrc         ? $netrc->login
                      : $ENV{USER};

    # Prompt for the password
    $auth{password} ||= defined $netrc ? $netrc->password
                      : (es_pass_exec($host,$auth{username},$meta{'password-exec'})
                            || prompt(sprintf "Password for '%s' at '%s': ", $auth{username}, $host)
                        );

    # Store
    $_auth_cache{$host} = \%auth;
    return @auth{qw(username password)};
}


# Run an external command to obtain a password.
#
# Arguments:
#   $host     - passed to the command as its first argument
#   $username - passed to the command as its second argument
#   $exec     - command to run; falls back to $DEF{PASSEXEC} when false
#
# Returns the last line the command wrote to STDOUT with the trailing newline
# removed.  Returns nothing (undef / empty list) when no executable command is
# configured, when the command wrote to STDERR or exited non-zero, or when it
# produced no output at all.
sub es_pass_exec {
    my ($host,$username,$exec) = @_;

    es_utils_initialize() unless keys %DEF;

    # Simplest case we can't run
    $exec ||= $DEF{PASSEXEC};
    return unless length($exec) && -x $exec;

    my(@out,@err);
    # Run the password command, capturing output, errors, and the wait status
    run3 [ $exec, $host, $username ], \undef, \@out, \@err;
    my $rc = $?;

    # Record the error
    if( @err or $rc != 0 ) {
        output({color=>'red',stderr=>1},
            sprintf("es_pass_exec() called '%s' and met with an error code '%d'", $exec, $rc),
            @err
        );
        return;
    }

    # A command that succeeds without printing anything yields no password;
    # bail out here instead of calling chomp() on an undefined value.
    return unless @out;

    # Format and return the result
    my $passwd = $out[-1];
    chomp($passwd);
    return $passwd;
}



# Return a hashref describing the index naming pattern: the key 'string'
# holds $DEF{PATTERN} and the key 're' holds $PATTERN.
sub es_pattern {
    if( !keys %DEF ) {
        es_utils_initialize();
    }

    my %pattern = (
        string => $DEF{PATTERN},
        re     => $PATTERN,
    );
    return \%pattern;
}

# Build the SSL options hashref from the global defaults.  Only options whose
# corresponding %DEF entry is true are present in the result.
sub _get_ssl_opts {
    if( !keys %DEF ) {
        es_utils_initialize();
    }

    # Option name paired with the %DEF key that supplies its value
    my @file_options = (
        [ SSL_ca_file   => 'CACERT' ],
        [ SSL_ca_path   => 'CAPATH' ],
        [ SSL_cert_file => 'CERT'   ],
        [ SSL_key_file  => 'KEY'    ],
    );

    my %ssl;
    # Hostname verification is switched off only on request
    if( $DEF{INSECURE} ) {
        $ssl{verify_hostname} = 0;
    }
    foreach my $pair (@file_options) {
        my ($option,$source) = @{ $pair };
        next unless $DEF{$source};
        $ssl{$option} = $DEF{$source};
    }

    return \%ssl;
}

# Determine the major.minor version of the cluster and cache it in
# $CURRENT_VERSION.
#
# Issues a GET against the root of the connection returned by es_connect().
# On specific failures the request is retried at most once per strategy:
# once after promoting the connection to HTTPS and once after adding basic
# authentication.  A response whose version data reports the 'opensearch'
# distribution is recorded as '7.10'.
#
# Returns the cached version string.  If no version can be determined, or the
# detected version is <= 2, the error is reported on STDERR and the process
# exits with status 1.
sub _get_es_version {
    return $CURRENT_VERSION if defined $CURRENT_VERSION;
    my $conn = es_connect();
    # Build the request
    my $req  = App::ElasticSearch::Utilities::HTTPRequest->new(
        GET => sprintf "%s://%s:%d",
                    $conn->proto, $conn->host, $conn->port
    );
    # Check if we're doing auth
    my @auth = $DEF{PASSEXEC} ? es_basic_auth($conn->host) : ();
    # Add authentication if we get a password
    $req->authorization_basic( @auth ) if @auth;

    # Retry with TLS and/or Auth, each strategy is attempted at most once
    my %try = map { $_ => 1 } qw( tls auth );
    my $resp;
    while( not defined $CURRENT_VERSION ) {
        $resp = $conn->ua->request($req);
        if( $resp->is_success ) {
            my $ver;
            # The content may not be a hash reference, so guard the lookup
            eval {
                $ver = $resp->content->{version};
            };
            # A successful response without usable version data will not
            # change on retry; stop here rather than re-requesting forever
            # and let the failure handling below report it.
            last unless is_hashref($ver);

            if( $ver->{distribution} and $ver->{distribution} eq 'opensearch' ) {
                $CURRENT_VERSION = '7.10';
            }
            else {
                # Keep only the major and minor components
                $CURRENT_VERSION = join('.', (split /\./,$ver->{number})[0,1]);
            }
        }
        elsif( $resp->code == 500 && $resp->message eq "Server closed connection without sending any data back" ) {
            # Try TLS
            last unless $try{tls};
            delete $try{tls};
            # NOTE(review): $req was built with the original proto above;
            # confirm the retry actually goes out over HTTPS.
            $conn->proto('https');
            warn "Attempting promotion to HTTPS, try setting 'proto: https' in ~/.es-utils.yaml";
        }
        elsif( $resp->code == 401 ) {
            # Retry with credentials
            last unless $try{auth};
            delete $try{auth};
            warn "Authorization required, try setting 'password-exec: /home/user/bin/get-password.sh` in ~/.es-utils.yaml'"
                unless $DEF{PASSEXEC};
            $req->authorization_basic( es_basic_auth($conn->host) );
        }
        else {
            warn "Failed getting version";
            last;
        }
    }
    if( !defined $CURRENT_VERSION || $CURRENT_VERSION <= 2 ) {
        output({color=>'red',stderr=>1}, sprintf "[%d] Unable to determine Elasticsearch version, something has gone terribly wrong: aborting.", $resp->code);
        output({color=>'red',stderr=>1}, ref $resp->content ? YAML::XS::Dump($resp->content) : $resp->content) if $resp->content;
        exit 1;
    }
    debug({color=>'magenta'}, "FOUND VERISON '$CURRENT_VERSION'");
    return $CURRENT_VERSION;
}


# Cached connection handle, shared by every call that does not override servers
my $ES = undef;

# Return a connection object.
#
# Arguments:
#   $override_servers - optional; a single "host" / "host:port" string or an
#                       array reference of such strings
#
# With overrides, one of the listed servers is picked (at random when there
# is more than one) and a new, uncached connection is returned.  Without
# overrides, connection parameters may be replaced by index metadata found
# for $DEF{INDEX} or $DEF{BASE}, and the resulting handle is cached in $ES
# and reused by later calls.
sub es_connect {
    my ($override_servers) = @_;

    if( !keys %DEF ) {
        es_utils_initialize();
    }

    my %conn = (
        host     => $DEF{HOST},
        port     => $DEF{PORT},
        proto    => $DEF{PROTO},
        timeout  => $DEF{TIMEOUT},
        ssl_opts => _get_ssl_opts(),
    );

    # Only authenticate over TLS
    if( $DEF{PROTO} eq 'https' ) {
        $conn{username} = $DEF{USERNAME};
        if( $DEF{PASSEXEC} ) {
            $conn{password} = es_pass_exec($DEF{HOST}, $DEF{USERNAME});
        }
    }

    if( defined $override_servers ) {
        # If we're overriding, return a unique handle
        my @entries = is_arrayref($override_servers) ? @{ $override_servers }
                    : ($override_servers);

        # Each candidate inherits the defaults, entries without a port
        # fall back to the default port
        my @candidates = map {
            my ($server,$port) = split /\:/, $_;
            +{ %conn, host => $server, port => $port || $conn{port} };
        } @entries;

        if( @candidates ) {
            my $chosen = @candidates > 1 ? $candidates[int(rand(@candidates))]
                       : $candidates[0];
            return App::ElasticSearch::Utilities::Connection->new(%{ $chosen });
        }
    }
    else {
        # Check for index metadata, the first name providing a value wins
        my @names = grep { $_ } $DEF{INDEX}, $DEF{BASE};
        foreach my $param ( keys %conn ) {
            foreach my $name (@names) {
                my $value = es_local_index_meta($param => $name)
                    or next;
                $conn{$param} = $value;
                last;
            }
        }
    }

    # Otherwise, cache our handle
    if( !$ES ) {
        $ES = App::ElasticSearch::Utilities::Connection->new(%conn);
    }

    return $ES;
}

lib/App/ElasticSearch/Utilities.pm  view on Meta::CPAN

        $url =~ s/\/$//;
        $url = join('/', $url, $index);
        delete $options->{command};
    }
    elsif( $index ) {
        $options->{index} = $index;
    }
    else {
        $index = '';
    }

    # Figure out if we're modifying things
    my $modification = $url eq '_search' && $options->{method} eq 'POST' ? 0
                     : $options->{method} ne 'GET';

    if($modification) {
        # Set NOOP if necessary
        if(!$DEF{NOOP} && $DEF{MASTERONLY}) {
            if( !es_master() ) {
                $DEF{NOOP} = 1;
            }
        }

        # Check for noop
        if( $DEF{NOOP} ) {
            my $flag = $DEF{MASTERONLY} && !es_master() ? '--master-only' : '--noop';
            output({color=>'cyan'}, "Called es_request($index/$options->{command}), but $flag set and method is $options->{method}");
            return;
        }
    }

    # Make the request
    my $resp = $instance->request($url,$options,$body);

    # Check the response is defined, bail if it's not
    die "Unsupported request method: $options->{method}" unless defined $resp;

    # Logging
    verbose({color=>'yellow'}, sprintf "es_request(%s/%s) returned HTTP Status %s",
        $index, $options->{command}, $resp->message,
    ) if $resp->code != 200;

    # Error handling
    if( !$resp->is_success ) {
        my $msg;
        eval {
            my @causes = ();
            foreach my $cause ( @{ $resp->content->{error}{root_cause} } ) {
                push @causes, $cause->{index} ? "$cause->{index}: $cause->{reason}" : $cause->{reason};
            }
            $msg = join("\n", map { "\t$_" } @causes);
            1;
        } or do {
            # Default to the message, though it's usually unhelpful
            $msg = $resp->{message};
        };
        die sprintf "es_request(%s/%s) failed[%d]:\n%s",
                    $index, $options->{command}, $resp->code, $msg || 'missing error message';

    } elsif( !defined $resp->content || ( !is_ref($resp->content) && !length $resp->content )) {
        output({color=>'yellow',stderr=>1},
            sprintf "es_request(%s/%s) empty response[%d]: %s",
                $index, $options->{command}, $resp->code, $resp->message
        );
    }

    return $resp->content;
}



# Cache of node id => node name, filled on first use
my %_nodes;

# Return the mapping of node ids to node names from '_cluster/state/nodes'.
#
# The mapping is fetched while the cache is empty and reused afterwards.
# In list context the mapping is returned as a hash, otherwise as a reference
# to a fresh copy.  If the request yields no result, an error is written to
# STDERR and the process exits with status 1.
sub es_nodes {
    if(!keys %_nodes) {
        my $res = es_request('_cluster/state/nodes', {});
        if( !defined $res  ) {
            # Diagnostics belong on STDERR so they never mix with regular output
            output({stderr=>1,color=>"red"}, "es_nodes(): Unable to locate nodes in status!");
            exit 1;
        }
        foreach my $id ( keys %{ $res->{nodes} } ) {
            $_nodes{$id} = $res->{nodes}{$id}{name};
        }
    }

    return wantarray ? %_nodes : { %_nodes };
}


# Cached 'indices' section of the cluster metadata, filled on first use
my $_indices_meta;

# Return the per-index metadata found under {metadata}{indices} of the
# '_cluster/state/metadata' response.
#
# The data is requested once and cached.  Callers receive a shallow copy:
# a hash in list context, a hash reference otherwise.  If the request yields
# no result, an error is written to STDERR and the process exits with
# status 1.
sub es_indices_meta {
    unless( defined $_indices_meta ) {
        my $state = es_request('_cluster/state/metadata');
        unless( defined $state ) {
            output({stderr=>1,color=>"red"}, "es_indices_meta(): Unable to locate indices in status!");
            exit 1;
        }
        $_indices_meta = $state->{metadata}{indices};
    }

    # Hand out a copy so callers cannot alter the top level of the cache
    my %copy = %{ $_indices_meta };
    return %copy if wantarray;
    return \%copy;
}


my %_valid_index = ();
sub es_indices {
    my %args = (
        state       => 'open',
        check_state => 1,
        check_dates => 1,
        @_
    );

    es_utils_initialize() unless keys %DEF;

    # Seriously, English? Do you speak it motherfucker?
    $args{state} = 'close' if $args{state} eq 'closed';

    my @indices = ();
    my %idx = ();
    my $wildcard = !exists $args{_all} && defined $DEF{BASE} ? sprintf "/*%s*", $DEF{BASE} : '';

    # Simplest case, single index
    if( defined $DEF{INDEX} ) {
        push @indices, $DEF{INDEX};
    }
    # Next simplest case, open indexes
    elsif( !exists $args{_all} && $args{check_state} && $args{state} eq 'open' ) {
        # Use _stats because it's break neck fast
        if( my $res = es_request($wildcard . '/_stats/docs') ) {
            foreach my $idx ( keys %{ $res->{indices} } ) {
                $idx{$idx} = 'open';
            }
        }
    }
    else {
        my $res = es_request('_cat/indices' . $wildcard, { uri_param => { h => 'index,status' } });
        foreach my $entry (@{ $res }) {
            my ($index,$status) = is_hashref($entry) ? @{ $entry }{qw(index status)} : split /\s+/, $entry;
            $idx{$index} = $status;
        }
    }

    foreach my $index (sort keys %idx) {
        if(!exists $args{_all}) {
            my $status = $idx{$index};
            # State Check Disqualification
            if($args{state} ne 'all' && $args{check_state})  {
                my $result = $status eq $args{state};
                next unless $result;
            }

            my $p = es_pattern();
            next unless $index =~ /$p->{re}/;

lib/App/ElasticSearch/Utilities.pm  view on Meta::CPAN


        # Loop through the mappings, skipping _default_, except on 7.x where we notice "properties"
        my @mappings = exists $ref->{properties} ? ($ref)
                     : map { $ref->{$_} } grep { $_ ne '_default_' } keys %{ $ref };
        foreach my $mapping (@mappings) {
            _find_fields(\%fields,$mapping);
        }
    }
    # Return the results
    return \%fields;
}

{
    # Closure for field metadata: the path of the nested mapping currently
    # being walked, undefined when outside of any nested mapping.
    my $nested_path;

    # Record one field in the collection.
    #
    #   $f    - hash reference collecting the fields, keyed by dotted path
    #   $type - type to record for the field
    #   @path - path elements of the field; nothing is recorded when empty
    #
    # While inside a nested mapping the entry also carries 'nested_path' and
    # 'nested_key' (the key relative to the nested path).
    sub _add_fields {
        my ($f,$type,@path) = @_;

        return unless @path;

        my %i = (
            type   => $type,
        );

        # Store the full path
        my $key = join('.', @path);

        if( $nested_path ) {
            $i{nested_path} = $nested_path;
            # Strip the nested path and the '.' that follows it
            $i{nested_key}  = substr( $key, length($nested_path)+1 );
        }

        $f->{$key} = \%i;
    }

    # Recursively walk a mapping and record every field found.
    #
    #   $f    - hash reference collecting the fields
    #   $ref  - mapping node to inspect; anything but a hash reference is ignored
    #   @path - path elements leading to $ref
    sub _find_fields {
        my ($f,$ref,@path) = @_;

        return unless is_hashref($ref);
        # Handle things with properties
        if( exists $ref->{properties} && is_hashref($ref->{properties}) ) {
            # Remember the enclosing nested path: it must be back in place
            # once this object is done, otherwise fields that follow a plain
            # sub-object inside a nested mapping lose their nested information.
            my $outer_nested_path = $nested_path;
            $nested_path = join('.', @path) if $ref->{type} and $ref->{type} eq 'nested';
            foreach my $k (sort keys %{ $ref->{properties} }) {
                _find_fields($f,$ref->{properties}{$k},@path,$k);
            }
            $nested_path = $outer_nested_path;
        }
        # Handle elements that contain data
        elsif( exists $ref->{type} ) {
            _add_fields($f,$ref->{type},@path);
            # Handle multifields
            # NOTE(review): multifields are recorded with the parent's type,
            # not the type declared on the sub-field; confirm this is intended.
            if( exists $ref->{fields} && is_hashref($ref->{fields}) ) {
                foreach my $k (sort keys %{ $ref->{fields} } ) {
                    _add_fields($f,$ref->{type},@path,$k);
                }
            }
        }
        # Unknown data, throw an error if we care that deeply.
        else {
            debug({stderr=>1,color=>'red'},
                sprintf "_find_fields(): Invalid property at: %s ref info: %s",
                    join('.', @path),
                    join(',', is_hashref($ref) ? sort keys %{$ref} :
                            ref $ref         ? ref $ref : 'unknown ref'
                    ),
            );
        }
    }
}


# Send a POST for the '_close' command on $index through es_request() and
# return whatever es_request() returns.
sub es_close_index {
    my($index) = @_;

    my %options = (
        index  => $index,
        method => 'POST',
    );
    return es_request('_close', \%options);
}


# Send a POST for the '_open' command on $index through es_request() and
# return whatever es_request() returns.
sub es_open_index {
    my($index) = @_;

    my %options = (
        index  => $index,
        method => 'POST',
    );
    return es_request('_open', \%options);
}


# Send a DELETE with an empty command for $index through es_request() and
# return whatever es_request() returns.
sub es_delete_index {
    my($index) = @_;

    my %options = (
        index  => $index,
        method => 'DELETE',
    );
    return es_request('', \%options);
}


# Send a POST for the '_forcemerge' command on $index with the URI parameter
# max_num_segments set to 1, and return whatever es_request() returns.
sub es_optimize_index {
    my($index) = @_;

    my %uri_param = ( max_num_segments => 1 );
    my %options   = (
        index     => $index,
        method    => 'POST',
        uri_param => \%uri_param,
    );
    return es_request('_forcemerge', \%options);
}


# Send $settings as the body of a PUT for the '_settings' command on $index.
#
# $settings must be a hash reference; otherwise a usage message is written to
# STDERR and nothing is returned.  On the normal path the result of
# es_request() is returned.
sub es_apply_index_settings {
    my($index,$settings) = @_;

    if( is_hashref($settings) ) {
        my %options = (
            index  => $index,
            method => 'PUT',
        );
        return es_request('_settings', \%options, $settings);
    }

    output({stderr=>1,color=>'red'}, 'usage is es_apply_index_settings($index,$settings_hashref)');
    return;
}


# Request the '_segments' command for $index through es_request() and return
# its result.
#
# When $index is undefined, empty, or rejected by es_index_valid(), an error
# is written to STDERR and nothing is returned.
sub es_index_segments {
    my ($index) = @_;

    if( !defined $index || !length $index || !es_index_valid($index) ) {
        # Substitute an empty string for an undefined index so the message can
        # be built without an "uninitialized value" warning; the emitted text
        # is unchanged.
        my $shown = defined $index ? $index : '';
        output({stderr=>1,color=>'red'}, "es_index_segments('$shown'): invalid index");
        return;
    }

    return es_request('_segments', {
        index => $index,
    });

}


# Summarise the segment information of $index.
#
# Uses es_index_segments() and counts, per shard id found under
# {indices}{$index}{shards}, one shard plus the 'num_search_segments' of the
# first entry listed for that shard.  Both counters stay 0 when
# es_index_segments() returns nothing.
#
# Returns the counters 'shards' and 'segments' as a hash in list context and
# as a hash reference otherwise.
sub es_segment_stats {
    my ($index) = @_;

    my %segments = (
        shards   => 0,
        segments => 0,
    );

    my $result = es_index_segments($index);
    if( defined $result ) {
        my $shard_data = $result->{indices}{$index}{shards};
        foreach my $shard_id ( keys %{ $shard_data } ) {
            $segments{shards}++;
            $segments{segments} += $shard_data->{$shard_id}[0]{num_search_segments};
        }
    }

    return %segments if wantarray;
    return \%segments;
}



# Request the '_stats' command for $index through es_request() and return
# whatever es_request() returns.
sub es_index_stats {
    my ($index) = @_;

    my %options = ( index => $index );
    return es_request('_stats', \%options);
}



# Request the '_settings' command without any options through es_request()
# and return whatever es_request() returns.
sub es_settings {
    my $command = '_settings';
    return es_request($command);
}


# Request node statistics through es_request().
#
# Without arguments the command is '_nodes/stats'; with arguments the node
# names are joined with commas and placed between the two path elements,
# e.g. '_nodes/a,b/stats'.  Returns whatever es_request() returns.
sub es_node_stats {
    my (@nodes) = @_;

    my @path = (
        '_nodes',
        ( @nodes ? join(',', @nodes) : () ),
        'stats',
    );

    return es_request(join('/', @path));
}


# Flatten a nested data structure into a single-level hash reference.
#
# The structure is passed to flatten() with ':' as both the hash and the
# array delimiter; every ':' in the resulting keys is then replaced by '.'.
sub es_flatten_hash {
    my ($hash) = @_;

    my $flat = flatten($hash, { HashDelimiter=>':', ArrayDelimiter=>':' });

    my %compat;
    foreach my $key ( keys %{ $flat } ) {
        (my $dotted = $key) =~ s/:/./g;
        $compat{$dotted} = $flat->{$key};
    }

    return \%compat;
}



( run in 1.776 second using v1.01-cache-2.11-cpan-39bf76dae61 )