App-ElasticSearch-Utilities
view release on metacpan or search on metacpan
scripts/es-copy-index.pl view on Meta::CPAN
# Apply the scroll option and the per-request size to the query object.
# NOTE(review): '1m' is presumably a one-minute scroll keep-alive -- confirm.
$q->set_scan_scroll('1m');
$q->set_size($INDEX{block});

# Open one ElasticSearch handle for each side of the copy.
my %ES = ();
for my $side ('from', 'to') {
    $ES{$side} = es_connect($HOST{$side});
}

# The source index has to exist before anything can be copied.
if( !$ES{from}->exists(index => $INDEX{from}) ) {
    die "Invalid index: $INDEX{from}";
}

# Remember whether the destination index is already there.
my $TO_EXISTS = $ES{to}->exists(index => $INDEX{to});

# Counters, a start timestamp and response holders used by the code below.
my ($RECORDS, $TOTAL) = (0, 0);
my $LAST = time;
my ($status, $res);
# Mappings/Settings for a non-existent destination index.
#
# When $OPT{append} is not set, the destination index must not exist yet.
# It is created with the source index's shard and replica counts (merged
# with the JSON from the --settings file, if given) and with mappings taken
# from the --mapping file or, when no such file is given, from the source
# index.  Dies if the destination already exists, if either JSON file fails
# to parse, or if the create request is not acknowledged.
unless( exists $OPT{append} ) {
    die "Index $INDEX{to} already exists in $HOST{to}" if $TO_EXISTS;

    $res = es_request($ES{from}, '_settings', {index => $INDEX{from}} );
    debug_var($res);
    my $from_settings = $res->{$INDEX{from}}{settings};

    # Baseline settings copied from the source index.
    # NOTE(review): clone_merge() presumably lets later entries override
    # earlier ones -- confirm against its documentation.
    my @settings = ({
        index => {
            number_of_shards   => $from_settings->{index}{number_of_shards},
            number_of_replicas => $from_settings->{index}{number_of_replicas},
        }
    });
    if( exists $OPT{settings} && -f $OPT{settings} ) {
        my $content = join '', read_lines($OPT{settings});
        eval {
            push @settings, $JSON->decode($content);
            1;
        } or do {
            # Capture the error before calling debug(): that helper is
            # opaque from here and could reset $@.
            my $err = $@;
            debug($content);
            die "Parsing JSON from $OPT{settings} failed: $err";
        };
    }
    my $to_settings = clone_merge(@settings);

    # Determine if we get mappings from a file or from the index.
    my $mappings;
    if( exists $OPT{mapping} && -f $OPT{mapping} ) {
        my $content = join '', read_lines($OPT{mapping});
        eval {
            $mappings = $JSON->decode($content);
            1;
        } or do {
            # Same reasoning as above: keep the parse error intact.
            my $err = $@;
            debug($content);
            die "Parsing JSON from $OPT{mapping} failed: $err";
        };
    }
    else {
        # Use mappings from the settings response if it happens to carry
        # them; a plain _settings reply does not, so ask the source index
        # for its mappings explicitly in that case.
        $mappings = $res->{$INDEX{from}}{mappings};
        unless( defined $mappings ) {
            my $map_res = es_request($ES{from}, '_mapping', {index => $INDEX{from}} );
            debug_var($map_res);
            if( is_hashref($map_res) && is_hashref($map_res->{$INDEX{from}}) ) {
                my $entry = $map_res->{$INDEX{from}};
                # Some Elasticsearch versions nest the types under a
                # "mappings" key, others list them directly under the
                # index name; accept both layouts.
                $mappings = exists $entry->{mappings} ? $entry->{mappings} : $entry;
            }
        }
    }

    # Create the destination index; mappings are only sent when we have some.
    $res = es_request($ES{to}, '/',
        {
            method => 'PUT',
            index  => $INDEX{to},
        },
        {
            settings => $to_settings,
            $mappings ? ( mappings => $mappings ) : (),
        }
    );

    # A successful create is reported as "ok" by some Elasticsearch
    # versions and as "acknowledged" by others; treat either as success.
    if( !defined $res || !is_hashref($res) || !( $res->{ok} || $res->{acknowledged} ) ) {
        die "Failed to create index in $HOST{to} : " . $JSON->encode($res);
    }
}
else {
    # With $OPT{append} set no index is created, so any settings/mapping
    # files that were supplied are not applied; tell the user.
    my @ignored = ();
    foreach my $k (qw(settings mapping)) {
        push @ignored, $k if exists $OPT{$k} && -f $OPT{$k};
    }
    output({color=>'yellow',sticky=>1},
        sprintf("%s - warning ignoring %s as they are invalid in this context.",
            basename($0),
            join(', ', map { "--$_" } @ignored),
        )
    ) if @ignored;
} # End Mappings/Settings for non-existent index.
# Log the query body before sending it.
debug_var( $q->request_body );

# Run the search against the source index.  The response lands in $res,
# which the loop that follows reads hits from.
{
    # Search Parameters
    my %search_params = (
        index     => $INDEX{from},
        uri_param => $q->uri_params,
        method    => 'GET',
    );

    # The query's request body is passed as the search body.
    $res = es_request( $ES{from}, '_search', \%search_params, $q->request_body );
}
debug_var($res);
while( $res && @{ $res->{hits}{hits} }) {
$TOTAL ||= $res->{hits}{total};
my $start=time;
my $batch=0;
my $body = [
map {
$batch++;
(
{ create => { _type => $_->{_type}, _id => $_->{_id}, } },
$_->{_source}
)
} @{ $res->{hits}{hits} }
];
my $max_retries = 3;
my $success = 0;
while ($max_retries--) {
debug("Attempting bulk load of $batch documents");
my ($s2, $r2) = $ES{to}->bulk(
index => $INDEX{to},
body => $body
);
if ($s2 ne "200") {
output({stderr=>1,color=>'red'},"Failed to put documents to $HOST{to} (http status = $status): " . $JSON->encode([ $s2, $r2 ]));
next;
}
( run in 1.646 second using v1.01-cache-2.11-cpan-39bf76dae61 )