Alien-XGBoost
view release on metacpan or search on metacpan
xgboost/dmlc-core/src/io/s3_filesys.cc view on Meta::CPAN
}
void WriteStream::Run(const std::string &method,
const URI &path,
const std::string &args,
const std::string &content_type,
const std::string &data,
std::string *out_header,
std::string *out_data) {
// initialize the curl request
std::vector<std::string> amz;
std::string md5str = ComputeMD5(data);
std::string date = GetDateString();
std::string signature = Sign(aws_key_, method.c_str(), md5str,
content_type, date, amz,
std::string("/") + path_.host + '/' +
RemoveBeginSlash(path_.name) + args);
// generate headers
std::ostringstream sauth, sdate, surl, scontent, smd5;
std::ostringstream rheader, rdata;
sauth << "Authorization: AWS " << aws_id_ << ":" << signature;
sdate << "Date: " << date;
if (path_.host.find('.', 0) == std::string::npos && aws_region_ == "us-east-1") {
// for backword compatibility, use virtual host if no period in host and no region was set.
surl << "https://" << path_.host << ".s3.amazonaws.com" << '/'
<< RemoveBeginSlash(path_.name) << args;
} else {
surl << "https://" << getEndpoint(aws_region_) << '/' << path_.host << '/'
<< RemoveBeginSlash(path_.name) << args;
}
scontent << "Content-Type: " << content_type;
// list
curl_slist *slist = NULL;
slist = curl_slist_append(slist, sdate.str().c_str());
slist = curl_slist_append(slist, scontent.str().c_str());
if (md5str.length() != 0) {
smd5 << "Content-MD5: " << md5str;
slist = curl_slist_append(slist, smd5.str().c_str());
}
slist = curl_slist_append(slist, sauth.str().c_str());
int num_retry = 0;
while (true) {
// helper for read string
ReadStringStream ss(data);
curl_easy_reset(ecurl_);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_HTTPHEADER, slist) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_URL, surl.str().c_str()) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_HEADER, 0L) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_WRITEFUNCTION, WriteSStreamCallback) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_WRITEDATA, &rdata) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_WRITEHEADER, WriteSStreamCallback) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_HEADERDATA, &rheader) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_NOSIGNAL, 1) == CURLE_OK);
if (method == "POST") {
CHECK(curl_easy_setopt(ecurl_, CURLOPT_POST, 0L) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_POSTFIELDSIZE, data.length()) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_POSTFIELDS, BeginPtr(data)) == CURLE_OK);
} else if (method == "PUT") {
CHECK(curl_easy_setopt(ecurl_, CURLOPT_PUT, 1L) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_READDATA, &ss) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_INFILESIZE_LARGE, data.length()) == CURLE_OK);
CHECK(curl_easy_setopt(ecurl_, CURLOPT_READFUNCTION, ReadStringStream::Callback) == CURLE_OK);
}
CURLcode ret = curl_easy_perform(ecurl_);
if (ret != CURLE_OK) {
LOG(INFO) << "request " << surl.str() << "failed with error "
<< curl_easy_strerror(ret) << " Progress "
<< etags_.size() << " uploaded " << " retry=" << num_retry;
num_retry += 1;
CHECK(num_retry < max_error_retry_) << " maximum retry time reached";
curl_easy_cleanup(ecurl_);
ecurl_ = curl_easy_init();
} else {
break;
}
}
curl_slist_free_all(slist);
*out_header = rheader.str();
*out_data = rdata.str();
if (FindHttpError(*out_header) ||
out_data->find("<Error>") != std::string::npos) {
LOG(FATAL) << "AWS S3 Error:\n" << *out_header << *out_data;
}
}
/*!
 * \brief POST "?uploads" for path_ and remember the returned upload id
 *
 *  The UploadId element of the XML reply is stored in upload_id_;
 *  a reply without that element fails the CHECK.
 */
void WriteStream::Init() {
  std::string reply_header;
  std::string reply_body;
  Run("POST", path_, "?uploads", "binary/octel-stream", "",
      &reply_header, &reply_body);

  // NOTE(review): XMLIter is handed a raw pointer into reply_body; presumably
  // it does not copy, so reply_body has to stay in scope while parsing.
  XMLIter xml(reply_body.c_str());
  XMLIter upid;
  CHECK(xml.GetNext("UploadId", &upid)) << "missing UploadId";
  upload_id_ = upid.str();
}
void WriteStream::Upload(bool force_upload_even_if_zero_bytes) {
if (buffer_.length() == 0 && !force_upload_even_if_zero_bytes) return;
std::ostringstream sarg;
std::string rheader, rdata;
size_t partno = etags_.size() + 1;
sarg << "?partNumber=" << partno << "&uploadId=" << upload_id_;
Run("PUT", path_, sarg.str(),
"binary/octel-stream", buffer_, &rheader, &rdata);
const char *p = strstr(rheader.c_str(), "ETag: ");
CHECK(p != NULL) << "cannot find ETag in header";
p = strchr(p, '\"');
CHECK(p != NULL) << "cannot find ETag in header";
const char *end = strchr(p + 1, '\"');
CHECK(end != NULL) << "cannot find ETag in header";
etags_.push_back(std::string(p, end - p + 1));
part_ids_.push_back(partno);
buffer_.clear();
}
/*!
 * \brief POST the list of uploaded parts for upload_id_
 *
 *  Builds a <CompleteMultipartUpload> document with one <Part> entry per
 *  recorded (part number, ETag) pair and sends it as text/xml.
 */
void WriteStream::Finish() {
  // both lists are appended to together, so they must have the same length
  CHECK(etags_.size() == part_ids_.size());

  std::ostringstream body;
  body << "<CompleteMultipartUpload>\n";
  const size_t num_parts = etags_.size();
  for (size_t part = 0; part != num_parts; ++part) {
    body << " <Part>\n";
    body << " <PartNumber>" << part_ids_[part] << "</PartNumber>\n";
    body << " <ETag>" << etags_[part] << "</ETag>\n";
    body << " </Part>\n";
  }
  body << "</CompleteMultipartUpload>\n";

  std::ostringstream query;
  query << "?uploadId=" << upload_id_;

  std::string reply_header;
  std::string reply_body;
  Run("POST", path_, query.str(), "text/xml", body.str(),
      &reply_header, &reply_body);
}
/*!
* \brief list the objects in the bucket with prefix specified by path.name
* \param path the path to query
* \param aws_id access id of aws
* \param aws_key access key of aws
* \param aws_region region of aws, used to choose the endpoint of the request
* \param out_list stores the output results
*/
void ListObjects(const URI &path,
const std::string aws_id,
const std::string aws_key,
const std::string aws_region,
std::vector<FileInfo> *out_list) {
CHECK(path.host.length() != 0) << "bucket name not specified in s3";
out_list->clear();
std::vector<std::string> amz;
std::string date = GetDateString();
std::string signature = Sign(aws_key, "GET", "", "", date, amz,
std::string("/") + path.host + "/");
std::ostringstream sauth, sdate, surl;
std::ostringstream result;
sauth << "Authorization: AWS " << aws_id << ":" << signature;
sdate << "Date: " << date;
if (path.host.find('.', 0) == std::string::npos && aws_region == "us-east-1") {
// for backword compatibility, use virtual host if no period in host and no region was set.
surl << "https://" << path.host << ".s3.amazonaws.com"
<< "/?delimiter=/&prefix=" << RemoveBeginSlash(path.name);
} else {
surl << "https://" << getEndpoint(aws_region) << "/" << path.host
<< "/?delimiter=/&prefix=" << RemoveBeginSlash(path.name);
( run in 0.720 second using v1.01-cache-2.11-cpan-39bf76dae61 )