From 0a3a8a1c525e173a2619378dbda9050c73ea4717 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 5 Mar 2020 13:13:03 +0200 Subject: [PATCH] Add support for mounting dockerized experiment folders to host when running on K8s in daemon mode --- trains_agent/commands/worker.py | 151 +++++++++++++++++++++++++------ trains_agent/definitions.py | 1 + trains_agent/helper/base.py | 11 +++ trains_agent/helper/singleton.py | 48 +++++++--- 4 files changed, 171 insertions(+), 40 deletions(-) diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py index 35f80fe..7852336 100644 --- a/trains_agent/commands/worker.py +++ b/trains_agent/commands/worker.py @@ -38,7 +38,8 @@ from trains_agent.definitions import ( INVALID_WORKER_ID, PROGRAM_NAME, DEFAULT_VENV_UPDATE_URL, - ENV_TASK_EXECUTE_AS_USER + ENV_TASK_EXECUTE_AS_USER, + ENV_K8S_HOST_MOUNT ) from trains_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES from trains_agent.errors import APIError, CommandFailedError, Sigterm @@ -58,13 +59,17 @@ from trains_agent.helper.base import ( is_conda, named_temporary_file, ExecutionInfo, - HOCONEncoder, error, get_python_path, is_linux_platform) + HOCONEncoder, + error, + get_python_path, + is_linux_platform, + rm_file +) from trains_agent.helper.console import ensure_text from trains_agent.helper.package.base import PackageManager from trains_agent.helper.package.conda_api import CondaAPI from trains_agent.helper.package.horovod_req import HorovodRequirement from trains_agent.helper.package.external_req import ExternalRequirements -from trains_agent.helper.package.pip_api.system import SystemPip from trains_agent.helper.package.pip_api.venv import VirtualenvPip from trains_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI from trains_agent.helper.package.pytorch import PytorchRequirement @@ -72,13 +77,16 @@ from trains_agent.helper.package.requirements import RequirementsManager from trains_agent.helper.package.venv_update_api import 
VenvUpdateAPI from trains_agent.helper.process import ( kill_all_child_processes, - check_if_command_exists, WorkerParams, ExitStatus, Argv, COMMAND_SUCCESS, Executable, - get_bash_output, shutdown_docker_process, get_docker_id, commit_docker) + get_bash_output, + shutdown_docker_process, + get_docker_id, + commit_docker +) from trains_agent.helper.package.cython_req import CythonRequirement from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS from trains_agent.helper.resource_monitor import ResourceMonitor @@ -1191,7 +1199,6 @@ class Worker(ServiceCommandSection): exit_code = -1 try: if disable_monitoring: - use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI)) try: sys.stdout.flush() sys.stderr.flush() @@ -1528,10 +1535,7 @@ class Worker(ServiceCommandSection): raise e finally: if self._session.debug_mode and temp_file: - try: - Path(temp_file.name).unlink() - except OSError: - pass + rm_file(temp_file.name) # call post installation callback requirements_manager.post_install() # mark as successful installation @@ -1893,10 +1897,39 @@ class Worker(ServiceCommandSection): pass base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES={}'.format(dockers_nvidia_visible_devices)] - base_cmd += ['-e', 'TRAINS_WORKER_ID='+worker_id, ] + # check if we need to map host folders + if os.environ.get(ENV_K8S_HOST_MOUNT): + # expect TRAINS_AGENT_K8S_HOST_MOUNT = '/mnt/host/data:/root/.trains' + k8s_node_mnt, _, k8s_pod_mnt = os.environ.get(ENV_K8S_HOST_MOUNT).partition(':') + # search and replace all the host folders with the k8s + host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache] + for i, m in enumerate(host_mounts): + if k8s_pod_mnt not in m: + print('Warning: K8S mount missing, ignoring cached folder {}'.format(m)) + host_mounts[i] = None + else: + host_mounts[i] = m.replace(k8s_pod_mnt, k8s_node_mnt) + host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache = host_mounts - 
if host_ssh_cache: - base_cmd += ['-v', host_ssh_cache+':/root/.ssh', ] + # copy the configuration file into the mounted folder + new_conf_file = os.path.join(k8s_pod_mnt, '.trains_agent.{}.cfg'.format(quote(worker_id, safe=""))) + try: + rm_file(new_conf_file) + shutil.copy(conf_file, new_conf_file) + conf_file = new_conf_file.replace(k8s_pod_mnt, k8s_node_mnt) + except Exception: + raise ValueError('Error: could not copy configuration file into: {}'.format(new_conf_file)) + + if host_ssh_cache: + new_ssh_cache = os.path.join(k8s_pod_mnt, '.trains_agent.{}.ssh'.format(quote(worker_id, safe=""))) + try: + rm_tree(new_ssh_cache) + shutil.copytree(host_ssh_cache, new_ssh_cache) + host_ssh_cache = new_ssh_cache.replace(k8s_pod_mnt, k8s_node_mnt) + except Exception: + raise ValueError('Error: could not copy .ssh directory into: {}'.format(new_ssh_cache)) + + base_cmd += ['-e', 'TRAINS_WORKER_ID='+worker_id, ] # if we are running a RC version, install the same version in the docker # because the default latest, will be a release version (not RC) @@ -1923,21 +1956,86 @@ class Worker(ServiceCommandSection): python=python_version, pip_version=PackageManager.get_pip_version(), specify_version=specify_version) - base_cmd += [ - '-v', conf_file+':/root/trains.conf', - '-v', host_apt_cache+':/var/cache/apt/archives', - '-v', host_pip_cache+':/root/.cache/pip', - '-v', host_pip_dl+':'+mounted_pip_dl, - '-v', host_cache+':'+mounted_cache, - '-v', host_vcs_cache+':'+mounted_vcs_cache, - '--rm', docker_image, 'bash', '-c', - update_scheme + - extra_shell_script + - "NVIDIA_VISIBLE_DEVICES=all {python} -u -m trains_agent ".format(python=python_version) - ] + base_cmd += ( + ['-v', conf_file+':/root/trains.conf'] + + (['-v', host_ssh_cache+':/root/.ssh'] if host_ssh_cache else []) + + (['-v', host_apt_cache+':/var/cache/apt/archives'] if host_apt_cache else []) + + (['-v', host_pip_cache+':/root/.cache/pip'] if host_pip_cache else []) + + (['-v', host_pip_dl+':'+mounted_pip_dl] if 
def _run_as_user_patch(self, command, script_dir, venv_folder, sdk_cache_folder, user_uid):
    """
    Re-target a prepared command so that it is executed as another system user.

    The virtual environment is moved into a freshly created home folder, the moved
    venv and the SDK cache are made accessible to everyone, and the command is
    wrapped in a ``bash -c`` invocation whose child process switches gid/uid
    right before it starts.

    :param command: command object to wrap; its ``serialize()`` output is embedded
        in the new ``bash -c`` string
    :param script_dir: script directory (str); occurrences of the old venv path
        are redirected to the new venv location
    :param venv_folder: current virtual environment folder (object with ``as_posix()``)
    :param sdk_cache_folder: optional cache folder (str); environment variables
        and ``~`` are expanded before its content is chmod-ed
    :param user_uid: user *name*, resolved through ``pwd.getpwnam``; the same
        account supplies both the uid and the (primary) gid
    :return: tuple of (wrapped command, patched script_dir)
    """
    class RunasArgv(Argv):
        """Argv variant whose subprocess drops to a given uid/gid before running."""

        def __init__(self, *args):
            super(RunasArgv, self).__init__(*args)
            # default to root (0/0) until set_uid() is called
            self.uid = 0
            self.gid = 0

        def call_subprocess(self, func, censor_password=False, *args, **kwargs):
            self._log.debug("running: %s: %s", func.__name__, list(self))
            with self.normalize_exception(censor_password):
                # preexec_fn runs in the child process, so only the child changes identity
                return func(list(self), *args, preexec_fn=self._change_uid, **kwargs)

        def set_uid(self, user_uid, user_gid):
            # local import: the pwd module is only available on Unix
            from pwd import getpwnam
            self.uid = getpwnam(user_uid).pw_uid
            self.gid = getpwnam(user_gid).pw_gid

        def _change_uid(self):
            # group first: once the uid is dropped we may no longer be allowed to change gid
            os.setgid(self.gid)
            os.setuid(self.uid)

    def _open_permissions(root):
        # best effort: entries we are not allowed to chmod are simply skipped
        for entry in Path(root).rglob('*'):
            try:
                entry.chmod(0o0777)
            except OSError:
                pass

    # create a home folder for our user, falling back to /home if the root location fails
    home_folder = '/trains_agent_home'
    try:
        rm_tree(home_folder)
        Path(home_folder).mkdir(parents=True, exist_ok=True)
    except Exception:
        # deliberately broad (but no longer trapping KeyboardInterrupt/SystemExit)
        home_folder = '/home/trains_agent_home'
        rm_tree(home_folder)
        Path(home_folder).mkdir(parents=True, exist_ok=True)

    # move our entire venv into the new home
    venv_folder = venv_folder.as_posix()
    if not venv_folder.endswith(os.path.sep):
        # trailing separator keeps the later str.replace() calls aligned on a folder boundary
        venv_folder += os.path.sep
    new_venv_folder = os.path.join(home_folder, 'venv/')
    shutil.move(venv_folder, new_venv_folder)

    # allow everyone to access the moved venv
    _open_permissions(new_venv_folder)

    # make sure we will be able to access the cache folder
    # (we assume we have the ability to change its mode)
    if sdk_cache_folder:
        sdk_cache_folder = Path(os.path.expandvars(sdk_cache_folder)).expanduser().absolute()
        _open_permissions(sdk_cache_folder)

    # patch venv folder to new location
    script_dir = script_dir.replace(venv_folder, new_venv_folder)

    # new command line execution: same command, with HOME/PATH pointing at the new home/venv
    command = RunasArgv('bash', '-c', 'HOME="{}" PATH="{}" {}'.format(
        home_folder,
        os.environ.get('PATH', '').replace(venv_folder, new_venv_folder),
        command.serialize().replace(venv_folder, new_venv_folder)))
    command.set_uid(user_uid=user_uid, user_gid=user_uid)

    return command, script_dir
def rm_file(filename):
    # type: (Union[Path, Text]) -> bool
    """
    Delete a single file without propagating filesystem errors (a forgiving os.unlink).

    Environment variables and a leading ``~`` in ``filename`` are expanded first.

    :param filename: path of the file to remove (str or Path)
    :return: True if the file was removed, False if it could not be removed
        (missing file, permission problem, path is a directory, invalid path, ...)
    """
    try:
        os.unlink(os.path.expanduser(os.path.expandvars(Text(filename))))
    except (OSError, ValueError):
        # OSError: any filesystem failure; ValueError: malformed path (e.g. embedded NUL).
        # KeyboardInterrupt / SystemExit are intentionally NOT swallowed.
        return False
    return True
@classmethod
def _get_temp_folder(cls):
    """
    Return the folder in which the singleton lock / pid files are kept.

    When the K8s host-mount environment variable (ENV_K8S_HOST_MOUNT) is set to a
    non-empty value, the text after its last ':' is used; otherwise the system
    temporary directory is returned.
    """
    host_mount = os.environ.get(ENV_K8S_HOST_MOUNT)
    if not host_mount:
        return gettempdir()
    # keep only what follows the last ':' (the whole value when there is no ':')
    return host_mount.rsplit(':', 1)[-1]