Metabrik-Repository

 view release on metacpan or  search on metacpan

lib/Metabrik/Client/Elasticsearch.pm  view on Meta::CPAN

      my $task = $self->get_taskid($id) or next;

      my $status = $task->{task}{status};
      my $desc = $task->{task}{description};
      my $total = $status->{total};
      my $created = $status->{created};
      my $deleted = $status->{deleted};
      my $updated = $status->{updated};

      my $perc = ($created + $deleted + $updated) / $total * 100;

      printf("> Task [%s]: %.02f%%\n", $desc, $perc);
      print "created[$created] deleted[$deleted] updated[$updated] total[$total]\n";
   }

   return 1;
}

sub loop_show_reindex_progress {
   # Call show_reindex_progress over and over, sleeping $sec seconds between
   # two calls (60 when $sec is not given or is a false value). Returns
   # nothing when the check on the Elasticsearch handle fails, or as soon as
   # show_reindex_progress yields a false value.
   my ($self, $sec) = @_;

   my $delay = $sec || 60;

   my $es = $self->_es;
   return unless $self->brik_help_run_undef_arg('open', $es);

   for (;;) {
      return unless $self->show_reindex_progress;
      sleep($delay);
   }

   # Not reached: the loop above is only left through return.
   return 1;
}

sub reindex_with_mapping_from_json_file {
   # Read $file through Metabrik::File::Json and hand its content, together
   # with $index and $new, to reindex. Returns whatever reindex returns, or
   # nothing when an Argument check fails or reading yields a false value.
   my ($self, $index, $new, $file) = @_;

   my $cmd = 'reindex_with_mapping_from_json_file';

   my $es = $self->_es;
   return unless $self->brik_help_run_undef_arg('open', $es);

   # The three Arguments are checked in the order they were given.
   for my $arg ($index, $new, $file) {
      return unless $self->brik_help_run_undef_arg($cmd, $arg);
   }
   return unless $self->brik_help_run_file_not_found($cmd, $file);

   my $reader = Metabrik::File::Json->new_from_brik_init($self);
   return unless $reader;

   my $content = $reader->read($file);
   return unless $content;

   return $self->reindex($index, $new, $content);
}

#
# Search::Elasticsearch::Client::5_0::Direct
#
# Using routing with this Command requires passing the correct field value
# directly in $hash->{routing}. We cannot "guess" it from the Arguments, as
# that would be complicated to do efficiently.
#
sub update_document {
   # Call update on the Elasticsearch handle for document $id with the
   # partial document $doc (a HASH). $index and $type fall back to the
   # object's index and type Attributes. Every key of the optional $hash
   # (a HASH) is merged over the generated arguments. Returns the update
   # result, or the result of log->error on failure.
   my ($self, $doc, $id, $index, $type, $hash) = @_;

   $index = $self->index unless $index;
   $type = $self->type unless $type;

   my $es = $self->_es;
   return unless $self->brik_help_run_undef_arg('open', $es);
   return unless $self->brik_help_run_undef_arg('update_document', $doc);
   return unless $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH');
   return unless $self->brik_help_run_undef_arg('update_document', $id);
   return unless $self->brik_help_set_undef_arg('index', $index);
   return unless $self->brik_help_set_undef_arg('type', $type);

   my %args = (
      id => $id,
      index => $index,
      body => { doc => $doc },
   );

   # The type is only sent when the use_type Attribute is true.
   $args{type} = $type if $self->use_type;

   if (defined($hash)) {
      return unless $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH');
      # Caller-supplied keys win over the generated ones.
      @args{keys %$hash} = values %$hash;
   }

   my $r = eval { $es->update(%args) };
   if ($@) {
      chomp($@);
      return $self->log->error("update_document: index failed for index [$index]: [$@]");
   }

   return $r;
}

#
# Search::Elasticsearch::Client::5_0::Bulk
#
sub index_bulk {
   my $self = shift;
   my ($doc, $index, $type, $hash, $id) = @_;

   my $bulk = $self->_bulk;
   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
   $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;

   my %args = (
      source => $doc,
   );
   if (defined($id)) {

lib/Metabrik/Client/Elasticsearch.pm  view on Meta::CPAN

      if (defined($p) && exists($p->{class})) {
         my $class = $p->{class};
         my $code = $p->{code};
         my $node = $p->{node};
         return $self->log->error("index_bulk: failed for index [$index] with error ".
            "[$class] code [$code] for node [$node]");
      }
      else {
         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
      }
   }

   return $r;
}

#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
#
sub clean_deleted_from_index {
   # Call forcemerge on $index with only_expunge_deletes set to 'true'.
   # Returns the forcemerge result, or the result of log->error on failure.
   my ($self, $index) = @_;

   return unless $self->brik_help_run_undef_arg('clean_deleted_from_index', $index);

   my $es = $self->_es;
   return unless $self->brik_help_run_undef_arg('open', $es);

   my $indices = $self->_es->indices;

   my $r = eval {
      $indices->forcemerge(
         index => $index,
         only_expunge_deletes => 'true',
      );
   };
   return $r unless $@;

   chomp($@);
   my $parsed = $self->parse_error_string($@);

   # Fall back to the raw error when it could not be parsed into fields.
   unless (defined($parsed) && exists($parsed->{class})) {
      return $self->log->error("clean_deleted_from_index: index failed for ".
         "index [$index]: [$@]");
   }

   my ($class, $code, $node) = @{$parsed}{qw(class code node)};
   return $self->log->error("clean_deleted_from_index: failed for index ".
      "[$index] with error [$class] code [$code] for node [$node]");
}

#
# Using routing with this Command requires passing the correct field value
# directly in $hash->{routing}. We cannot "guess" it from the Arguments, as
# that would be complicated to do efficiently.
#
sub update_document_bulk {
   # Pass an update action for $doc to the bulk helper. $index and $type
   # fall back to the object's index and type Attributes; $id is only sent
   # when defined. Every key of the optional $hash (a HASH) is merged over
   # the generated arguments. Returns the result of the helper's update, or
   # the result of log->error on failure.
   my ($self, $doc, $index, $type, $hash, $id) = @_;

   my $bulk = $self->_bulk;
   $index = $self->index unless $index;
   $type = $self->type unless $type;

   return unless $self->brik_help_run_undef_arg('open_bulk_mode', $bulk);
   return unless $self->brik_help_run_undef_arg('update_document_bulk', $doc);
   return unless $self->brik_help_set_undef_arg('index', $index);
   return unless $self->brik_help_set_undef_arg('type', $type);

   my %args = (
      index => $index,
      doc => $doc,
   );
   $args{type} = $type if $self->use_type;
   $args{id} = $id if defined($id);

   if (defined($hash)) {
      return unless $self->brik_help_run_invalid_arg('update_document_bulk', $hash, 'HASH');
      # Caller-supplied keys win over the generated ones.
      @args{keys %$hash} = values %$hash;
   }

   my $r = eval { $bulk->update(\%args) };
   return $r unless $@;

   chomp($@);
   my $parsed = $self->parse_error_string($@);

   # Fall back to the raw error when it could not be parsed into fields.
   unless (defined($parsed) && exists($parsed->{class})) {
      return $self->log->error("update_document_bulk: index failed for ".
         "index [$index]: [$@]");
   }

   my ($class, $code, $node) = @{$parsed}{qw(class code node)};
   return $self->log->error("update_document_bulk: failed for index [$index] ".
      "with error [$class] code [$code] for node [$node]");
}

#
# We may have to call refresh_index after a bulk_flush, so this Command takes
# an additional, optional index Argument.
#
sub bulk_flush {
   my $self = shift;
   my ($index) = @_;

lib/Metabrik/Client/Elasticsearch.pm  view on Meta::CPAN

}

#
# Search::Elasticsearch::Client::5_0::Direct::Indices
#
sub put_alias {
   # Call put_alias on the handle's indices object with $index as index and
   # $alias as name. Returns that call's result, or the result of log->error
   # when it dies.
   my ($self, $index, $alias) = @_;

   my $es = $self->_es;
   for my $checked ([ 'open', $es ], [ 'put_alias', $index ], [ 'put_alias', $alias ]) {
      return unless $self->brik_help_run_undef_arg(@$checked);
   }

   my $r = eval {
      $es->indices->put_alias(
         index => $index,
         name => $alias,
      );
   };
   if ($@) {
      chomp($@);
      return $self->log->error("put_alias: put_alias failed: [$@]");
   }

   return $r;
}

#
# Search::Elasticsearch::Client::5_0::Direct::Indices
#
sub delete_alias {
   # Call delete_alias on the handle's indices object with $index as index
   # and $alias as name. Returns that call's result, or the result of
   # log->error when it dies.
   my ($self, $index, $alias) = @_;

   my $es = $self->_es;
   for my $checked ([ 'open', $es ], [ 'delete_alias', $index ], [ 'delete_alias', $alias ]) {
      return unless $self->brik_help_run_undef_arg(@$checked);
   }

   my $r = eval {
      $es->indices->delete_alias(
         index => $index,
         name => $alias,
      );
   };
   if ($@) {
      chomp($@);
      return $self->log->error("delete_alias: delete_alias failed: [$@]");
   }

   return $r;
}

sub update_alias {
   # Point $alias at $new_index. The result of get_aliases is searched for
   # an index that already carries $alias; when one is found the alias is
   # deleted from it first, then the alias is put on $new_index. Returns
   # the result of put_alias, or nothing when an Argument check, get_aliases
   # or delete_alias fails.
   #
   # NOTE(review): delete and put are two separate calls, so the alias may
   # be missing for a short time between them. Only one previous index is
   # handled, as before.
   my $self = shift;
   my ($new_index, $alias) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
   $self->brik_help_run_undef_arg('update_alias', $alias) or return;

   my $aliases = $self->get_aliases or return;

   # Search for previous index with that alias, if any. Iterating over
   # sorted keys (instead of leaving an each() loop early) keeps the hash
   # iterator untouched and makes the choice deterministic when several
   # indices carry the alias.
   my $prev_index;
   for my $this_index (sort keys %$aliases) {
      my $names = $aliases->{$this_index};
      next unless ref($names) eq 'HASH' && exists($names->{$alias});
      $prev_index = $this_index;
      last;
   }

   # Delete previous alias if it exists. The test is on definedness so an
   # index literally named "0" is handled too.
   if (defined($prev_index)) {
      $self->delete_alias($prev_index, $alias) or return;
   }

   return $self->put_alias($new_index, $alias);
}

sub is_mapping_exists {
   # Tell whether $mapping is a key of the mappings returned by get_mappings
   # for $index. Returns 1 when found, 0 when not found or when
   # is_index_exists is false for $index, and nothing when an Argument check
   # or get_mappings fails.
   my ($self, $index, $mapping) = @_;

   return unless $self->brik_help_run_undef_arg('is_mapping_exists', $index);
   return unless $self->brik_help_run_undef_arg('is_mapping_exists', $mapping);

   return 0 unless $self->is_index_exists($index);

   my $all = $self->get_mappings($index);
   return unless $all;

   for my $entry (values %$all) {
      my $mappings = $entry->{mappings};
      return 1 if exists($mappings->{$mapping});
   }

   return 0;
}

#
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
sub get_mappings {
   my $self = shift;
   my ($index, $type) = @_;

lib/Metabrik/Client/Elasticsearch.pm  view on Meta::CPAN


   return $r;
}

#
# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
#
sub put_template {
   # Call put_template on the handle's indices object with $name as name
   # and $template (a HASH) as body. Returns that call's result, or the
   # result of log->error when it dies.
   my ($self, $name, $template) = @_;

   my $es = $self->_es;
   return unless $self->brik_help_run_undef_arg('open', $es);
   return unless $self->brik_help_run_undef_arg('put_template', $name);
   return unless $self->brik_help_run_undef_arg('put_template', $template);
   return unless $self->brik_help_run_invalid_arg('put_template', $template, 'HASH');

   my $r = eval {
      $es->indices->put_template(
         name => $name,
         body => $template,
      );
   };
   if ($@) {
      chomp($@);
      return $self->log->error("put_template: template failed ".
         "for name [$name]: [$@]");
   }

   return $r;
}

sub put_mapping_from_json_file {
   # Read $json_file through Metabrik::File::Json and pass its mappings
   # entry to put_mapping for $index and $type. Returns the result of
   # put_mapping, or the result of log->error when the file content has no
   # mappings key.
   my ($self, $index, $type, $json_file) = @_;

   my $cmd = 'put_mapping_from_json_file';

   my $es = $self->_es;
   return unless $self->brik_help_run_undef_arg('open', $es);

   # The three Arguments are checked in the order they were given.
   for my $arg ($index, $type, $json_file) {
      return unless $self->brik_help_run_undef_arg($cmd, $arg);
   }
   return unless $self->brik_help_run_file_not_found($cmd, $json_file);

   my $reader = Metabrik::File::Json->new_from_brik_init($self);
   return unless $reader;

   my $content = $reader->read($json_file);
   return unless $content;

   return $self->put_mapping($index, $type, $content->{mappings})
      if exists($content->{mappings});

   return $self->log->error("put_mapping_from_json_file: no mapping ".
      "data found");
}

sub update_mapping_from_json_file {
   # Read $json_file through Metabrik::File::Json and pass its mappings
   # entry to put_mapping for $index and $type. Note the Argument order:
   # the file comes first. Returns the result of put_mapping, or the result
   # of log->error when the file content has no mappings key.
   my ($self, $json_file, $index, $type) = @_;

   my $cmd = 'update_mapping_from_json_file';

   my $es = $self->_es;
   return unless $self->brik_help_run_undef_arg('open', $es);
   return unless $self->brik_help_run_undef_arg($cmd, $json_file);
   return unless $self->brik_help_run_file_not_found($cmd, $json_file);
   return unless $self->brik_help_run_undef_arg($cmd, $type);
   return unless $self->brik_help_run_undef_arg($cmd, $index);

   my $reader = Metabrik::File::Json->new_from_brik_init($self);
   return unless $reader;

   my $content = $reader->read($json_file);
   return unless $content;

   unless (exists($content->{mappings})) {
      return $self->log->error("update_mapping_from_json_file: ".
         "no data found");
   }

   return $self->put_mapping($index, $type, $content->{mappings});
}

sub put_template_from_json_file {
   # Read $json_file through Metabrik::File::Json and pass its content to
   # put_template. When $name is not given it is taken from the file name:
   # the last path component without its .json suffix. Returns the result of
   # put_template, or the result of log->error when no name can be found.
   my ($self, $json_file, $name) = @_;

   my $cmd = 'put_template_from_json_file';

   my $es = $self->_es;
   return unless $self->brik_help_run_undef_arg('open', $es);
   return unless $self->brik_help_run_undef_arg($cmd, $json_file);
   return unless $self->brik_help_run_file_not_found($cmd, $json_file);

   my $reader = Metabrik::File::Json->new_from_brik_init($self);
   return unless $reader;

   my $content = $reader->read($json_file);
   return unless $content;

   unless (defined($name)) {
      if ($json_file =~ m{([^/]+)\.json$}) {
         $name = $1;
      }
      else {
         return $self->log->error("put_template_from_json_file: no template ".
            "name found");
      }
   }

   return $self->put_template($name, $content);
}

sub update_template_from_json_file {
   # Replace a template with the content of $json_file: delete_template is
   # called for the name (its result is ignored), then put_template is
   # called with the name and the file content. When $name is not given it
   # is taken from the file name: the last path component without its .json
   # suffix. Returns the result of put_template, or nothing when an Argument
   # check fails, reading the file yields a false value, or no name can be
   # found.
   my $self = shift;
   my ($json_file, $name) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('update_template_from_json_file',
      $json_file) or return;
   $self->brik_help_run_file_not_found('update_template_from_json_file',
      $json_file) or return;

   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
   my $data = $fj->read($json_file) or return;

   if (!defined($name)) {
      ($name) = $json_file =~ m{([^/]+)\.json$};
   }

   # The message names this Command, not put_template_from_json_file, so a
   # failure here can be told apart in logs.
   if (! defined($name)) {
      return $self->log->error("update_template_from_json_file: no template ".
         "name found");
   }

   # We ignore errors, template may not exist.
   $self->delete_template($name);

   return $self->put_template($name, $data);
}

#
# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
sub get_settings {
   my $self = shift;
   my ($indices, $names) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my %args = ();
   if (defined($indices)) {
      $self->brik_help_run_undef_arg('get_settings', $indices) or return;
      my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
         or return;
      $args{index} = $indices;
   }
   if (defined($names)) {
      $self->brik_help_run_file_not_found('get_settings', $names) or return;
      my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
         or return;
      $args{name} = $names;
   }

   my $r;
   eval {
      $r = $es->indices->get_settings(%args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("get_settings: failed: [$@]");



( run in 0.988 second using v1.01-cache-2.11-cpan-39bf76dae61 )