Alien-XGBoost

 view release on metacpan or  search on metacpan

xgboost/dmlc-core/tracker/dmlc_tracker/tracker.py  view on Meta::CPAN

"""
Tracker script for DMLC
Implements the tracker control protocol
 - start dmlc jobs
 - start ps scheduler and rabit tracker
 - help nodes to establish links with each other

Tianqi Chen
"""
# pylint: disable=invalid-name, missing-docstring, too-many-arguments, too-many-locals
# pylint: disable=too-many-branches, too-many-statements
from __future__ import absolute_import

import os
import sys
import socket
import struct
import subprocess
import argparse
import time
import logging
from threading import Thread

class ExSocket(object):
    """
    Extension of socket to handle recv and send of special data

    Wraps a connected socket-like object (anything providing ``recv`` and
    ``sendall``) and adds helpers for the two value kinds exchanged on the
    wire: C ints packed with the native ``'@i'`` struct format, and
    strings sent as an int byte-length followed by the encoded bytes.
    """
    def __init__(self, sock):
        self.sock = sock

    def recvall(self, nbytes):
        """Read exactly ``nbytes`` bytes from the socket and return them.

        Raises
        ------
        socket.error
            If the peer closes the connection before ``nbytes`` bytes
            have been received.
        """
        res = []
        nread = 0
        while nread < nbytes:
            # read in pieces of at most 1024 bytes
            chunk = self.sock.recv(min(nbytes - nread, 1024))
            if not chunk:
                # An empty read means the peer closed the connection; nread
                # would never advance again, so fail instead of spinning.
                raise socket.error(
                    'connection closed by peer: expected %d bytes, received %d'
                    % (nbytes, nread))
            nread += len(chunk)
            res.append(chunk)
        return b''.join(res)

    def recvint(self):
        """Receive one int packed with the native ``'@i'`` format."""
        return struct.unpack('@i', self.recvall(4))[0]

    def sendint(self, n):
        """Send ``n`` as one int packed with the native ``'@i'`` format."""
        self.sock.sendall(struct.pack('@i', n))

    def sendstr(self, s):
        """Send ``s`` as an int byte-length followed by its encoded bytes."""
        # Encode first: the length prefix must count bytes, not characters,
        # otherwise a non-ASCII string makes the receiver read too little.
        data = s.encode()
        self.sendint(len(data))
        self.sock.sendall(data)

    def recvstr(self):
        """Receive a string written by ``sendstr`` and return it decoded."""
        slen = self.recvint()
        return self.recvall(slen).decode()

# magic number used to verify existence of data
# SlaveEntry.__init__ expects every connecting peer to send this value as
# its first int and echoes it back; a peer sending anything else is rejected.
kMagic = 0xff99

def get_some_ip(host):
    """Return the address string of the first ``getaddrinfo`` result for host."""
    first_entry = socket.getaddrinfo(host, None)[0]
    # each entry is (family, type, proto, canonname, sockaddr)
    sockaddr = first_entry[4]
    return sockaddr[0]

def get_family(addr):
    """Return the address family of the first ``getaddrinfo`` result for addr."""
    candidates = socket.getaddrinfo(addr, None)
    first_entry = candidates[0]
    # the family is the leading field of each getaddrinfo entry
    return first_entry[0]

class SlaveEntry(object):
    def __init__(self, sock, s_addr):
        """Run the opening handshake with a newly connected peer.

        The peer must first send ``kMagic``; it is echoed back, after which
        the peer's rank, world size, job id and command are read in that
        order.

        Parameters
        ----------
        sock : socket
            Connected socket to the peer; wrapped in ``ExSocket``.
        s_addr : tuple
            Peer address; only ``s_addr[0]`` (the host) is used.

        Raises
        ------
        AssertionError
            If the first int received is not ``kMagic``.
        """
        slave = ExSocket(sock)
        self.sock = slave
        self.host = get_some_ip(s_addr[0])
        magic = slave.recvint()
        if magic != kMagic:
            # Raised explicitly rather than via ``assert``: assert statements
            # are stripped under ``python -O``, which would let a peer with a
            # bad magic number through. Same exception type and message.
            raise AssertionError(
                'invalid magic number=%d from %s' % (magic, self.host))
        slave.sendint(kMagic)
        # values reported by the peer, in wire order
        self.rank = slave.recvint()
        self.world_size = slave.recvint()
        self.jobid = slave.recvstr()
        self.cmd = slave.recvstr()
        # initial bookkeeping values; updated elsewhere in the class
        self.wait_accept = 0
        self.port = None

    def decide_rank(self, job_map):
        if self.rank >= 0:
            return self.rank
        if self.jobid != 'NULL' and self.jobid in job_map:
            return job_map[self.jobid]
        return -1

    def assign_rank(self, rank, wait_conn, tree_map, parent_map, ring_map):
        self.rank = rank
        nnset = set(tree_map[rank])
        rprev, rnext = ring_map[rank]
        self.sock.sendint(rank)
        # send parent rank
        self.sock.sendint(parent_map[rank])
        # send world size
        self.sock.sendint(len(tree_map))
        self.sock.sendint(len(nnset))
        # send the rprev and next link
        for r in nnset:
            self.sock.sendint(r)
        # send prev link
        if rprev != -1 and rprev != rank:
            nnset.add(rprev)
            self.sock.sendint(rprev)
        else:
            self.sock.sendint(-1)
        # send next link
        if rnext != -1 and rnext != rank:
            nnset.add(rnext)
            self.sock.sendint(rnext)
        else:
            self.sock.sendint(-1)
        while True:
            ngood = self.sock.recvint()
            goodset = set([])
            for _ in range(ngood):
                goodset.add(self.sock.recvint())



( run in 0.472 second using v1.01-cache-2.11-cpan-13bb782fe5a )