Alien-XGBoost
view release on metacpan or search on metacpan
xgboost/dmlc-core/tracker/dmlc_tracker/tracker.py view on Meta::CPAN
# send parent rank
self.sock.sendint(parent_map[rank])
# send world size
self.sock.sendint(len(tree_map))
self.sock.sendint(len(nnset))
# send the rprev and next link
for r in nnset:
self.sock.sendint(r)
# send prev link
if rprev != -1 and rprev != rank:
nnset.add(rprev)
self.sock.sendint(rprev)
else:
self.sock.sendint(-1)
# send next link
if rnext != -1 and rnext != rank:
nnset.add(rnext)
self.sock.sendint(rnext)
else:
self.sock.sendint(-1)
while True:
ngood = self.sock.recvint()
goodset = set([])
for _ in range(ngood):
goodset.add(self.sock.recvint())
assert goodset.issubset(nnset)
badset = nnset - goodset
conset = []
for r in badset:
if r in wait_conn:
conset.append(r)
self.sock.sendint(len(conset))
self.sock.sendint(len(badset) - len(conset))
for r in conset:
self.sock.sendstr(wait_conn[r].host)
self.sock.sendint(wait_conn[r].port)
self.sock.sendint(r)
nerr = self.sock.recvint()
if nerr != 0:
continue
self.port = self.sock.recvint()
rmset = []
# all connections were successfully set up
for r in conset:
wait_conn[r].wait_accept -= 1
if wait_conn[r].wait_accept == 0:
rmset.append(r)
for r in rmset:
wait_conn.pop(r, None)
self.wait_accept = len(badset) - len(conset)
return rmset
class RabitTracker(object):
"""
tracker for rabit
"""
def __init__(self, hostIP, nslave, port=9091, port_end=9999):
    """
    Bind a listening TCP socket on the first free port in [port, port_end).

    Parameters
    ----------
    hostIP : str
        address to bind to; also stored and reported through slave_envs()
    nslave : int
        number of slaves, stored on the tracker as ``self.nslave``
    port : int
        first port to try
    port_end : int
        end of the port range (exclusive)

    Raises
    ------
    socket.error
        if bind fails for a reason other than "address already in use",
        or if every port in the range is already in use
    """
    import errno  # local import: only needed for the symbolic EADDRINUSE
    # 98 and 48 are the numeric EADDRINUSE values on Linux and BSD/macOS that
    # this code has always accepted; errno.EADDRINUSE adds the host platform's.
    addr_in_use = (98, 48, errno.EADDRINUSE)
    sock = socket.socket(get_family(hostIP), socket.SOCK_STREAM)
    try:
        for candidate in range(port, port_end):
            try:
                sock.bind((hostIP, candidate))
            except socket.error as e:
                if e.errno in addr_in_use:
                    continue
                raise
            self.port = candidate
            break
        else:
            # Without this, listen() would run on an unbound socket and
            # self.port would never be set.
            raise socket.error(
                errno.EADDRINUSE,
                'no free port for tracker in range [%d, %d)' % (port, port_end))
        sock.listen(256)
    except Exception:
        # do not leak the descriptor when construction fails
        sock.close()
        raise
    self.sock = sock
    self.hostIP = hostIP
    self.thread = None
    self.start_time = None
    self.end_time = None
    self.nslave = nslave
    logging.info('start listen on %s:%d', hostIP, self.port)
def __del__(self):
self.sock.close()
@staticmethod
def get_neighbor(rank, nslave):
rank = rank + 1
ret = []
if rank > 1:
ret.append(rank // 2 - 1)
if rank * 2 - 1 < nslave:
ret.append(rank * 2 - 1)
if rank * 2 < nslave:
ret.append(rank * 2)
return ret
def slave_envs(self):
"""
get enviroment variables for slaves
can be passed in as args or envs
"""
return {'DMLC_TRACKER_URI': self.hostIP,
'DMLC_TRACKER_PORT': self.port}
def get_tree(self, nslave):
tree_map = {}
parent_map = {}
for r in range(nslave):
tree_map[r] = self.get_neighbor(r, nslave)
parent_map[r] = (r + 1) // 2 - 1
return tree_map, parent_map
def find_share_ring(self, tree_map, parent_map, r):
"""
get a ring structure that tends to share nodes with the tree
return a list starting from r
"""
nset = set(tree_map[r])
cset = nset - set([parent_map[r]])
if len(cset) == 0:
return [r]
rlst = [r]
cnt = 0
for v in cset:
vlst = self.find_share_ring(tree_map, parent_map, v)
xgboost/dmlc-core/tracker/dmlc_tracker/tracker.py view on Meta::CPAN
assert s.world_size == -1 or s.world_size == nslave
if s.cmd == 'recover':
assert s.rank >= 0
rank = s.decide_rank(job_map)
# batch assignment of ranks
if rank == -1:
assert len(todo_nodes) != 0
pending.append(s)
if len(pending) == len(todo_nodes):
pending.sort(key=lambda x: x.host)
for s in pending:
rank = todo_nodes.pop(0)
if s.jobid != 'NULL':
job_map[s.jobid] = rank
s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map)
if s.wait_accept > 0:
wait_conn[rank] = s
logging.debug('Recieve %s signal from %s; assign rank %d',
s.cmd, s.host, s.rank)
if len(todo_nodes) == 0:
logging.info('@tracker All of %d nodes getting started', nslave)
self.start_time = time.time()
else:
s.assign_rank(rank, wait_conn, tree_map, parent_map, ring_map)
logging.debug('Recieve %s signal from %d', s.cmd, s.rank)
if s.wait_accept > 0:
wait_conn[rank] = s
logging.info('@tracker All nodes finishes job')
self.end_time = time.time()
logging.info('@tracker %s secs between node start and job finish',
str(self.end_time - self.start_time))
def start(self, nslave):
def run():
self.accept_slaves(nslave)
self.thread = Thread(target=run, args=())
self.thread.setDaemon(True)
self.thread.start()
def join(self):
while self.thread.isAlive():
self.thread.join(100)
class PSTracker(object):
    """
    Tracker module for PS

    Runs the scheduler command ``cmd`` in a background daemon thread and
    reports the address it was told to use through slave_envs(). When ``cmd``
    is None nothing is started: join() returns at once and slave_envs() is
    empty.
    """
    def __init__(self, hostIP, cmd, port=9091, port_end=9999, envs=None):
        """
        Starts the PS scheduler

        Parameters
        ----------
        hostIP : str
            address exported as DMLC_PS_ROOT_URI
        cmd : str or None
            shell command that runs the scheduler; None disables the tracker
        port : int
            first port to probe
        port_end : int
            end of the probed port range (exclusive)
        envs : dict or None
            extra environment variables for the scheduler process; values
            are converted with str()

        Raises
        ------
        socket.error
            if no port in [port, port_end) can be bound
        """
        self.cmd = cmd
        if cmd is None:
            return
        envs = {} if envs is None else envs
        self.hostIP = hostIP
        # The socket is only a probe: it is closed again as soon as a port
        # binds, and the port number is passed to the scheduler via the env.
        sock = socket.socket(get_family(hostIP), socket.SOCK_STREAM)
        try:
            for candidate in range(port, port_end):
                try:
                    sock.bind(('', candidate))
                except socket.error:
                    continue
                self.port = candidate
                break
            else:
                # Previously this case left self.port unset and failed later
                # with an AttributeError.
                raise socket.error(
                    'no free port for PS scheduler in range [%d, %d)' % (port, port_end))
        finally:
            # close on every path so the descriptor cannot leak
            sock.close()
        env = os.environ.copy()
        env['DMLC_ROLE'] = 'scheduler'
        env['DMLC_PS_ROOT_URI'] = str(self.hostIP)
        env['DMLC_PS_ROOT_PORT'] = str(self.port)
        for k, v in envs.items():
            env[k] = str(v)

        def run_scheduler():
            # NOTE(review): cmd is run through the shell, so it must come
            # from a trusted source.
            subprocess.check_call(self.cmd, env=env, shell=True)

        self.thread = Thread(target=run_scheduler, args=())
        # Thread.setDaemon() is deprecated in favour of the daemon attribute.
        self.thread.daemon = True
        self.thread.start()

    def join(self):
        """Wait for the scheduler thread to finish; no-op when cmd is None."""
        if self.cmd is not None:
            # Thread.isAlive() was removed in Python 3.9; use is_alive().
            while self.thread.is_alive():
                self.thread.join(100)

    def slave_envs(self):
        """
        Return the scheduler address as environment variables for slaves,
        or an empty dict when no scheduler was started (cmd is None).
        """
        if self.cmd is None:
            return {}
        else:
            return {'DMLC_PS_ROOT_URI': self.hostIP,
                    'DMLC_PS_ROOT_PORT': self.port}
def get_host_ip(hostIP=None):
    """
    Resolve the address this host should advertise.

    Parameters
    ----------
    hostIP : str or None
        None or 'auto' is treated as 'ip'.
        'dns' returns the fully qualified domain name.
        'ip' resolves the FQDN to an address, falling back to the plain
        hostname if that lookup fails.
        Any other value is returned unchanged.

    Returns
    -------
    str
        the resolved host name or address
    """
    if hostIP is None or hostIP == 'auto':
        hostIP = 'ip'

    if hostIP == 'dns':
        hostIP = socket.getfqdn()
    elif hostIP == 'ip':
        try:
            hostIP = socket.gethostbyname(socket.getfqdn())
        except socket.gaierror:
            # logging.warn is a deprecated alias of logging.warning
            logging.warning('gethostbyname(socket.getfqdn()) failed... trying on hostname()')
            hostIP = socket.gethostbyname(socket.gethostname())
        if hostIP.startswith("127."):
            # The name resolved to loopback; read the local address that a
            # datagram socket gets when connected to a non-loopback target.
            probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                # doesn't have to be reachable
                probe.connect(('10.255.255.255', 1))
                hostIP = probe.getsockname()[0]
            finally:
                # the probe socket was previously never closed
                probe.close()
    return hostIP
def submit(nworker, nserver, fun_submit, hostIP='auto', pscmd=None):
if nserver == 0:
pscmd = None
envs = {'DMLC_NUM_WORKER' : nworker,
'DMLC_NUM_SERVER' : nserver}
hostIP = get_host_ip(hostIP)
if nserver == 0:
( run in 0.625 second using v1.01-cache-2.11-cpan-2398b32b56e )