Metabrik-Repository
view release on metacpan or search on metacpan
lib/Metabrik/Client/Elasticsearch.pm view on Meta::CPAN
my $task = $self->get_taskid($id) or next;
my $status = $task->{task}{status};
my $desc = $task->{task}{description};
my $total = $status->{total};
my $created = $status->{created};
my $deleted = $status->{deleted};
my $updated = $status->{updated};
my $perc = ($created + $deleted + $updated) / $total * 100;
printf("> Task [%s]: %.02f%%\n", $desc, $perc);
print "created[$created] deleted[$deleted] updated[$updated] total[$total]\n";
}
return 1;
}
# Repeatedly call show_reindex_progress, sleeping $sec seconds between
# calls. $sec falls back to 60 when it is not given (or is false).
# Returns nothing when the 'open' check on the client handle fails or as
# soon as show_reindex_progress returns a false value.
sub loop_show_reindex_progress {
    my ($self, $sec) = @_;

    $sec = 60 unless $sec;

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;

    # The only way out of this loop is a failed progress call.
    for (;;) {
        return unless $self->show_reindex_progress;
        sleep($sec);
    }

    # Not reached; kept so the sub has an explicit success value.
    return 1;
}
# Read a JSON document from $file and pass it, together with $index and
# $new, to reindex. All three arguments are required and $file must pass
# the file-not-found check. Returns whatever reindex returns, or nothing
# when any check or the JSON read fails.
sub reindex_with_mapping_from_json_file {
    my ($self, $index, $new, $file) = @_;

    my $cmd = 'reindex_with_mapping_from_json_file';

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;

    # Same checks, same order: index, new, file.
    for my $required ($index, $new, $file) {
        $self->brik_help_run_undef_arg($cmd, $required) or return;
    }
    $self->brik_help_run_file_not_found($cmd, $file) or return;

    my $reader = Metabrik::File::Json->new_from_brik_init($self) or return;
    my $mapping = $reader->read($file) or return;

    return $self->reindex($index, $new, $mapping);
}
#
# Search::Elasticsearch::Client::5_0::Direct
#
# Executing this Command with routing requires passing the correct field
# value directly in $hash->{routing}. We cannot "guess" it from the arguments;
# doing so efficiently would be a little bit complicated.
#
# Send a partial-document update for document $id.
#
# $doc   - HASH of fields, sent as body => { doc => $doc }.
# $id    - document id (required).
# $index - target index; falls back to $self->index when false.
# $type  - document type; falls back to $self->type when false. Only sent
#          when $self->use_type is true.
# $hash  - optional HASH of extra arguments merged over the request,
#          overriding keys set here.
#
# Returns the value of $es->update, or the result of log->error when the
# call dies.
sub update_document {
    my ($self, $doc, $id, $index, $type, $hash) = @_;

    $index = $self->index unless $index;
    $type  = $self->type  unless $type;

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;
    $self->brik_help_run_undef_arg('update_document', $doc) or return;
    $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH') or return;
    $self->brik_help_run_undef_arg('update_document', $id) or return;
    $self->brik_help_set_undef_arg('index', $index) or return;
    $self->brik_help_set_undef_arg('type', $type) or return;

    my %request = (
        id    => $id,
        index => $index,
        body  => { doc => $doc },
    );
    $request{type} = $type if $self->use_type;

    if (defined $hash) {
        $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH')
            or return;
        # Caller-supplied keys win over the ones built above.
        @request{ keys %$hash } = values %$hash;
    }

    my $r = eval { $es->update(%request) };
    return $r unless $@;

    chomp($@);
    return $self->log->error("update_document: index failed for index [$index]: [$@]");
}
#
# Search::Elasticsearch::Client::5_0::Bulk
#
sub index_bulk {
my $self = shift;
my ($doc, $index, $type, $hash, $id) = @_;
my $bulk = $self->_bulk;
$index ||= $self->index;
$type ||= $self->type;
$self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
$self->brik_help_run_undef_arg('index_bulk', $doc) or return;
$self->brik_help_set_undef_arg('index', $index) or return;
$self->brik_help_set_undef_arg('type', $type) or return;
my %args = (
source => $doc,
);
if (defined($id)) {
lib/Metabrik/Client/Elasticsearch.pm view on Meta::CPAN
if (defined($p) && exists($p->{class})) {
my $class = $p->{class};
my $code = $p->{code};
my $node = $p->{node};
return $self->log->error("index_bulk: failed for index [$index] with error ".
"[$class] code [$code] for node [$node]");
}
else {
return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
}
}
return $r;
}
#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
#
# Call forcemerge on $index with only_expunge_deletes => 'true'.
# $index is required. Returns the forcemerge result; when the call dies,
# logs an error (detailed if parse_error_string yields a 'class' key,
# raw otherwise) and returns the result of log->error.
sub clean_deleted_from_index {
    my ($self, $index) = @_;

    $self->brik_help_run_undef_arg('clean_deleted_from_index', $index) or return;

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;

    my $indices = $self->_es->indices;

    my $r = eval {
        $indices->forcemerge(
            index => $index,
            only_expunge_deletes => 'true',
        );
    };
    return $r unless $@;

    chomp($@);
    my $p = $self->parse_error_string($@);

    # Fall back to the raw exception text when it could not be parsed.
    unless (defined($p) && exists($p->{class})) {
        return $self->log->error("clean_deleted_from_index: index failed for ".
            "index [$index]: [$@]");
    }

    my ($class, $code, $node) = @{$p}{qw(class code node)};
    return $self->log->error("clean_deleted_from_index: failed for index ".
        "[$index] with error [$class] code [$code] for node [$node]");
}
#
# Executing this Command with routing requires passing the correct field
# value directly in $hash->{routing}. We cannot "guess" it from the arguments;
# doing so efficiently would be a little bit complicated.
#
# Queue a partial-document update on the bulk helper.
#
# $doc   - document fields, sent as doc => $doc (required).
# $index - target index; falls back to $self->index when false.
# $type  - document type; falls back to $self->type when false. Only sent
#          when $self->use_type is true.
# $hash  - optional HASH of extra arguments merged over the action,
#          overriding keys set here.
# $id    - optional document id.
#
# Returns the value of $bulk->update; when the call dies, logs an error
# and returns the result of log->error.
sub update_document_bulk {
    my ($self, $doc, $index, $type, $hash, $id) = @_;

    my $bulk = $self->_bulk;
    $index = $self->index unless $index;
    $type  = $self->type  unless $type;

    $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
    $self->brik_help_run_undef_arg('update_document_bulk', $doc) or return;
    $self->brik_help_set_undef_arg('index', $index) or return;
    $self->brik_help_set_undef_arg('type', $type) or return;

    my %action = (
        index => $index,
        doc   => $doc,
    );
    $action{type} = $type if $self->use_type;
    $action{id}   = $id   if defined $id;

    if (defined $hash) {
        $self->brik_help_run_invalid_arg('update_document_bulk', $hash, 'HASH')
            or return;
        # Caller-supplied keys win over the ones built above.
        @action{ keys %$hash } = values %$hash;
    }

    my $r = eval { $bulk->update(\%action) };
    return $r unless $@;

    chomp($@);
    my $p = $self->parse_error_string($@);

    # Fall back to the raw exception text when it could not be parsed.
    unless (defined($p) && exists($p->{class})) {
        return $self->log->error("update_document_bulk: index failed for ".
            "index [$index]: [$@]");
    }

    my ($class, $code, $node) = @{$p}{qw(class code node)};
    return $self->log->error("update_document_bulk: failed for index [$index] ".
        "with error [$class] code [$code] for node [$node]");
}
#
# We may have to call refresh_index after a bulk_flush, so we give an additional
# optional Argument for given index.
#
sub bulk_flush {
my $self = shift;
my ($index) = @_;
lib/Metabrik/Client/Elasticsearch.pm view on Meta::CPAN
}
#
# Search::Elasticsearch::Client::5_0::Direct::Indices
#
# Create alias $alias on $index through the indices client. Both
# arguments are required. Returns the put_alias result, or the result of
# log->error when the call dies.
sub put_alias {
    my ($self, $index, $alias) = @_;

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;
    for my $required ($index, $alias) {
        $self->brik_help_run_undef_arg('put_alias', $required) or return;
    }

    my $r = eval {
        $es->indices->put_alias(
            index => $index,
            name  => $alias,
        );
    };
    return $r unless $@;

    chomp($@);
    return $self->log->error("put_alias: put_alias failed: [$@]");
}
#
# Search::Elasticsearch::Client::5_0::Direct::Indices
#
# Remove alias $alias from $index through the indices client. Both
# arguments are required. Returns the delete_alias result, or the result
# of log->error when the call dies.
sub delete_alias {
    my ($self, $index, $alias) = @_;

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;
    for my $required ($index, $alias) {
        $self->brik_help_run_undef_arg('delete_alias', $required) or return;
    }

    my $r = eval {
        $es->indices->delete_alias(
            index => $index,
            name  => $alias,
        );
    };
    return $r unless $@;

    chomp($@);
    return $self->log->error("delete_alias: delete_alias failed: [$@]");
}
# Point $alias at $new_index.
#
# Looks through the structure returned by get_aliases (index name =>
# hash keyed by alias name) for an index that already carries $alias.
# If one is found, the alias is removed from it with delete_alias before
# put_alias attaches it to $new_index.
#
# Both arguments are required. Returns the put_alias result, or nothing
# when a check, get_aliases, or delete_alias fails.
#
# NOTE(review): only the first index found carrying $alias is detached;
# if the alias is attached to several indices the others are left as is.
sub update_alias {
    my $self = shift;
    my ($new_index, $alias) = @_;

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;
    $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
    $self->brik_help_run_undef_arg('update_alias', $alias) or return;

    my $aliases = $self->get_aliases or return;

    # Search for previous index with that alias, if any.
    # A keys loop is used instead of each: leaving an each loop early
    # parks the hash iterator mid-way, so a later scan of the same hash
    # would resume after the match and miss it.
    my $prev_index;
    for my $index (keys %$aliases) {
        my $index_aliases = $aliases->{$index};
        next unless ref($index_aliases) eq 'HASH';
        if (exists $index_aliases->{$alias}) {
            $prev_index = $index;
            last;
        }
    }

    # Delete previous alias if it exists. defined() rather than truth so
    # an index literally named "0" is handled too.
    if (defined($prev_index)) {
        $self->delete_alias($prev_index, $alias) or return;
    }

    return $self->put_alias($new_index, $alias);
}
# Tell whether a mapping named $mapping is present for $index.
# Returns 1 when any entry returned by get_mappings($index) has $mapping
# as a key of its 'mappings' hash, 0 when the index does not exist or no
# entry matches, and nothing when an argument check or get_mappings fails.
sub is_mapping_exists {
    my ($self, $index, $mapping) = @_;

    for my $required ($index, $mapping) {
        $self->brik_help_run_undef_arg('is_mapping_exists', $required) or return;
    }

    return 0 unless $self->is_index_exists($index);

    my $all = $self->get_mappings($index) or return;

    for my $entry (values %$all) {
        # Local copy so the lookup never adds keys to the returned data.
        my $known = $entry->{mappings};
        return 1 if exists $known->{$mapping};
    }

    return 0;
}
#
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
sub get_mappings {
my $self = shift;
my ($index, $type) = @_;
lib/Metabrik/Client/Elasticsearch.pm view on Meta::CPAN
return $r;
}
#
# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
#
# Store index template $template (a HASH) under $name through the
# indices client. Both arguments are required. Returns the put_template
# result, or the result of log->error when the call dies.
sub put_template {
    my ($self, $name, $template) = @_;

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;
    for my $required ($name, $template) {
        $self->brik_help_run_undef_arg('put_template', $required) or return;
    }
    $self->brik_help_run_invalid_arg('put_template', $template, 'HASH')
        or return;

    my $r = eval {
        $es->indices->put_template(
            name => $name,
            body => $template,
        );
    };
    return $r unless $@;

    chomp($@);
    return $self->log->error("put_template: template failed ".
        "for name [$name]: [$@]");
}
# Read $json_file and pass its 'mappings' entry to put_mapping for
# $index and $type. All three arguments are required and the file must
# pass the file-not-found check. Logs an error when the JSON data has no
# 'mappings' key. Returns whatever put_mapping returns, or nothing when a
# check or the JSON read fails.
sub put_mapping_from_json_file {
    my ($self, $index, $type, $json_file) = @_;

    my $cmd = 'put_mapping_from_json_file';

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;
    for my $required ($index, $type, $json_file) {
        $self->brik_help_run_undef_arg($cmd, $required) or return;
    }
    $self->brik_help_run_file_not_found($cmd, $json_file) or return;

    my $reader = Metabrik::File::Json->new_from_brik_init($self) or return;
    my $data = $reader->read($json_file) or return;

    return $self->put_mapping($index, $type, $data->{mappings})
        if exists($data->{mappings});

    return $self->log->error("put_mapping_from_json_file: no mapping ".
        "data found");
}
# Read $json_file and pass its 'mappings' entry to put_mapping for
# $index and $type. Note the argument order: file first, then index and
# type. All three are required and the file must pass the file-not-found
# check. Logs an error when the JSON data has no 'mappings' key. Returns
# whatever put_mapping returns, or nothing when a check or the JSON read
# fails.
sub update_mapping_from_json_file {
    my ($self, $json_file, $index, $type) = @_;

    my $cmd = 'update_mapping_from_json_file';

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;
    $self->brik_help_run_undef_arg($cmd, $json_file) or return;
    $self->brik_help_run_file_not_found($cmd, $json_file) or return;
    # Type is checked before index, as in the check order above.
    for my $required ($type, $index) {
        $self->brik_help_run_undef_arg($cmd, $required) or return;
    }

    my $reader = Metabrik::File::Json->new_from_brik_init($self) or return;
    my $data = $reader->read($json_file) or return;

    unless (exists($data->{mappings})) {
        return $self->log->error("update_mapping_from_json_file: ".
            "no data found");
    }

    return $self->put_mapping($index, $type, $data->{mappings});
}
# Read a template from $json_file and store it with put_template.
# When $name is not given it is taken from the file's base name with the
# trailing ".json" removed; an error is logged if no name can be derived.
# Returns whatever put_template returns, or nothing when a check or the
# JSON read fails.
sub put_template_from_json_file {
    my ($self, $json_file, $name) = @_;

    my $cmd = 'put_template_from_json_file';

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;
    $self->brik_help_run_undef_arg($cmd, $json_file) or return;
    $self->brik_help_run_file_not_found($cmd, $json_file) or return;

    my $reader = Metabrik::File::Json->new_from_brik_init($self) or return;
    my $data = $reader->read($json_file) or return;

    unless (defined($name)) {
        # Derive the name from the file name; stays undef without ".json".
        ($name) = $json_file =~ m{([^/]+)\.json$};
        unless (defined($name)) {
            return $self->log->error("put_template_from_json_file: no template ".
                "name found");
        }
    }

    return $self->put_template($name, $data);
}
# Replace a template with the content of $json_file.
#
# When $name is not given it is taken from the file's base name with the
# trailing ".json" removed; an error is logged if no name can be derived.
# Any template already stored under that name is deleted first (a failed
# delete is ignored), then the new one is stored with put_template.
#
# Returns whatever put_template returns, or nothing when a check or the
# JSON read fails.
sub update_template_from_json_file {
    my $self = shift;
    my ($json_file, $name) = @_;

    my $es = $self->_es;
    $self->brik_help_run_undef_arg('open', $es) or return;
    $self->brik_help_run_undef_arg('update_template_from_json_file',
        $json_file) or return;
    $self->brik_help_run_file_not_found('update_template_from_json_file',
        $json_file) or return;

    my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
    my $data = $fj->read($json_file) or return;

    if (! defined($name)) {
        ($name) = $json_file =~ m{([^/]+)\.json$};
    }

    if (! defined($name)) {
        # Report under this command's own name, not put_template_from_json_file.
        return $self->log->error("update_template_from_json_file: no template ".
            "name found");
    }

    # We ignore errors, template may not exist.
    $self->delete_template($name);

    return $self->put_template($name, $data);
}
#
# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
sub get_settings {
my $self = shift;
my ($indices, $names) = @_;
my $es = $self->_es;
$self->brik_help_run_undef_arg('open', $es) or return;
my %args = ();
if (defined($indices)) {
$self->brik_help_run_undef_arg('get_settings', $indices) or return;
my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
or return;
$args{index} = $indices;
}
if (defined($names)) {
$self->brik_help_run_file_not_found('get_settings', $names) or return;
my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
or return;
$args{name} = $names;
}
my $r;
eval {
$r = $es->indices->get_settings(%args);
};
if ($@) {
chomp($@);
return $self->log->error("get_settings: failed: [$@]");
( run in 0.988 second using v1.01-cache-2.11-cpan-39bf76dae61 )