RocksDB

 view release on metacpan or  search on metacpan

vendor/rocksdb/utilities/blob_db/blob_db_impl.cc  view on Meta::CPAN

      // Continue to next blob record or retry.
      continue;
    }

    // Relocate the blob record to new file.
    if (!newfile) {
      // new file
      std::string reason("GC of ");
      reason += bfptr->PathName();
      newfile = NewBlobFile(reason);

      new_writer = CheckOrCreateWriterLocked(newfile);
      // Can't use header beyond this point
      newfile->header_ = std::move(header);
      newfile->header_valid_ = true;
      newfile->file_size_ = BlobLogHeader::kSize;
      newfile->SetColumnFamilyId(bfptr->column_family_id());
      newfile->SetHasTTL(bfptr->HasTTL());
      newfile->SetCompression(bfptr->compression());
      newfile->expiration_range_ = bfptr->expiration_range_;

      s = new_writer->WriteHeader(newfile->header_);
      if (!s.ok()) {
        ROCKS_LOG_ERROR(db_options_.info_log,
                        "File: %s - header writing failed",
                        newfile->PathName().c_str());
        break;
      }

      // We don't add the file to open_ttl_files_ or open_non_ttl_files_, to
      // avoid user writes writing to the file, and avoid
      // EvictExpiredFiles close the file by mistake.
      WriteLock wl(&mutex_);
      blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
    }

    std::string new_index_entry;
    uint64_t new_blob_offset = 0;
    uint64_t new_key_offset = 0;
    // write the blob to the blob log.
    s = new_writer->AddRecord(record.key, record.value, record.expiration,
                              &new_key_offset, &new_blob_offset);

    BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(),
                          new_blob_offset, record.value.size(),
                          bdb_options_.compression);

    newfile->blob_count_++;
    newfile->file_size_ +=
        BlobLogRecord::kHeaderSize + record.key.size() + record.value.size();

    TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
    WriteBatch rewrite_batch;
    Status rewrite_status = WriteBatchInternal::PutBlobIndex(
        &rewrite_batch, column_family_id, record.key, new_index_entry);
    if (rewrite_status.ok()) {
      rewrite_status = db_impl_->WriteWithCallback(WriteOptions(),
                                                   &rewrite_batch, &callback);
    }
    if (rewrite_status.ok()) {
      gc_stats->num_keys_relocated++;
      gc_stats->bytes_relocated += record.record_size();
    } else if (rewrite_status.IsBusy()) {
      // The key is overwritten in the meanwhile. Drop the blob record.
      gc_stats->num_keys_overwritten++;
      gc_stats->bytes_overwritten += record.record_size();
    } else {
      // We hit an error.
      s = rewrite_status;
      ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s",
                      s.ToString().c_str());
      break;
    }
  }  // end of ReadRecord loop

  {
    WriteLock wl(&mutex_);
    ObsoleteBlobFile(bfptr, GetLatestSequenceNumber(), true /*update_size*/);
  }

  ROCKS_LOG_INFO(
      db_options_.info_log,
      "%s blob file %" PRIu64 ". Total blob records: %" PRIu64
      ", expired: %" PRIu64 " keys/%" PRIu64
      " bytes, updated or deleted by user: %" PRIu64 " keys/%" PRIu64
      " bytes, rewrite to new file: %" PRIu64 " keys/%" PRIu64 " bytes.",
      s.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
      bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->num_keys_expired,
      gc_stats->bytes_expired, gc_stats->num_keys_overwritten,
      gc_stats->bytes_overwritten, gc_stats->num_keys_relocated,
      gc_stats->bytes_relocated);
  RecordTick(statistics_, BLOB_DB_GC_NUM_FILES);
  RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
             gc_stats->num_keys_overwritten);
  RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_EXPIRED,
             gc_stats->num_keys_expired);
  RecordTick(statistics_, BLOB_DB_GC_BYTES_OVERWRITTEN,
             gc_stats->bytes_overwritten);
  RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired);
  if (newfile != nullptr) {
    {
      MutexLock l(&write_mutex_);
      CloseBlobFile(newfile);
    }
    total_blob_size_ += newfile->file_size_;
    ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".",
                   newfile->BlobFileNumber());
    RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES);
    RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
               gc_stats->num_keys_relocated);
    RecordTick(statistics_, BLOB_DB_GC_BYTES_RELOCATED,
               gc_stats->bytes_relocated);
  }
  if (!s.ok()) {
    RecordTick(statistics_, BLOB_DB_GC_FAILURES);
  }
  return s;
}

std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
  if (aborted) return std::make_pair(false, -1);

  {
    ReadLock rl(&mutex_);
    if (obsolete_files_.empty()) return std::make_pair(true, -1);
  }

  std::list<std::shared_ptr<BlobFile>> tobsolete;
  {
    WriteLock wl(&mutex_);
    tobsolete.swap(obsolete_files_);
  }

  bool file_deleted = false;
  for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
    auto bfile = *iter;
    {
      ReadLock lockbfile_r(&bfile->mutex_);
      if (VisibleToActiveSnapshot(bfile)) {
        ROCKS_LOG_INFO(db_options_.info_log,
                       "Could not delete file due to snapshot failure %s",
                       bfile->PathName().c_str());
        ++iter;
        continue;
      }
    }
    ROCKS_LOG_INFO(db_options_.info_log,
                   "Will delete file due to snapshot success %s",
                   bfile->PathName().c_str());

    blob_files_.erase(bfile->BlobFileNumber());
    Status s = env_->DeleteFile(bfile->PathName());
    if (!s.ok()) {
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "File failed to be deleted as obsolete %s",
                      bfile->PathName().c_str());
      ++iter;
      continue;
    }

    file_deleted = true;
    ROCKS_LOG_INFO(db_options_.info_log,
                   "File deleted as obsolete from blob dir %s",
                   bfile->PathName().c_str());

    iter = tobsolete.erase(iter);
  }

  // directory change. Fsync
  if (file_deleted) {
    dir_ent_->Fsync();
  }



( run in 0.543 second using v1.01-cache-2.11-cpan-5511b514fd6 )