Alien-XGBoost

 view release on metacpan or search on metacpan

xgboost/rabit/src/allreduce_base.cc  view on Meta::CPAN

      int hport, hrank;
      std::string hname;
      tracker.RecvStr(&hname);
      Assert(tracker.RecvAll(&hport, sizeof(hport)) == sizeof(hport),
             "ReConnectLink failure 9");
      Assert(tracker.RecvAll(&hrank, sizeof(hrank)) == sizeof(hrank),
             "ReConnectLink failure 10");
      r.sock.Create();
      if (!r.sock.Connect(utils::SockAddr(hname.c_str(), hport))) {
        num_error += 1; r.sock.Close(); continue;
      }
      Assert(r.sock.SendAll(&rank, sizeof(rank)) == sizeof(rank),
             "ReConnectLink failure 12");
      Assert(r.sock.RecvAll(&r.rank, sizeof(r.rank)) == sizeof(r.rank),
             "ReConnectLink failure 13");
      utils::Check(hrank == r.rank,
                   "ReConnectLink failure, link rank inconsistent");
      bool match = false;
      for (size_t i = 0; i < all_links.size(); ++i) {
        if (all_links[i].rank == hrank) {
          Assert(all_links[i].sock.IsClosed(),
                 "Override a link that is active");
          all_links[i].sock = r.sock; match = true; break;
        }
      }
      if (!match) all_links.push_back(r);
    }
    Assert(tracker.SendAll(&num_error, sizeof(num_error)) == sizeof(num_error),
           "ReConnectLink failure 14");
  } while (num_error != 0);
  // send back socket listening port to tracker
  Assert(tracker.SendAll(&port, sizeof(port)) == sizeof(port),
         "ReConnectLink failure 14");
  // close connection to tracker
  tracker.Close();
  // listen to incoming links
  for (int i = 0; i < num_accept; ++i) {
    LinkRecord r;
    r.sock = sock_listen.Accept();
    Assert(r.sock.SendAll(&rank, sizeof(rank)) == sizeof(rank),
           "ReConnectLink failure 15");
    Assert(r.sock.RecvAll(&r.rank, sizeof(r.rank)) == sizeof(r.rank),
           "ReConnectLink failure 15");
    bool match = false;
    for (size_t i = 0; i < all_links.size(); ++i) {
      if (all_links[i].rank == r.rank) {
        utils::Assert(all_links[i].sock.IsClosed(),
                      "Override a link that is active");
        all_links[i].sock = r.sock; match = true; break;
      }
    }
    if (!match) all_links.push_back(r);
  }
  // close listening sockets
  sock_listen.Close();
  this->parent_index = -1;
  // setup tree links and ring structure
  tree_links.plinks.clear();
  for (size_t i = 0; i < all_links.size(); ++i) {
    utils::Assert(!all_links[i].sock.BadSocket(), "ReConnectLink: bad socket");
    // set the socket to non-blocking mode, enable TCP keepalive
    all_links[i].sock.SetNonBlock(true);
    all_links[i].sock.SetKeepAlive(true);
    if (tree_neighbors.count(all_links[i].rank) != 0) {
      if (all_links[i].rank == parent_rank) {
        parent_index = static_cast<int>(tree_links.plinks.size());
      }
      tree_links.plinks.push_back(&all_links[i]);
    }
    if (all_links[i].rank == prev_rank) ring_prev = &all_links[i];
    if (all_links[i].rank == next_rank) ring_next = &all_links[i];
  }
  Assert(parent_rank == -1 || parent_index != -1,
         "cannot find parent in the link");
  Assert(prev_rank == -1 || ring_prev != NULL,
         "cannot find prev ring in the link");
  Assert(next_rank == -1 || ring_next != NULL,
         "cannot find next ring in the link");
}
/*!
 * \brief perform an in-place allreduce on sendrecvbuf_; the call may fail,
 *        in which case the returned value reports the cause of the failure
 *
 * The algorithm is selected by element count: when count is strictly greater
 * than reduce_ring_mincount the ring implementation (TryAllreduceRing) is
 * used, otherwise the tree implementation (TryAllreduceTree) is used.
 *
 * NOTE on Allreduce:
 *    A kSuccess result does NOT mean every node has successfully finished
 *    TryAllreduce; it only means the current node has obtained the correct
 *    result of this Allreduce.
 *    It does, however, mean every node has finished the PREVIOUS
 *    Allreduce/Bcast call (not this one).
 *
 * \param sendrecvbuf_ buffer used both for sending and for receiving data
 * \param type_nbytes size in bytes of a single element
 * \param count number of elements to be reduced
 * \param reducer reduce function
 * \return kSuccess, kSockError or kGetExcept; see ReturnType for details
 * \sa ReturnType
 */
AllreduceBase::ReturnType
AllreduceBase::TryAllreduce(void *sendrecvbuf_,
                            size_t type_nbytes,
                            size_t count,
                            ReduceFunction reducer) {
  // The threshold is compared against the element count, not the byte size
  // (type_nbytes * count).
  const bool use_tree = count <= reduce_ring_mincount;
  if (use_tree) {
    return this->TryAllreduceTree(sendrecvbuf_, type_nbytes, count, reducer);
  }
  return this->TryAllreduceRing(sendrecvbuf_, type_nbytes, count, reducer);
}
/*!
 * \brief perform an in-place allreduce on sendrecvbuf_,
 * using a tree-shaped reduction
 *
 * \param sendrecvbuf_ buffer used both for sending and for receiving data
 * \param type_nbytes size in bytes of a single element
 * \param count number of elements to be reduced
 * \param reducer reduce function
 * \return this function can return kSuccess, kSockError, kGetExcept, see ReturnType for details
 * \sa ReturnType
 */
AllreduceBase::ReturnType
AllreduceBase::TryAllreduceTree(void *sendrecvbuf_,
                                size_t type_nbytes,
                                size_t count,
                                ReduceFunction reducer) {



( run in 0.955 second using v1.01-cache-2.11-cpan-df04353d9ac )