App-ElasticSearch-Utilities

 view release on metacpan or  search on metacpan

scripts/es-copy-index.pl  view on Meta::CPAN

    $res = es_request($ES{to}, '/',
        {
            method => 'PUT',
            index => $INDEX{to},
        },
        {
            settings => $to_settings,
            $mappings ? ( mappings => $mappings ) : (),
        }
    );

    if (!defined $res || !is_hashref($res) || !$res->{ok}) {
        die "Failed to create index in $HOST{to} : " . $JSON->encode($res);
    }
}
else {
    my @ignored=();
    foreach my $k (qw(settings mapping)) {
        push @ignored, $k if exists $OPT{$k} && -f $OPT{$k};
    }
    output({color=>'yellow',sticky=>1},
        sprintf "%s - warning ignoring %s as they are invalid in this context.", basename($0), join(', ', map { "--$_" } @ignored)
    ) if @ignored;
} # End Mappings/Settings for Non-existant index.

# Log the query body, then issue the first search against the source index.
# Its response seeds the copy loop that follows.
debug_var($q->request_body);

my %search_opts = (
    index     => $INDEX{from},
    uri_param => $q->uri_params,
    method    => 'GET',
);
$res = es_request($ES{from}, '_search', \%search_opts, $q->request_body);

debug_var($res);

# Main copy loop: each pass bulk-loads the current page of search hits into
# the destination index, then requests the next page via the scroll id.
# The loop stops when a response is false or carries no hits.
while( $res && @{ $res->{hits}{hits} }) {
    # Capture the total hit count once.  Newer Elasticsearch releases report
    # hits.total as an object ({ value => N, relation => ... }) rather than a
    # plain integer, so unwrap it to keep $TOTAL numeric for progress output.
    if( !$TOTAL ) {
        my $total = $res->{hits}{total};
        $TOTAL = ref($total) eq 'HASH' ? $total->{value} : $total;
    }
    my $start=time;
    my $batch=0;
    # Bulk payload: for every hit, a "create" action entry followed by the
    # document source.  $batch counts the documents in this page.
    my $body = [
        map {
            $batch++;
            (
                { create => { _type => $_->{_type},  _id => $_->{_id}, } },
                $_->{_source}
            )
        } @{ $res->{hits}{hits} }
    ];
    # Try the bulk request up to three times before giving up.
    my $max_retries = 3;
    my $success = 0;
    while ($max_retries--) {
        debug("Attempting bulk load of $batch documents");
        my ($s2, $r2) = $ES{to}->bulk(
            index => $INDEX{to},
            body => $body
        );
        if ($s2 ne "200") {
            # Report the status returned by this bulk call ($s2).
            output({stderr=>1,color=>'red'},"Failed to put documents to $HOST{to} (http status = $s2): " . $JSON->encode([ $s2, $r2 ]));
            next;
        }
        $success=1;
        last;
    }
    die "Failed to write data to $HOST{to}:9200/$INDEX{to} after $RECORDS docs indexed."
        unless $success;
    # Time spent building and loading this batch (excludes the next fetch).
    my $took = time - $start;
    show_counts( scalar @{$res->{hits}{hits}} );
    # Fetch the next page using the scroll id from the current response.
    $res = es_request($ES{from}, '_search/scroll', {
        uri_param => {
            scroll_id => $res->{_scroll_id},
            scroll    => '1m',
        }
    });

    verbose(sprintf "Batch of %d done in %00.2fs.", $batch, $took);
}

# Track and report copy progress.
#
# Takes the number of documents just copied and adds it to the running
# $RECORDS count.  On the first call (while $RECORDS is still 0) it announces
# the start of the copy.  Whenever the running count is an exact multiple of
# ten blocks ($INDEX{block} * 10) it prints the percentage complete, the
# local wall-clock time, the counts, and the seconds since the last report.
sub show_counts {
    my ($copied) = @_;

    if( $RECORDS == 0 ) {
        output({color=>'green'}, "Starting copy of $INDEX{from} to $HOST{to}:$INDEX{to}.");
    }

    $RECORDS += $copied;

    my $report_every = $INDEX{block} * 10;
    if( $RECORDS % $report_every == 0 ) {
        my $now     = time;
        my $elapsed = $now - $LAST;
        my ($hour, $min, $sec) = (localtime)[2,1,0];
        my $percent = ($RECORDS / $TOTAL) * 100;
        output({color=>'yellow'},
            sprintf("%00.2f%% %02d:%02d:%02d Records: %d of %d in %0.2fs",
                $percent, $hour, $min, $sec, $RECORDS, $TOTAL, $elapsed)
        );
        # Remember when this report was made so the next one can show a delta.
        $LAST = $now;
    }
}

__END__

=pod

=head1 NAME

es-copy-index.pl - Copy an index from one cluster to another

=head1 VERSION

version 8.8

=head1 SYNOPSIS

es-copy-index.pl [options] [query to select documents]

Options:

    --source            (Required) The source index name for the copy
    --destination       Destination index name, assumes source
    --from              (Required) A server in the cluster where the index lives
    --to                A server in the cluster where the index will be copied to
    --block             How many docs to process in one batch, default: 1,000
    --mapping           JSON mapping to use instead of the source mapping
    --settings          JSON index settings to use instead of those from the source



( run in 3.143 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )