Alien-XGBoost

 view release on metacpan or  search on metacpan

xgboost/rabit/src/allreduce_robust.cc  view on Meta::CPAN

      }
    }
  }
  // wait all ack
  for (int i = 0; i < nlink; ++i) {
    if (!all_links[i].sock.BadSocket()) {
      char ack;
      ssize_t len = all_links[i].sock.Recv(&ack, sizeof(ack), MSG_WAITALL);
      if (len == 0) {
        all_links[i].sock.Close(); continue;
      } else if (len > 0) {
        utils::Assert(ack == kResetAck, "wrong Ack MSG");
      } else {
        utils::Assert(errno != EAGAIN|| errno != EWOULDBLOCK, "BUG");
      }
      // set back to nonblock mode
      all_links[i].sock.SetNonBlock(true);
    }
  }
  for (int i = 0; i < nlink; ++i) {
    if (all_links[i].sock.BadSocket()) return kSockError;
  }
  return kSuccess;
}
/*!
 * \brief if err_type indicates an error
 *         recover links according to the error type reported
 *        if there is no error, return true
 * \param err_type the type of error happening in the system
 * \return true if err_type is kSuccess, false otherwise
 */
bool AllreduceRobust::CheckAndRecover(ReturnType err_type) {
  if (err_type == kSuccess) return true;
  utils::Assert(err_link != NULL, "must know the error source");
  recover_counter += 1;
  {
    // simple way, shutdown all links
    for (size_t i = 0; i < all_links.size(); ++i) {
      if (!all_links[i].sock.BadSocket()) all_links[i].sock.Close();
    }
    ReConnectLinks("recover");
    return false;
  }
  // this was old way
  // TryResetLinks still causes possible errors, so not use this one
  while (err_type != kSuccess) {
    switch (err_type.value) {
      case kGetExcept: err_type = TryResetLinks(); break;
      case kSockError: {
        TryResetLinks();
        ReConnectLinks();
        err_type = kSuccess;
        break;
      }
      default: utils::Assert(false, "RecoverLinks: cannot reach here");
    }
  }
  return false;
}
/*!
 * \brief message passing function, used to decide the
 *        shortest distance to the possible source of data
 * \param node_value a pair of have_data and size
 *           have_data whether current node have data
 *           size gives the size of data, if current node is kHaveData
 * \param dist_in the shorest to any data source distance in each direction
 * \param out_index the edge index of output link
 * \return the shorest distance result of out edge specified by out_index
 */
inline std::pair<int, size_t>
ShortestDist(const std::pair<bool, size_t> &node_value,
             const std::vector< std::pair<int, size_t> > &dist_in,
             size_t out_index) {
  if (node_value.first) {
    return std::make_pair(1, node_value.second);
  }
  size_t size = 0;
  int res = std::numeric_limits<int>::max();
  for (size_t i = 0; i < dist_in.size(); ++i) {
    if (i == out_index) continue;
    if (dist_in[i].first == std::numeric_limits<int>::max()) continue;
    if (dist_in[i].first + 1 < res) {
      res = dist_in[i].first + 1;
      size = dist_in[i].second;
    }
  }
  // add one hop

  return std::make_pair(res, size);
}
/*!
 * \brief message passing function, used to decide the
 *    data request from each edge, whether need to request data from certain edge
 * \param node_value a pair of request_data and best_link
 *           request_data stores whether current node need to request data
 *           best_link gives the best edge index to fetch the data
 * \param req_in the data request from incoming edges
 * \param out_index the edge index of output link
 * \return the request to the output edge
 */
inline char DataRequest(const std::pair<bool, int> &node_value,
                        const std::vector<char> &req_in,
                        size_t out_index) {
  // whether current node need to request data
  bool request_data = node_value.first;
  // which edge index is the best link to request data
  // can be -1, which means current node contains data
  const int best_link = node_value.second;
  if (static_cast<int>(out_index) == best_link) {
    if (request_data) return 1;
    for (size_t i = 0; i < req_in.size(); ++i) {
      if (i == out_index) continue;
      if (req_in[i] != 0) return 1;
    }
  }
  return 0;
}
/*!
 * \brief try to decide the recovery message passing request
 * \param role the current role of the node
 * \param p_size used to store the size of the message, for node in state kHaveData,
 *               this size must be set correctly before calling the function
 *               for others, this surves as output parameter
 *
 * \param p_recvlink used to store the link current node should recv data from, if necessary
 *          this can be -1, which means current node have the data
 * \param p_req_in used to store the resulting vector, indicating which link we should send the data to
 *
 * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
 * \sa ReturnType
 */
AllreduceRobust::ReturnType
AllreduceRobust::TryDecideRouting(AllreduceRobust::RecoverType role,
                                  size_t *p_size,
                                  int *p_recvlink,
                                  std::vector<bool> *p_req_in) {
  int best_link = -2;
  {
    // get the shortest distance to the request point
    std::vector<std::pair<int, size_t> > dist_in, dist_out;
    ReturnType succ = MsgPassing(std::make_pair(role == kHaveData, *p_size),
                                 &dist_in, &dist_out, ShortestDist);
    if (succ != kSuccess) return succ;
    if (role != kHaveData) {
      for (size_t i = 0; i < dist_in.size(); ++i) {
        if (dist_in[i].first != std::numeric_limits<int>::max()) {
          utils::Check(best_link == -2 || *p_size == dist_in[i].second,
                       "[%d] Allreduce size inconsistent, distin=%lu, size=%lu, reporting=%lu\n",
                       rank, dist_in[i].first, *p_size, dist_in[i].second);
          if (best_link == -2 || dist_in[i].first < dist_in[best_link].first) {
            best_link = static_cast<int>(i);
            *p_size = dist_in[i].second;
          }
        }
      }
      utils::Check(best_link != -2, "Too many nodes went down and we cannot recover..");
    } else {
      best_link = -1;
    }
  }
  // get the node request
  std::vector<char> req_in, req_out;
  ReturnType succ = MsgPassing(std::make_pair(role == kRequestData, best_link),
                               &req_in, &req_out, DataRequest);
  if (succ != kSuccess) return succ;
  // set p_req_in
  p_req_in->resize(req_in.size());
  for (size_t i = 0; i < req_in.size(); ++i) {
    // set p_req_in
    (*p_req_in)[i] = (req_in[i] != 0);
    if (req_out[i] != 0) {
      utils::Assert(req_in[i] == 0, "cannot get and receive request");
      utils::Assert(static_cast<int>(i) == best_link, "request result inconsistent");
    }
  }
  *p_recvlink = best_link;
  return kSuccess;
}
/*!
 * \brief try to finish the data recovery request,
 *        this function is used together with TryDecideRouting
 * \param role the current role of the node
 * \param sendrecvbuf_ the buffer to store the data to be sent/recived
 *          - if the role is kHaveData, this stores the data to be sent
 *          - if the role is kRequestData, this is the buffer to store the result
 *          - if the role is kPassData, this will not be used, and can be NULL
 * \param size the size of the data, obtained from TryDecideRouting
 * \param recv_link the link index to receive data, if necessary, obtained from TryDecideRouting
 * \param req_in the request of each link to send data, obtained from TryDecideRouting
 *
 * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
 * \sa ReturnType, TryDecideRouting
 */
AllreduceRobust::ReturnType
AllreduceRobust::TryRecoverData(RecoverType role,
                                void *sendrecvbuf_,
                                size_t size,
                                int recv_link,
                                const std::vector<bool> &req_in) {
  RefLinkVector &links = tree_links;
  // no need to run recovery for zero size messages
  if (links.size() == 0 || size == 0) return kSuccess;
  utils::Assert(req_in.size() == links.size(), "TryRecoverData");
  const int nlink = static_cast<int>(links.size());
  {
    bool req_data = role == kRequestData;
    for (int i = 0; i < nlink; ++i) {
      if (req_in[i]) {
        utils::Assert(i != recv_link, "TryDecideRouting");
        req_data = true;
      }
    }
    // do not need to provide data or receive data, directly exit
    if (!req_data) return kSuccess;
  }
  utils::Assert(recv_link >= 0 || role == kHaveData, "recv_link must be active");
  if (role == kPassData) {
    links[recv_link].InitBuffer(1, size, reduce_buffer_size);
  }
  for (int i = 0; i < nlink; ++i) {
    links[i].ResetSize();
  }
  while (true) {
    bool finished = true;
    utils::SelectHelper selecter;
    for (int i = 0; i < nlink; ++i) {
      if (i == recv_link && links[i].size_read != size) {
        selecter.WatchRead(links[i].sock);
        finished = false;
      }
      if (req_in[i] && links[i].size_write != size) {
        if (role == kHaveData ||
            (links[recv_link].size_read != links[i].size_write)) {
          selecter.WatchWrite(links[i].sock);
        }
        finished = false;
      }
      selecter.WatchException(links[i].sock);
    }
    if (finished) break;
    selecter.Select();
    // exception handling
    for (int i = 0; i < nlink; ++i) {
      if (selecter.CheckExcept(links[i].sock)) {
        return ReportError(&links[i], kGetExcept);
      }
    }
    if (role == kRequestData) {
      const int pid = recv_link;
      if (selecter.CheckRead(links[pid].sock)) {
        ReturnType ret = links[pid].ReadToArray(sendrecvbuf_, size);
        if (ret != kSuccess) {
          return ReportError(&links[pid], ret);
        }
      }
      for (int i = 0; i < nlink; ++i) {
        if (req_in[i] && links[i].size_write != links[pid].size_read) {
          ReturnType ret = links[i].WriteFromArray(sendrecvbuf_, links[pid].size_read);
          if (ret != kSuccess) {
            return ReportError(&links[i], ret);
          }
        }
      }
    }
    if (role == kHaveData) {
      for (int i = 0; i < nlink; ++i) {
        if (req_in[i] && links[i].size_write != size) {
          ReturnType ret = links[i].WriteFromArray(sendrecvbuf_, size);
          if (ret != kSuccess) {

xgboost/rabit/src/allreduce_robust.cc  view on Meta::CPAN

      }
    }
  }
  return kSuccess;
}
/*!
 * \brief try to load check point
 *
 *        This is a collaborative function called by all nodes
 *        only the nodes with requester set to true really needs to load the check point
 *        other nodes acts as collaborative roles to complete this request
 *
 * \param requester whether current node is the requester
 * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
 * \sa ReturnType
 */
AllreduceRobust::ReturnType AllreduceRobust::TryLoadCheckPoint(bool requester) {
  // check in local data
  RecoverType role =  requester ? kRequestData : kHaveData;
  ReturnType succ;
  if (num_local_replica != 0) {
    if (requester) {
      // clear existing history, if any, before load
      local_rptr[local_chkpt_version].clear();
      local_chkpt[local_chkpt_version].clear();
    }
    // recover local checkpoint
    succ = TryRecoverLocalState(&local_rptr[local_chkpt_version],
                                &local_chkpt[local_chkpt_version]);
    if (succ != kSuccess) return succ;
    int nlocal = std::max(static_cast<int>(local_rptr[local_chkpt_version].size()) - 1, 0);
    // check if everyone is OK
    unsigned state = 0;
    if (nlocal == num_local_replica + 1) {
      // complete recovery
      state = 1;
    } else if (nlocal == 0) {
      // get nothing
      state = 2;
    } else {
      // partially complete state
      state = 4;
    }
    succ = TryAllreduce(&state, sizeof(state), 1, op::Reducer<op::BitOR, unsigned>);
    if (succ != kSuccess) return succ;
    utils::Check(state == 1 || state == 2,
                 "LoadCheckPoint: too many nodes fails, cannot recover local state");
  }
  // do call save model if the checkpoint was lazy
  if (role == kHaveData && global_lazycheck != NULL) {
    global_checkpoint.resize(0);
    utils::MemoryBufferStream fs(&global_checkpoint);
    fs.Write(&version_number, sizeof(version_number));
    global_lazycheck->Save(&fs);
    global_lazycheck = NULL;
  }
  // recover global checkpoint
  size_t size = this->global_checkpoint.length();
  int recv_link;
  std::vector<bool> req_in;
  succ = TryDecideRouting(role, &size, &recv_link, &req_in);
  if (succ != kSuccess) return succ;
  if (role == kRequestData) {
    global_checkpoint.resize(size);
  }
  if (size == 0) return kSuccess;
  return TryRecoverData(role, BeginPtr(global_checkpoint), size, recv_link, req_in);
}
/*!
 * \brief try to get the result of operation specified by seqno
 *
 *        This is a collaborative function called by all nodes
 *        only the nodes with requester set to true really needs to get the result
 *        other nodes acts as collaborative roles to complete this request
 *
 * \param buf the buffer to store the result, this parameter is only used when current node is requester
 * \param size the total size of the buffer, this parameter is only used when current node is requester
 * \param seqno sequence number of the operation, this is unique index of a operation in current iteration
 * \param requester whether current node is the requester
 * \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
 * \sa ReturnType
 */
AllreduceRobust::ReturnType
AllreduceRobust::TryGetResult(void *sendrecvbuf, size_t size, int seqno, bool requester) {
  // if minimum sequence requested is local check point ack,
  // this means all nodes have finished local check point, directly return
  if (seqno == ActionSummary::kLocalCheckAck) return kSuccess;
  if (seqno == ActionSummary::kLocalCheckPoint) {
    // new version of local model
    int new_version = !local_chkpt_version;
    int nlocal = std::max(static_cast<int>(local_rptr[new_version].size()) - 1, 0);
    // if we goes to this place, use must have already setup the state once
    utils::Assert(nlocal == 1 || nlocal == num_local_replica + 1,
                  "TryGetResult::Checkpoint");
    return TryRecoverLocalState(&local_rptr[new_version], &local_chkpt[new_version]);
  }
  // handles normal data recovery
  RecoverType role;
  if (!requester) {
    sendrecvbuf = resbuf.Query(seqno, &size);
    role = sendrecvbuf != NULL ? kHaveData : kPassData;
  } else {
    role = kRequestData;
  }
  int recv_link;
  std::vector<bool> req_in;
  // size of data
  size_t data_size = size;
  ReturnType succ = TryDecideRouting(role, &data_size, &recv_link, &req_in);
  if (succ != kSuccess) return succ;
  utils::Check(data_size != 0, "zero size check point is not allowed");
  if (role == kRequestData || role == kHaveData) {
    utils::Check(data_size == size,
                 "Allreduce Recovered data size do not match the specification of function call.\n"\
                 "Please check if calling sequence of recovered program is the " \
                 "same the original one in current VersionNumber");
  }
  return TryRecoverData(role, sendrecvbuf, data_size, recv_link, req_in);
}
/*!
 * \brief try to run recover execution for a request action described by flag and seqno,
 *        the function will keep blocking to run possible recovery operations before the specified action,
 *        until the requested result is received by a recovering procedure,
 *        or the function discovers that the requested action is not yet executed, and return false
 *
 * \param buf the buffer to store the result
 * \param size the total size of the buffer
 * \param flag flag information about the action \sa ActionSummary
 * \param seqno sequence number of the action, if it is special action with flag set,
 *              seqno needs to be set to ActionSummary::kSpecialOp
 *
 * \return if this function can return true or false
 *    - true means buf already set to the
 *           result by recovering procedure, the action is complete, no further action is needed
 *    - false means this is the lastest action that has not yet been executed, need to execute the action
 */
bool AllreduceRobust::RecoverExec(void *buf, size_t size, int flag, int seqno) {
  if (flag != 0) {
    utils::Assert(seqno == ActionSummary::kSpecialOp, "must only set seqno for normal operations");
  }
  // request
  ActionSummary req(flag, seqno);
  while (true) {
    this->ReportStatus();
    // action
    ActionSummary act = req;
    // get the reduced action
    if (!CheckAndRecover(TryAllreduce(&act, sizeof(act), 1, ActionSummary::Reducer))) continue;
    if (act.check_ack()) {
      if (act.check_point()) {
        // if we also have check_point, do check point first
        utils::Assert(!act.diff_seq(),
                      "check ack & check pt  cannot occur together with normal ops");
        // if we requested checkpoint, we are free to go
        if (req.check_point()) return true;
      } else if (act.load_check()) {
        // if there is only check_ack and load_check, do load_check
        if (!CheckAndRecover(TryLoadCheckPoint(req.load_check()))) continue;
        // if requested load check, then misson complete
        if (req.load_check()) return true;
      } else {
        // there is no check point and no load check, execute check ack
        if (req.check_ack()) return true;
      }
      // if execute to this point
      // this means the action requested has not been completed
      // try next round
    } else {
      if (act.check_point()) {



( run in 0.550 second using v1.01-cache-2.11-cpan-39bf76dae61 )