Alien-XGBoost
view release on metacpan or search on metacpan
xgboost/dmlc-core/tracker/dmlc_tracker/yarn.py view on Meta::CPAN
"""
Submit a dmlc job via YARN.

The dmlc job runs as a YARN application.
"""
# pylint: disable=invalid-name, too-many-locals, too-many-branches, missing-docstring
from __future__ import absolute_import
import os
import subprocess
import warnings
import logging
import platform
from threading import Thread
from . import opts
from . import tracker
def yarn_submit(args, nworker, nserver, pass_env):
    """Build the YARN client command line and launch it in a background thread.

    Parameters
    ----------
    args
        Parsed options object. This function reads ``jobname``,
        ``num_workers``, ``num_servers``, ``command``, ``yarn_app_dir``,
        ``archives``, ``ship_libcxx``, ``worker_cores``, ``worker_memory_mb``,
        ``server_cores``, ``server_memory_mb``, ``hdfs_tempdir``, ``queue``,
        ``yarn_app_classpath`` and ``env``. ``args.jobname`` is filled in
        when it is ``None``.
    nworker, nserver
        Worker / server counts; only used for the debug log line here.
    pass_env : dict
        Extra environment variables for the client process; values are
        converted with ``str``.

    Returns
    -------
    threading.Thread
        The already-started daemon thread that runs the submission command.

    Raises
    ------
    AssertionError
        If ``HADOOP_HOME`` is unset, the hadoop binary or ``dmlc-yarn.jar``
        cannot be found, or the ``hadoop version`` output cannot be parsed.
    RuntimeError
        If the detected Hadoop major version is lower than 2.
    """
    is_windows = os.name == 'nt'
    hadoop_home = os.getenv('HADOOP_HOME')
    assert hadoop_home is not None, 'Need to set HADOOP_HOME for YARN submission.'
    hadoop_binary = os.path.join(hadoop_home, 'bin', 'hadoop')
    assert os.path.exists(hadoop_binary), "HADOOP_HOME does not contain the hadoop binary"

    if args.jobname is None:
        if args.num_servers == 0:
            prefix = 'DMLC[nworker=%d]:' % args.num_workers
        else:
            # NOTE: the 'nsever' spelling is kept byte-for-byte so that
            # generated job names do not change.
            prefix = 'DMLC[nworker=%d,nsever=%d]:' % (args.num_workers, args.num_servers)
        args.jobname = prefix + args.command[0].split('/')[-1]

    # Determine path for Yarn helpers
    yarn_jar_path = os.path.join(args.yarn_app_dir, 'dmlc-yarn.jar')
    curr_path = os.path.dirname(os.path.abspath(os.path.expanduser(__file__)))
    yarn_boot_py = os.path.join(curr_path, 'launcher.py')

    if not os.path.exists(yarn_jar_path):
        warnings.warn("cannot find \"%s\", I will try to run build" % yarn_jar_path)
        build_cmd = 'cd %s;./build.%s' % (
            os.path.join(os.path.dirname(__file__), os.pardir, 'yarn'),
            'bat' if is_windows else 'sh')
        print(build_cmd)
        subprocess.check_call(build_cmd, shell=True, env=os.environ)
    assert os.path.exists(yarn_jar_path), "failed to build dmlc-yarn.jar, try it manually"

    # Detect the hadoop version from the first line of `hadoop version`.
    version_fields = _hadoop_output(hadoop_binary, 'version').split('\n')[0].split()
    # Guard the length too, so empty output fails with the intended message
    # instead of an IndexError.
    assert len(version_fields) >= 2 and version_fields[0] == 'Hadoop', \
        'cannot parse hadoop version string'
    hadoop_version = int(version_fields[1].split('.')[0])
    classpath = _hadoop_output(hadoop_binary, 'classpath').strip()

    if hadoop_version < 2:
        raise RuntimeError('Hadoop Version is %s, dmlc_yarn will need Yarn(Hadoop 2.0)'
                           % version_fields[1])

    # Files to ship with the job, plus the command rewritten by opts.
    fset, new_command = opts.get_cache_file_set(args)
    fset.add(yarn_jar_path)
    fset.add(yarn_boot_py)
    ar_list = []
    for fname in args.archives:
        fset.add(fname)
        ar_list.append(os.path.basename(fname))

    java_home = os.getenv('JAVA_HOME')
    java = 'java' if java_home is None else os.path.join(java_home, 'bin', 'java')

    env = os.environ.copy()
    for key, value in pass_env.items():
        env[key] = str(value)

    # ship lib-stdc++.so
    if args.ship_libcxx is not None:
        if platform.architecture()[0] == '64bit':
            libcxx = args.ship_libcxx + '/libstdc++.so.6'
        else:
            libcxx = args.ship_libcxx + '/libstdc++.so'
        fset.add(libcxx)
        # update local LD_LIBRARY_PATH
        env['LD_LIBRARY_PATH'] = args.ship_libcxx + ':' + env.get('LD_LIBRARY_PATH', '')

    env['DMLC_JOB_CLUSTER'] = 'yarn'
    env['DMLC_WORKER_CORES'] = str(args.worker_cores)
    env['DMLC_WORKER_MEMORY_MB'] = str(args.worker_memory_mb)
    env['DMLC_SERVER_CORES'] = str(args.server_cores)
    env['DMLC_SERVER_MEMORY_MB'] = str(args.server_memory_mb)
    env['DMLC_NUM_WORKER'] = str(args.num_workers)
    env['DMLC_NUM_SERVER'] = str(args.num_servers)
    env['DMLC_JOB_ARCHIVES'] = ':'.join(ar_list)

    # Assemble the client command line. It must be built after every
    # fset.add() above so that all shipped files get a -file flag.
    parts = ['%s -cp %s%s%s org.apache.hadoop.yarn.dmlc.Client '
             % (java, classpath, ';' if is_windows else ':', yarn_jar_path)]
    parts.extend(' -file %s' % f for f in fset)
    parts.append(' -jobname %s ' % args.jobname)
    parts.append(' -tempdir %s ' % args.hdfs_tempdir)
    parts.append(' -queue %s ' % args.queue)
    if args.yarn_app_classpath:
        parts.append(' -appcp %s ' % args.yarn_app_classpath)
    parts.extend(' -env %s ' % entry for entry in args.env)
    parts.append(' '.join(['./launcher.py'] + new_command))
    cmd = ''.join(parts)

    logging.debug("Submit job with %d workers and %d servers", nworker, nserver)

    def run():
        """internal running function."""
        logging.debug(cmd)
        subprocess.check_call(cmd, shell=True, env=env)

    thread = Thread(target=run, args=())
    thread.daemon = True
    thread.start()
    return thread


def _hadoop_output(hadoop_binary, subcommand):
    """Run ``<hadoop_binary> <subcommand>`` via the shell and return stdout as text.

    ``universal_newlines=True`` makes ``communicate`` return ``str`` rather
    than ``bytes`` on Python 3, so callers can split on ``'\\n'`` and
    interpolate the result into command strings.
    """
    proc = subprocess.Popen('%s %s' % (hadoop_binary, subcommand),
                            shell=True, stdout=subprocess.PIPE,
                            universal_newlines=True)
    out, _ = proc.communicate()
    return out
def submit(args):
    """Submit the job through the tracker and wait for the submission threads.

    A callback wrapping ``yarn_submit`` is handed to ``tracker.submit``; every
    thread the callback produces is collected and joined before returning.
    """
    launched = []

    def yarn_submit_pass(nworker, nserver, pass_env):
        # Remember each submission thread so it can be joined below.
        launched.append(yarn_submit(args, nworker, nserver, pass_env))

    here = os.path.dirname(os.path.abspath(os.path.expanduser(__file__)))
    boot_script = os.path.join(here, 'launcher.py')
    ps_command = ' '.join([boot_script] + args.command)

    tracker.submit(args.num_workers, args.num_servers,
                   fun_submit=yarn_submit_pass,
                   pscmd=ps_command)

    for worker in launched:
        worker.join()
( run in 1.201 second using v1.01-cache-2.11-cpan-5b529ec07f3 )