Alien-XGBoost

 view release on metacpan or  search on metacpan

xgboost/dmlc-core/src/io/s3_filesys.cc  view on Meta::CPAN

}

void WriteStream::Run(const std::string &method,
                      const URI &path,
                      const std::string &args,
                      const std::string &content_type,
                      const std::string &data,
                      std::string *out_header,
                      std::string *out_data) {
  // initialize the curl request
  std::vector<std::string> amz;
  std::string md5str = ComputeMD5(data);
  std::string date = GetDateString();
  std::string signature = Sign(aws_key_, method.c_str(), md5str,
                               content_type, date, amz,
                               std::string("/") + path_.host + '/' +
                               RemoveBeginSlash(path_.name) + args);

  // generate headers
  std::ostringstream sauth, sdate, surl, scontent, smd5;
  std::ostringstream rheader, rdata;
  sauth << "Authorization: AWS " << aws_id_ << ":" << signature;
  sdate << "Date: " << date;

  if (path_.host.find('.', 0) == std::string::npos && aws_region_ == "us-east-1") {
    // for backword compatibility, use virtual host if no period in host and no region was set.
    surl << "https://" << path_.host << ".s3.amazonaws.com" << '/'
         << RemoveBeginSlash(path_.name) << args;
  } else {
    surl << "https://" << getEndpoint(aws_region_) << '/' << path_.host << '/'
         << RemoveBeginSlash(path_.name) << args;
  }
  scontent << "Content-Type: " << content_type;
  // list
  curl_slist *slist = NULL;
  slist = curl_slist_append(slist, sdate.str().c_str());
  slist = curl_slist_append(slist, scontent.str().c_str());
  if (md5str.length() != 0) {
    smd5 << "Content-MD5: " << md5str;
    slist = curl_slist_append(slist, smd5.str().c_str());
  }
  slist = curl_slist_append(slist, sauth.str().c_str());

  int num_retry = 0;
  while (true) {
    // helper for read string
    ReadStringStream ss(data);
    curl_easy_reset(ecurl_);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_HTTPHEADER, slist) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_URL, surl.str().c_str()) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_HEADER, 0L) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_WRITEFUNCTION, WriteSStreamCallback) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_WRITEDATA, &rdata) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_WRITEHEADER, WriteSStreamCallback) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_HEADERDATA, &rheader) == CURLE_OK);
    CHECK(curl_easy_setopt(ecurl_, CURLOPT_NOSIGNAL, 1) == CURLE_OK);
    if (method == "POST") {
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_POST, 0L) == CURLE_OK);
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_POSTFIELDSIZE, data.length()) == CURLE_OK);
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_POSTFIELDS, BeginPtr(data)) == CURLE_OK);
    } else if (method == "PUT") {
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_PUT, 1L) == CURLE_OK);
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_READDATA, &ss) == CURLE_OK);
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_INFILESIZE_LARGE, data.length()) == CURLE_OK);
      CHECK(curl_easy_setopt(ecurl_, CURLOPT_READFUNCTION, ReadStringStream::Callback) == CURLE_OK);
    }
    CURLcode ret = curl_easy_perform(ecurl_);
    if (ret != CURLE_OK) {
      LOG(INFO) << "request " << surl.str() << "failed with error "
                << curl_easy_strerror(ret) << " Progress "
                << etags_.size() << " uploaded " << " retry=" << num_retry;
      num_retry += 1;
      CHECK(num_retry < max_error_retry_) << " maximum retry time reached";
      curl_easy_cleanup(ecurl_);
      ecurl_ = curl_easy_init();
    } else {
      break;
    }
  }
  curl_slist_free_all(slist);
  *out_header = rheader.str();
  *out_data = rdata.str();
  if (FindHttpError(*out_header) ||
      out_data->find("<Error>") != std::string::npos) {
    LOG(FATAL) << "AWS S3 Error:\n" << *out_header << *out_data;
  }
}
/*!
 * \brief start the multipart upload of this stream
 *
 * Sends a POST with the "?uploads" argument through Run and stores the
 * content of the UploadId element of the XML response in upload_id_.
 * A CHECK fails when the response has no UploadId element.
 */
void WriteStream::Init(void) {
  std::string rheader, rdata;
  // content type spelled "binary/octet-stream" (it used to read "octel")
  Run("POST", path_, "?uploads",
      "binary/octet-stream", "", &rheader, &rdata);
  XMLIter xml(rdata.c_str());
  XMLIter upid;
  CHECK(xml.GetNext("UploadId", &upid)) << "missing UploadId";
  upload_id_ = upid.str();
}

/*!
 * \brief upload the content of buffer_ as the next part of the multipart upload
 * \param force_upload_even_if_zero_bytes when true a part is sent even if
 *        buffer_ is empty; otherwise an empty buffer makes this a no-op
 *
 * On success the ETag reported in the response header and the part number are
 * appended to etags_ and part_ids_, and buffer_ is cleared. A CHECK fails when
 * no quoted ETag value can be found in the response header.
 */
void WriteStream::Upload(bool force_upload_even_if_zero_bytes) {
  if (buffer_.length() == 0 && !force_upload_even_if_zero_bytes) return;
  std::ostringstream sarg;
  std::string rheader, rdata;
  // part numbers start at 1 and follow the number of parts recorded so far
  size_t partno = etags_.size() + 1;

  sarg << "?partNumber=" << partno << "&uploadId=" << upload_id_;
  // content type spelled "binary/octet-stream" (it used to read "octel")
  Run("PUT", path_, sarg.str(),
      "binary/octet-stream", buffer_, &rheader, &rdata);

  // header names are case-insensitive, and an HTTP/2 response delivers them
  // in lower case, so accept the lower-case spelling as well
  const char *p = strstr(rheader.c_str(), "ETag: ");
  if (p == NULL) p = strstr(rheader.c_str(), "etag: ");
  CHECK(p != NULL) << "cannot find ETag in header";
  p = strchr(p, '\"');
  CHECK(p != NULL) << "cannot find ETag in header";
  const char *end = strchr(p + 1, '\"');
  CHECK(end != NULL) << "cannot find ETag in header";

  // keep the surrounding double quotes: [p, end] spans both of them
  etags_.push_back(std::string(p, end - p + 1));
  part_ids_.push_back(partno);
  buffer_.clear();
}

/*!
 * \brief complete the multipart upload
 *
 * Builds a CompleteMultipartUpload XML document with one Part entry
 * (part number and ETag) per recorded part, in the order they were recorded,
 * and sends it through Run as a POST carrying the upload id.
 */
void WriteStream::Finish(void) {
  // every recorded ETag must have a matching part number
  CHECK(etags_.size() == part_ids_.size());

  // request body
  std::ostringstream body;
  body << "<CompleteMultipartUpload>\n";
  const size_t num_parts = etags_.size();
  for (size_t idx = 0; idx != num_parts; ++idx) {
    body << " <Part>\n";
    body << "  <PartNumber>" << part_ids_[idx] << "</PartNumber>\n";
    body << "  <ETag>" << etags_[idx] << "</ETag>\n";
    body << " </Part>\n";
  }
  body << "</CompleteMultipartUpload>\n";

  // query string identifying the upload to complete
  std::ostringstream query;
  query << "?uploadId=" << upload_id_;

  // the response is collected but not inspected here
  std::string resp_header, resp_body;
  Run("POST", path_, query.str(),
      "text/xml", body.str(), &resp_header, &resp_body);
}
/*!
 * \brief list the objects in the bucket with prefix specified by path.name
 * \param path the path to query
 * \param aws_id access id of aws
 * \param aws_key access key of aws
 * \param aws_region region of aws, used to select the endpoint
 * \param out_list stores the output results
 */
void ListObjects(const URI &path,
                 const std::string aws_id,
                 const std::string aws_key,
                 const std::string aws_region,
                 std::vector<FileInfo> *out_list) {
  CHECK(path.host.length() != 0) << "bucket name not specified in s3";
  out_list->clear();
  std::vector<std::string> amz;
  std::string date = GetDateString();
  std::string signature = Sign(aws_key, "GET", "", "", date, amz,
                               std::string("/") + path.host + "/");

  std::ostringstream sauth, sdate, surl;
  std::ostringstream result;
  sauth << "Authorization: AWS " << aws_id << ":" << signature;
  sdate << "Date: " << date;

  if (path.host.find('.', 0) == std::string::npos && aws_region == "us-east-1") {
    // for backword compatibility, use virtual host if no period in host and no region was set.
    surl << "https://" << path.host << ".s3.amazonaws.com"
         << "/?delimiter=/&prefix=" << RemoveBeginSlash(path.name);
  } else {
    surl << "https://" << getEndpoint(aws_region) << "/" << path.host
         << "/?delimiter=/&prefix=" << RemoveBeginSlash(path.name);



( run in 0.720 second using v1.01-cache-2.11-cpan-39bf76dae61 )