App-ElasticSearch-Utilities
view release on metacpan or search on metacpan
scripts/es-copy-index.pl view on Meta::CPAN
$res = es_request($ES{to}, '/',
{
method => 'PUT',
index => $INDEX{to},
},
{
settings => $to_settings,
$mappings ? ( mappings => $mappings ) : (),
}
);
if (!defined $res || !is_hashref($res) || !$res->{ok}) {
die "Failed to create index in $HOST{to} : " . $JSON->encode($res);
}
}
else {
my @ignored=();
foreach my $k (qw(settings mapping)) {
push @ignored, $k if exists $OPT{$k} && -f $OPT{$k};
}
output({color=>'yellow',sticky=>1},
sprintf "%s - warning ignoring %s as they are invalid in this context.", basename($0), join(', ', map { "--$_" } @ignored)
) if @ignored;
} # End Mappings/Settings for Non-existant index.
debug_var($q->request_body);
$res = es_request($ES{from}, '_search',
# Search Parameters
{
index => $INDEX{from},
uri_param => $q->uri_params,
method => 'GET',
},
# Search Body
$q->request_body,
);
debug_var($res);
while( $res && @{ $res->{hits}{hits} }) {
$TOTAL ||= $res->{hits}{total};
my $start=time;
my $batch=0;
my $body = [
map {
$batch++;
(
{ create => { _type => $_->{_type}, _id => $_->{_id}, } },
$_->{_source}
)
} @{ $res->{hits}{hits} }
];
my $max_retries = 3;
my $success = 0;
while ($max_retries--) {
debug("Attempting bulk load of $batch documents");
my ($s2, $r2) = $ES{to}->bulk(
index => $INDEX{to},
body => $body
);
if ($s2 ne "200") {
output({stderr=>1,color=>'red'},"Failed to put documents to $HOST{to} (http status = $status): " . $JSON->encode([ $s2, $r2 ]));
next;
}
$success=1;
last;
}
die "Failed to write data to $HOST{to}:9200/$INDEX{to} after $RECORDS docs indexed."
unless $success;
my $took = time - $start;
show_counts( scalar @{$res->{hits}{hits}} );
$res = es_request($ES{from}, '_search/scroll', {
uri_param => {
scroll_id => $res->{_scroll_id},
scroll => '1m',
}
});
verbose(sprintf "Batch of %d done in %00.2fs.", $batch, $took);
}
sub show_counts {
my $inc_records = shift;
output({color=>'green'}, "Starting copy of $INDEX{from} to $HOST{to}:$INDEX{to}.") if $RECORDS == 0;
$RECORDS += $inc_records;
if( $RECORDS % ($INDEX{block} * 10) == 0 ) {
my $now = time;
my $diff = $now - $LAST;
my @time=localtime;
my $msg = sprintf "%00.2f%% %02d:%02d:%02d Records: %d of %d in %0.2fs", ($RECORDS/$TOTAL)*100, @time[2,1,0], $RECORDS, $TOTAL, $diff;
output({color=>'yellow'}, $msg);
$LAST=$now;
}
}
__END__
=pod
=head1 NAME
es-copy-index.pl - Copy an index from one cluster to another
=head1 VERSION
version 8.8
=head1 SYNOPSIS
es-copy-access.pl [options] [query to select documents]
Options:
--source (Required) The source index name for the copy
--destination Destination index name, assumes source
--from (Required) A server in the cluster where the index lives
--to A server in the cluster where the index will be copied to
--block How many docs to process in one batch, default: 1,000
--mapping JSON mapping to use instead of the source mapping
--settings JSON index settings to use instead of those from the source
( run in 3.143 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )