App-ElasticSearch-Utilities

 view release on metacpan or  search on metacpan

scripts/es-copy-index.pl  view on Meta::CPAN


$q->set_scan_scroll('1m');
$q->set_size( $INDEX{block} );

# Connect to ElasticSearch
my %ES = ();
foreach my $dir (qw(from to)) {
    $ES{$dir} = es_connect( $HOST{$dir} );
}

die "Invalid index: $INDEX{from}" unless $ES{from}->exists( index => $INDEX{from} );
my $TO_EXISTS = $ES{to}->exists( index => $INDEX{to}  );

my $RECORDS = 0;
my $TOTAL=0;
my $LAST = time;
my ($status, $res);

# Mappings/Settings for Non-existant index.
unless( exists $OPT{append} ) {
    die "Index $INDEX{to} already exists in $HOST{to}" if $TO_EXISTS;
    $res = es_request($ES{from}, '_settings', {index => $INDEX{from}} );
    debug_var($res);
    my $from_settings = $res->{$INDEX{from}}{settings};
    my @settings = ({
        index => {
            number_of_shards   => $from_settings->{index}{number_of_shards},
            number_of_replicas => $from_settings->{index}{number_of_replicas},
        }
    });
    if( exists $OPT{settings} && -f $OPT{settings} ) {
        my $content = join '', read_lines($OPT{settings});
        eval {
            push @settings, $JSON->decode($content);
            1;
        } or do {
            debug($content);
            die "Parsing JSON from $OPT{settings} failed: $@";
        };
    }
    my $to_settings = clone_merge(@settings);

    # Determine if we get mappings from a file or from the index.
    my $mappings;
    if( exists $OPT{mapping} && -f $OPT{mapping} ) {
        my $content = join '', read_lines($OPT{mapping});
        eval {
            $mappings = $JSON->decode($content);
            1;
        } or do {
            debug($content);
            die "Parsing JSON from $OPT{mapping} failed: $@";
        };
    }
    else {
        $mappings = $res->{$INDEX{from}}{mappings};
    }

    $res = es_request($ES{to}, '/',
        {
            method => 'PUT',
            index => $INDEX{to},
        },
        {
            settings => $to_settings,
            $mappings ? ( mappings => $mappings ) : (),
        }
    );

    if (!defined $res || !is_hashref($res) || !$res->{ok}) {
        die "Failed to create index in $HOST{to} : " . $JSON->encode($res);
    }
}
else {
    my @ignored=();
    foreach my $k (qw(settings mapping)) {
        push @ignored, $k if exists $OPT{$k} && -f $OPT{$k};
    }
    output({color=>'yellow',sticky=>1},
        sprintf "%s - warning ignoring %s as they are invalid in this context.", basename($0), join(', ', map { "--$_" } @ignored)
    ) if @ignored;
} # End Mappings/Settings for Non-existant index.

debug_var($q->request_body);
$res = es_request($ES{from}, '_search',
    # Search Parameters
    {
        index     => $INDEX{from},
        uri_param => $q->uri_params,
        method => 'GET',
    },
    # Search Body
    $q->request_body,
);
debug_var($res);

while( $res && @{ $res->{hits}{hits} }) {
    $TOTAL ||= $res->{hits}{total};
    my $start=time;
    my $batch=0;
    my $body = [
        map {
            $batch++;
            (
                { create => { _type => $_->{_type},  _id => $_->{_id}, } },
                $_->{_source}
            )
        } @{ $res->{hits}{hits} }
    ];
    my $max_retries = 3;
    my $success = 0;
    while ($max_retries--) {
        debug("Attempting bulk load of $batch documents");
        my ($s2, $r2) = $ES{to}->bulk(
            index => $INDEX{to},
            body => $body
        );
        if ($s2 ne "200") {
            output({stderr=>1,color=>'red'},"Failed to put documents to $HOST{to} (http status = $status): " . $JSON->encode([ $s2, $r2 ]));
            next;
        }



( run in 1.646 second using v1.01-cache-2.11-cpan-39bf76dae61 )