Alien-XGBoost

 view release on metacpan or  search on metacpan

xgboost/dmlc-core/src/io/s3_filesys.cc  view on Meta::CPAN

// Append `size` bytes starting at `ptr` to the pending buffer. Once the
// buffer holds at least max_buffer_size_ bytes, it is handed to Upload().
void WriteStream::Write(const void *ptr, size_t size) {
  // std::string::append grows the buffer and copies the bytes in one step
  buffer_.append(static_cast<const char *>(ptr), size);
  if (buffer_.length() < max_buffer_size_) {
    return;
  }
  this->Upload();
}

void WriteStream::Run(const std::string &method,
                      const URI &path,
                      const std::string &args,
                      const std::string &content_type,
                      const std::string &data,
                      std::string *out_header,
                      std::string *out_data) {
  // initialize the curl request
  std::vector<std::string> amz;
  std::string md5str = ComputeMD5(data);
  std::string date = GetDateString();
  std::string signature = Sign(aws_key_, method.c_str(), md5str,
                               content_type, date, amz,
                               std::string("/") + path_.host + '/' +
                               RemoveBeginSlash(path_.name) + args);

  // generate headers
  std::ostringstream sauth, sdate, surl, scontent, smd5;
  std::ostringstream rheader, rdata;
  sauth << "Authorization: AWS " << aws_id_ << ":" << signature;
  sdate << "Date: " << date;

  if (path_.host.find('.', 0) == std::string::npos && aws_region_ == "us-east-1") {
    // for backword compatibility, use virtual host if no period in host and no region was set.
    surl << "https://" << path_.host << ".s3.amazonaws.com" << '/'
         << RemoveBeginSlash(path_.name) << args;
  } else {
    surl << "https://" << getEndpoint(aws_region_) << '/' << path_.host << '/'
         << RemoveBeginSlash(path_.name) << args;
  }
  scontent << "Content-Type: " << content_type;
  // list
  curl_slist *slist = NULL;
  slist = curl_slist_append(slist, sdate.str().c_str());
  slist = curl_slist_append(slist, scontent.str().c_str());
  if (md5str.length() != 0) {
    smd5 << "Content-MD5: " << md5str;
    slist = curl_slist_append(slist, smd5.str().c_str());
  }
  slist = curl_slist_append(slist, sauth.str().c_str());

  int num_retry = 0;
  while (true) {
    // helper for read string
    ReadStringStream ss(data);
    curl_easy_reset(ecurl_);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_HTTPHEADER, slist) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_URL, surl.str().c_str()) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_HEADER, 0L) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_WRITEFUNCTION, WriteSStreamCallback) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_WRITEDATA, &rdata) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_WRITEHEADER, WriteSStreamCallback) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_HEADERDATA, &rheader) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_NOSIGNAL, 1) == CURLE_OK);
    if (method == "POST") {
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_POST, 0L) == CURLE_OK);
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_POSTFIELDSIZE, data.length()) == CURLE_OK);
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_POSTFIELDS, BeginPtr(data)) == CURLE_OK);
    } else if (method == "PUT") {
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_PUT, 1L) == CURLE_OK);
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_READDATA, &ss) == CURLE_OK);
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_INFILESIZE_LARGE, data.length()) == CURLE_OK);
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_READFUNCTION, ReadStringStream::Callback) == CURLE_OK);
    }
    CURLcode ret = curl_easy_perform(ecurl_);
    if (ret != CURLE_OK) {
      LOG(INFO) << "request " << surl.str() << "failed with error "
                << curl_easy_strerror(ret) << " Progress "
                << etags_.size() << " uploaded " << " retry=" << num_retry;
      num_retry += 1;
      CHECK(num_retry < max_error_retry_) << " maximum retry time reached";
      curl_easy_cleanup(ecurl_);
      ecurl_ = curl_easy_init();
    } else {
      break;
    }
  }
  curl_slist_free_all(slist);
  *out_header = rheader.str();
  *out_data = rdata.str();
  if (FindHttpError(*out_header) ||
      out_data->find("<Error>") != std::string::npos) {
    LOG(FATAL) << "AWS S3 Error:\n" << *out_header << *out_data;
  }
}
// Start a multipart upload for path_: POST "?uploads" with an empty body and
// store the UploadId element of the XML response body in upload_id_.
// Fails via CHECK when the response carries no UploadId element.
void WriteStream::Init(void) {
  std::string rheader, rdata;
  // Run() signs this value and sends it as the Content-Type header of the
  // request, so it has to be the correctly spelled "binary/octet-stream".
  Run("POST", path_, "?uploads",
      "binary/octet-stream", "", &rheader, &rdata);
  XMLIter xml(rdata.c_str());
  XMLIter upid;
  CHECK(xml.GetNext("UploadId", &upid)) << "missing UploadId";
  upload_id_ = upid.str();
}

void WriteStream::Upload(bool force_upload_even_if_zero_bytes) {
  if (buffer_.length() == 0 && !force_upload_even_if_zero_bytes) return;
  std::ostringstream sarg;
  std::string rheader, rdata;
  size_t partno = etags_.size() + 1;

  sarg << "?partNumber=" << partno << "&uploadId=" << upload_id_;
  Run("PUT", path_, sarg.str(),
      "binary/octel-stream", buffer_, &rheader, &rdata);
  const char *p = strstr(rheader.c_str(), "ETag: ");
  CHECK(p != NULL) << "cannot find ETag in header";
  p = strchr(p, '\"');
  CHECK(p != NULL) << "cannot find ETag in header";
  const char *end = strchr(p + 1, '\"');
  CHECK(end != NULL) << "cannot find ETag in header";

  etags_.push_back(std::string(p, end - p + 1));



( run in 1.286 second using v1.01-cache-2.11-cpan-524268b4103 )