Alien-XGBoost
view release on metacpan or search on metacpan
xgboost/dmlc-core/tracker/yarn/src/main/java/org/apache/hadoop/yarn/dmlc/ApplicationMaster.java view on Meta::CPAN
}
File dir = new File(ps);
if (dir.isDirectory()) {
for (File f: dir.listFiles()) {
if (f.isFile() && f.getPath().endsWith(".jar")) {
cpath.append(File.pathSeparatorChar);
cpath.append(ps + '/' + f.getName());
}
}
}
cpath.append(File.pathSeparatorChar);
cpath.append(ps + '/');
} else {
cpath.append(File.pathSeparatorChar);
cpath.append(ps.trim());
}
}
}
// already use hadoop command to get class path in worker, maybe a
// better solution in future
env.put("CLASSPATH", cpath.toString());
// set up LD_LIBRARY_PATH for libhdfs
String oldLD_LIBRARY_PATH = System.getenv("LD_LIBRARY_PATH");
env.put("LD_LIBRARY_PATH",
oldLD_LIBRARY_PATH == null ? "" : oldLD_LIBRARY_PATH + ":$HADOOP_HDFS_HOME/lib/native:$JAVA_HOME/jre/lib/amd64/server");
env.put("PYTHONPATH", "${PYTHONPATH}:.");
// inherit all rabit variables
for (Map.Entry<String, String> e : System.getenv().entrySet()) {
if (e.getKey().startsWith("DMLC_")) {
env.put(e.getKey(), e.getValue());
}
if (e.getKey().startsWith("rabit_")) {
env.put(e.getKey(), e.getValue());
}
if (e.getKey().startsWith("AWS_")) {
env.put(e.getKey(), e.getValue());
}
if (e.getKey() == "LIBHDFS_OPTS") {
env.put(e.getKey(), e.getValue());
}
}
String nodeHost = container.getNodeId().getHost();
env.put("DMLC_NODE_HOST", nodeHost);
env.put("DMLC_TASK_ID", String.valueOf(task.taskId));
env.put("DMLC_ROLE", task.taskRole);
env.put("DMLC_NUM_ATTEMPT", String.valueOf(task.attemptCounter));
// ctx.setUser(userName);
ctx.setEnvironment(env);
LOG.info(env);
synchronized (this) {
assert (!this.runningTasks.containsKey(container.getId()));
this.runningTasks.put(container.getId(), task);
this.nmClient.startContainerAsync(container, ctx);
}
}
/**
 * Reacts to a container whose start request failed by routing its id
 * through the regular failure handling in {@code handleFailure}.
 *
 * @param cid
 *          id of the container that could not be started
 */
private synchronized void onStartContainerError(ContainerId cid) {
  final Collection<ContainerId> failed = Collections.singletonList(cid);
  this.handleFailure(failed);
}
/**
 * Frees containers that will not be used for a real task by handing each
 * one to {@code launchDummyTask}.
 *
 * @param containers
 *          the containers to free; nothing happens when it is empty
 */
private synchronized void freeUnusedContainers(
    Collection<Container> containers) {
  // an empty collection simply yields no iterations
  final java.util.Iterator<Container> unused = containers.iterator();
  while (unused.hasNext()) {
    launchDummyTask(unused.next());
  }
}
/**
 * Handles newly allocated containers; the AMRMClientAsync.CallbackHandler
 * implementation forwards its allocation callback here.
 *
 * While the job is aborting, every container is freed. Otherwise a container
 * on a blacklisted node gets a dummy task, a container for which a pending
 * task exists launches that task, and any remaining containers are freed
 * once the whole list has been processed.
 *
 * @param containers
 *          the containers that were allocated
 */
private synchronized void onContainersAllocated(List<Container> containers) {
  if (this.startAbort) {
    // nothing new may start once abort has begun
    this.freeUnusedContainers(containers);
    return;
  }
  final Collection<Container> surplus = new java.util.LinkedList<Container>();
  for (Container granted : containers) {
    if (blackList.contains(granted.getNodeHttpAddress())) {
      // node previously hosted a failed task: occupy it with a dummy task
      launchDummyTask(granted);
    } else {
      final TaskRecord next = pendingTasks.poll();
      if (next != null) {
        this.launchTask(granted, next);
      } else {
        // no work left for this container; free it after the loop
        surplus.add(granted);
      }
    }
  }
  this.freeUnusedContainers(surplus);
}
/**
 * Starts aborting the job.
 *
 * The message of the first abort is kept as the abort diagnosis. Every
 * running task whose abort has not been requested yet gets its container
 * stopped and is moved to the killed tasks; all pending tasks are moved to
 * the killed tasks as well and their container requests are withdrawn.
 * Afterwards both the pending and the running collections are emptied.
 *
 * @param msg
 *          the fatal message
 */
private synchronized void abortJob(String msg) {
  // only the first abort reason is remembered as the diagnosis
  if (!this.startAbort) {
    this.abortDiagnosis = msg;
  }
  this.startAbort = true;

  for (TaskRecord running : this.runningTasks.values()) {
    if (running.abortRequested) {
      continue;
    }
    nmClient.stopContainerAsync(running.container.getId(),
        running.container.getNodeId());
    running.abortRequested = true;
    this.killedTasks.add(running);
  }

  this.killedTasks.addAll(this.pendingTasks);
  for (TaskRecord waiting : this.pendingTasks) {
    rmClient.removeContainerRequest(waiting.containerRequest);
  }

  this.pendingTasks.clear();
  this.runningTasks.clear();
  LOG.info(msg);
}
/**
 * Handles non-fatal task failures.
 *
 * For every failed container that still maps to a running task, the task is
 * removed from the running set, its attempt counter is incremented, the
 * container is stopped and its node is added to the blacklist. A task that
 * has reached the maximum number of attempts aborts the whole job. The
 * collected tasks are resubmitted, or recorded as killed when the job is
 * aborting.
 *
 * @param failed
 *          ids of the containers that failed; ids that do not belong to a
 *          running task are ignored
 */
private synchronized void handleFailure(Collection<ContainerId> failed) {
  Collection<TaskRecord> tasks = new java.util.LinkedList<TaskRecord>();
  for (ContainerId cid : failed) {
    TaskRecord r = runningTasks.remove(cid);
    if (r == null) {
      continue;
    }
    // userName is passed as a format argument rather than concatenated into
    // the format string, so a '%' in the name cannot break formatting
    LOG.info("Task "
        + r.taskId
        + " failed on "
        + r.container.getId()
        + ". See LOG at : "
        + String.format("http://%s/node/containerlogs/%s/%s",
            r.container.getNodeHttpAddress(), r.container.getId(),
            userName));
    r.attemptCounter += 1;
    // stop the failed container and add its node to the blacklist
    nmClient.stopContainerAsync(r.container.getId(), r.container.getNodeId());
    blackList.add(r.container.getNodeHttpAddress());
    r.container = null;
    tasks.add(r);
    if (r.attemptCounter >= this.maxNumAttempt) {
      this.abortJob("[DMLC] Task " + r.taskId + " failed more than "
          + r.attemptCounter + " times");
    }
  }
  if (this.startAbort) {
    this.killedTasks.addAll(tasks);
  } else {
    this.submitTasks(tasks);
  }
}
/**
 * Handles completed containers; the AMRMClientAsync.CallbackHandler
 * implementation forwards its completion callback here.
 *
 * A container that exited successfully moves its task to the finished
 * tasks. A container killed for exceeding physical or virtual memory aborts
 * the job immediately. Every other exit is logged and passed on to
 * {@code handleFailure} once the whole list has been examined.
 *
 * @param status
 *          list of status
 */
private synchronized void onContainersCompleted(List<ContainerStatus> status) {
  final Collection<ContainerId> failedIds = new java.util.LinkedList<ContainerId>();
  for (ContainerStatus report : status) {
    assert (report.getState().equals(ContainerState.COMPLETE));
    final int exitCode = report.getExitStatus();
    final TaskRecord task = runningTasks.get(report.getContainerId());
    if (task == null) {
      // not one of the tasks currently tracked as running
      continue;
    }
    if (exitCode == ContainerExitStatus.SUCCESS) {
      finishedTasks.add(task);
      runningTasks.remove(report.getContainerId());
      continue;
    }
    try {
      // The memory-limit codes are looked up by name via reflection;
      // presumably so this still works where ContainerExitStatus lacks
      // these constants -- TODO confirm. A failed lookup is only logged.
      if (exitCode == ContainerExitStatus.class.getField(
          "KILLED_EXCEEDED_PMEM").getInt(null)) {
        this.abortJob("[DMLC] Task "
            + task.taskId
            + " killed because of exceeding allocated physical memory");
        return;
      }
      if (exitCode == ContainerExitStatus.class.getField(
          "KILLED_EXCEEDED_VMEM").getInt(null)) {
        this.abortJob("[DMLC] Task "
            + task.taskId
            + " killed because of exceeding allocated virtual memory");
        return;
      }
    } catch (Exception e) {
      LOG.warn(e.getMessage());
    }
    LOG.info("[DMLC] Task " + task.taskId + " exited with status "
        + exitCode + " Diagnostics:" + report.getDiagnostics());
    failedIds.add(report.getContainerId());
  }
  this.handleFailure(failedIds);
}
/**
 * Callback handler for the resource manager. Each event is forwarded to the
 * enclosing ApplicationMaster.
 */
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
  /**
   * @return one minus the ratio of pending tasks to the total task count
   */
  @Override
  public float getProgress() {
    final float pendingShare = (float) (pendingTasks.size()) / numTasks;
    return 1.0f - pendingShare;
  }

  // The outer instance must be named explicitly here: an unqualified call
  // would resolve to this handler's own method of the same name.
  @Override
  public void onContainersAllocated(List<Container> containers) {
    ApplicationMaster.this.onContainersAllocated(containers);
  }

  // Explicit outer qualification for the same reason as above.
  @Override
  public void onContainersCompleted(List<ContainerStatus> status) {
    ApplicationMaster.this.onContainersCompleted(status);
  }

  /** Any resource manager error aborts the job. */
  @Override
  public void onError(Throwable ex) {
    final String reason = "[DMLC] Resource manager Error " + ex.toString();
    abortJob(reason);
  }

  /** Node updates are ignored. */
  @Override
  public void onNodesUpdated(List<NodeReport> nodereport) {
  }

  /** A shutdown request aborts the job. */
  @Override
  public void onShutdownRequest() {
    abortJob("[DMLC] Get shutdown request, start to shutdown...");
  }
}
/**
 * Callback handler for the node manager client. Every event is logged;
 * status-query errors and start errors are additionally forwarded to the
 * enclosing ApplicationMaster's failure handling.
 */
private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
  @Override
  public void onContainerStarted(ContainerId cid,
      Map<String, ByteBuffer> services) {
    LOG.info("onContainerStarted Invoked");
  }

  @Override
  public void onContainerStatusReceived(ContainerId cid,
      ContainerStatus status) {
    LOG.info("onContainerStatusReceived Invoked");
  }

  @Override
  public void onContainerStopped(ContainerId cid) {
    LOG.info("onContainerStopped Invoked");
  }

  /** Logs the error and treats the container as failed. */
  @Override
  public void onGetContainerStatusError(ContainerId cid, Throwable ex) {
    final String detail = ex.toString();
    LOG.info("onGetContainerStatusError Invoked: " + detail);
    handleFailure(Collections.singletonList(cid));
  }

  /** Logs the error and hands the container id to the outer start-error handling. */
  @Override
  public void onStartContainerError(ContainerId cid, Throwable ex) {
    final String detail = ex.getMessage();
    LOG.info("onStartContainerError Invoked: " + detail);
    // qualified on purpose: the one-argument overload lives on the outer class
    ApplicationMaster.this.onStartContainerError(cid);
  }

  /** A failed stop request is only logged. */
  @Override
  public void onStopContainerError(ContainerId cid, Throwable ex) {
    final String detail = ex.toString();
    LOG.info("onStopContainerError Invoked: " + detail);
  }
}
}
( run in 0.759 second using v1.01-cache-2.11-cpan-39bf76dae61 )