Compare commits

..

30 Commits

Author SHA1 Message Date
allegroai
00eb2f10ec Version bump to v0.13.3 2020-03-09 16:07:50 +02:00
allegroai
3393372b9c Do not share apt cache among agents on the same machine 2020-03-09 12:38:51 +02:00
allegroai
f2d2d702de Fix k8s support to allow a specific network for the docker (do not use the parent daemon network definition) 2020-03-09 12:38:32 +02:00
allegroai
e3d0680d39 Improve Unicode/UTF stdout handling 2020-03-09 12:34:48 +02:00
allegroai
618c2ac5c4 Add default storage environment vars to generated agent configuration 2020-03-09 12:33:03 +02:00
allegroai
0272c4c79c Add "--force-current-version" daemon command-line flag 2020-03-09 12:31:43 +02:00
allegroai
ff8cf63abf Add "--force-current-version" daemon command-line flag 2020-03-09 12:27:39 +02:00
allegroai
2c7c7f5b44 Add K8s/trains glue service example 2020-03-05 14:10:08 +02:00
allegroai
01f57c1e44 Create missing queues when starting the AWS dynamic cluster management service 2020-03-05 14:08:32 +02:00
allegroai
47bcd3839a Pass correct GPU limit when skipping gpus flag in docker mode 2020-03-05 14:07:44 +02:00
allegroai
0a3a8a1c52 Add support for mounting dockerized experiment folders to host when running on K8s in daemon mode 2020-03-05 13:13:03 +02:00
allegroai
231a907cff Add support for running daemon inside a K8s pod in daemon mode 2020-03-05 13:03:36 +02:00
allegroai
8f95eecf2e Add TRAINS_AGENT_EXEC_USER support for multiple daemon instances 2020-03-05 12:46:53 +02:00
allegroai
81008ee00e Add support for launching a specific python version based on Task.script.binary 2020-03-01 17:15:18 +02:00
allegroai
25bc44c0cf Add poetry to the list of supported package managers 2020-03-01 17:13:15 +02:00
allegroai
f838c8fc70 Allow providing queue names to daemon 2020-02-26 16:58:25 +02:00
allegroai
596093aac6 Version bump to v0.13.2 2020-02-23 16:25:14 +02:00
allegroai
8f23f3b4c0 Add support for pulling recursive git modules as well as main project 2020-02-23 15:48:12 +02:00
allegroai
95d503afdd Fix pip install or upgrade with limit in conda 2020-02-23 15:47:28 +02:00
allegroai
73ee33be99 Print error in case Poetry configuration failed 2020-02-23 14:43:21 +02:00
allegroai
ee3adf625f Add single-series-per-graph setting to the configuration example 2020-02-23 12:38:14 +02:00
allegroai
afec38a50e Add missing models service 2020-02-18 11:31:58 +02:00
allegroai
f9c60904f4 version bump 2020-02-12 11:23:53 +02:00
allegroai
a09dc85c67 Limit virtualenv version to <20 due to an import issue in v20.0.0 2020-02-12 11:23:48 +02:00
allegroai
5d74f4b376 version bump 2020-02-10 10:47:20 +02:00
allegroai
d558c66d3c Do not stop experiments if network is down 2020-02-10 10:47:13 +02:00
allegroai
714c6a05d0 Add .bashrc reloading before running trains-agent in the AWS dynamic cluster management service 2020-02-10 10:36:00 +02:00
allegroai
43b2f7f41d version bump 2020-02-04 18:06:45 +02:00
allegroai
28d752d568 Preinstall numpy if it exists in the requirements (temporary fix) 2020-02-04 18:06:25 +02:00
allegroai
6d091d8e08 Add experiment archiving example 2020-02-02 14:51:09 +02:00
20 changed files with 3425 additions and 90 deletions

View File

@@ -38,7 +38,7 @@ agent {
# currently supported pip and conda
# poetry is used if pip selected and repository contains poetry.lock file
package_manager: {
# supported options: pip, conda
# supported options: pip, conda, poetry
type: pip,
# specify pip version to use (examples "<20", "==19.3.1", "", empty string will install the latest version)
@@ -141,6 +141,9 @@ sdk {
quality: 87
subsampling: 0
}
# Support plot-per-graph fully matching Tensorboard behavior (i.e. if this is set to True, each series should have its own graph)
tensorboard_single_series_per_graph: False
}
network {

View File

@@ -0,0 +1,59 @@
#!/usr/bin/python3
"""
An example script that cleans up failed experiments by moving them to the archive
"""
import argparse
from datetime import datetime

from trains_agent import APIClient

# ---- command line -------------------------------------------------------
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--project", "-P", help="Project ID. Only clean up experiments from this project")
parser.add_argument("--user", "-U", help="User ID. Only clean up experiments assigned to this user")
parser.add_argument(
    "--status", "-S", default="failed",
    help="Experiment status. Only clean up experiments with this status (default %(default)s)"
)
parser.add_argument(
    "--iterations", "-I", type=int,
    help="Number of iterations. Only clean up experiments with less or equal number of iterations"
)
parser.add_argument(
    "--sec-from-start", "-T", type=int,
    help="Seconds from start time. "
         "Only clean up experiments if less or equal number of seconds have elapsed since started"
)
args = parser.parse_args()

client = APIClient()

# Fetch only experiments that are not archived yet ("-archived" excludes the tag),
# optionally narrowed down by project / user / status.
tasks = client.tasks.get_all(
    project=[args.project] if args.project else None,
    user=[args.user] if args.user else None,
    status=[args.status] if args.status else None,
    system_tags=["-archived"]
)

count = 0

for task in tasks:
    # Compare against None (not truthiness) so that an explicit threshold of 0
    # is honoured instead of silently disabling the filter.
    if args.iterations is not None and (task.last_iteration or 0) > args.iterations:
        continue
    if args.sec_from_start is not None:
        # an experiment that never started has no elapsed time to compare
        if not task.started:
            continue
        # drop tzinfo so the subtraction is done between two naive datetimes
        if (datetime.utcnow() - task.started.replace(tzinfo=None)).total_seconds() > args.sec_from_start:
            continue
    try:
        # archiving == adding the "archived" system tag to the experiment
        client.tasks.edit(
            task=task.id,
            system_tags=(task.system_tags or []) + ["archived"],
            force=True
        )
        count += 1
    except Exception as ex:
        # best effort: report and keep going with the remaining experiments
        print("Failed editing experiment: {}".format(ex))

print("Cleaned up {} experiments".format(count))

View File

@@ -292,6 +292,7 @@
" export TRAINS_API_ACCESS_KEY='{access_key}'\n",
" export TRAINS_API_SECRET_KEY='{secret_key}'\n",
" {bash_script}\n",
" source ~/.bashrc\n",
" python -m trains_agent --config-file '/root/trains.conf' daemon --queue '{queue}' {docker}\n",
" shutdown\n",
" \"\"\".format(\n",
@@ -443,6 +444,12 @@
" os.environ[\"TRAINS_API_SECRET_KEY\"] = TRAINS_SECRET_KEY\n",
" api_client = APIClient()\n",
"\n",
" # Verify the requested queues exist and create those that don't exist\n",
" all_queues = [q.name for q in list(api_client.queues.get_all())]\n",
" missing_queues = [q for q in QUEUES if q not in all_queues]\n",
" for q in missing_queues:\n",
" api_client.queues.create(q)\n",
"\n",
" idle_workers = {}\n",
" while True:\n",
" queue_name_to_id = {\n",

View File

@@ -20,4 +20,4 @@ six>=1.11.0
tqdm>=4.19.5
typing>=3.6.4
urllib3>=1.21.1
virtualenv>=16
virtualenv>=16,<20

View File

@@ -4,6 +4,7 @@ from .v2_4 import queues
from .v2_4 import tasks
from .v2_4 import workers
from .v2_4 import events
from .v2_4 import models
__all__ = [
'auth',
@@ -12,4 +13,5 @@ __all__ = [
'tasks',
'workers',
'events',
'models',
]

File diff suppressed because it is too large Load Diff

View File

@@ -17,7 +17,7 @@ from datetime import datetime
from distutils.spawn import find_executable
from functools import partial
from itertools import chain
from tempfile import gettempdir, mkdtemp
from tempfile import mkdtemp
from time import sleep, time
from typing import Text, Optional, Any, Tuple
@@ -28,9 +28,6 @@ from trains_agent.backend_api.services import queues as queues_api
from trains_agent.backend_api.services import tasks as tasks_api
from pathlib2 import Path
from pyhocon import ConfigTree, ConfigFactory
from requests import Session as HTTPSession
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from six.moves.urllib.parse import quote
from trains_agent.helper.check_update import start_check_update_daemon
@@ -40,7 +37,10 @@ from trains_agent.definitions import (
ENVIRONMENT_SDK_PARAMS,
INVALID_WORKER_ID,
PROGRAM_NAME,
DEFAULT_VENV_UPDATE_URL)
DEFAULT_VENV_UPDATE_URL,
ENV_TASK_EXECUTE_AS_USER,
ENV_K8S_HOST_MOUNT
)
from trains_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from trains_agent.errors import APIError, CommandFailedError, Sigterm
from trains_agent.helper.base import (
@@ -59,13 +59,17 @@ from trains_agent.helper.base import (
is_conda,
named_temporary_file,
ExecutionInfo,
HOCONEncoder, error, get_python_path, is_linux_platform)
from trains_agent.helper.console import ensure_text
HOCONEncoder,
error,
get_python_path,
is_linux_platform,
rm_file
)
from trains_agent.helper.console import ensure_text, print_text, decode_binary_lines
from trains_agent.helper.package.base import PackageManager
from trains_agent.helper.package.conda_api import CondaAPI
from trains_agent.helper.package.horovod_req import HorovodRequirement
from trains_agent.helper.package.external_req import ExternalRequirements
from trains_agent.helper.package.pip_api.system import SystemPip
from trains_agent.helper.package.pip_api.venv import VirtualenvPip
from trains_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI
from trains_agent.helper.package.pytorch import PytorchRequirement
@@ -73,7 +77,6 @@ from trains_agent.helper.package.requirements import RequirementsManager
from trains_agent.helper.package.venv_update_api import VenvUpdateAPI
from trains_agent.helper.process import (
kill_all_child_processes,
check_if_command_exists,
WorkerParams,
ExitStatus,
Argv,
@@ -229,6 +232,8 @@ class TaskStopSignal(object):
return self._test()
except Exception as ex:
self.command.log_traceback(ex)
# make sure we break nothing
return TaskStopSignal.default
def _test(self):
# type: () -> TaskStopReason
@@ -361,6 +366,7 @@ class Worker(ServiceCommandSection):
self._docker_force_pull = self._session.config.get("agent.docker_force_pull", False)
self._daemon_foreground = None
self._standalone_mode = None
self._force_current_version = None
def _get_requirements_manager(self, os_override=None, base_interpreter=None):
requirements_manager = RequirementsManager(
@@ -629,11 +635,13 @@ class Worker(ServiceCommandSection):
self.log.debug("starting resource monitor thread")
print("Worker \"{}\" - ".format(self.worker_id), end='')
if not queues:
if queues:
queues = return_list(queues)
queues = [self._resolve_name(q, "queues") for q in queues]
else:
default_queue = self._session.send_api(queues_api.GetDefaultRequest())
queues = [default_queue.id]
queues = return_list(queues)
queues_info = [
self._session.send_api(
queues_api.GetByIdRequest(queue)
@@ -654,6 +662,7 @@ class Worker(ServiceCommandSection):
# print docker image
if docker is not False and docker is not None:
self._force_current_version = kwargs.get('force_current_version', False)
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
self.dump_config(temp_config)
self.docker_image_func = docker_image_func
@@ -749,9 +758,11 @@ class Worker(ServiceCommandSection):
):
# type: (...) -> Tuple[Optional[int], TaskStopReason]
def _print_file(file_path, prev_line_count):
with open(file_path, "rt") as f:
with open(file_path, "rb") as f:
binary_text = f.read()
# skip the previously printed lines,
return f.readlines()[prev_line_count:]
blines = binary_text.split(b'\n')[prev_line_count:]
return decode_binary_lines(blines)
stdout = open(stdout_path, "wt")
stderr = open(stderr_path, "wt") if stderr_path else stdout
@@ -780,7 +791,7 @@ class Worker(ServiceCommandSection):
if daemon:
self.send_logs(
task_id=task_id,
lines=["User aborted: stopping task\n"],
lines=["User aborted: stopping task ({})\n".format(str(stop_reason))],
level="ERROR",
)
kill_all_child_processes(process.pid)
@@ -844,7 +855,8 @@ class Worker(ServiceCommandSection):
"""
if not lines:
return 0
print("".join(lines), end="")
print_text("".join(lines))
# remove backspaces from the text log, they look bad.
for i, l in enumerate(lines):
lines[i] = l.replace('\x08', '')
@@ -920,7 +932,15 @@ class Worker(ServiceCommandSection):
except AttributeError:
requirements = None
# TODO: make sure we pass the correct python_version
if not python_version:
try:
python_version = current_task.script.binary
python_version = python_version.split('/')[-1].replace('python', '')
# if we can cast it, we are good
python_version = '{:.1f}'.format(float(python_version))
except:
python_version = None
venv_folder, requirements_manager = self.install_virtualenv(venv_dir=target,
requested_python_version=python_version)
@@ -1085,7 +1105,16 @@ class Worker(ServiceCommandSection):
except AttributeError:
requirements = None
venv_folder, requirements_manager = self.install_virtualenv(standalone_mode=standalone_mode)
try:
python_ver = current_task.script.binary
python_ver = python_ver.split('/')[-1].replace('python', '')
# if we can cast it, we are good
python_ver = '{:.1f}'.format(float(python_ver))
except:
python_ver = None
venv_folder, requirements_manager = self.install_virtualenv(standalone_mode=standalone_mode,
requested_python_version=python_ver)
if not standalone_mode:
if self._default_pip:
@@ -1157,11 +1186,20 @@ class Worker(ServiceCommandSection):
if python_path:
os.environ['PYTHONPATH'] = python_path
# check if we want to run as another user, only supported on linux
if os.environ.get(ENV_TASK_EXECUTE_AS_USER, None) and is_linux_platform():
command, script_dir = self._run_as_user_patch(
command, script_dir, venv_folder,
self._session.config.get('sdk.storage.cache.default_base_dir'),
os.environ.get(ENV_TASK_EXECUTE_AS_USER))
use_execv = False
else:
use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI))
print("Starting Task Execution:\n".format(task_id))
exit_code = -1
try:
if disable_monitoring:
use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI))
try:
sys.stdout.flush()
sys.stderr.flush()
@@ -1498,10 +1536,7 @@ class Worker(ServiceCommandSection):
raise e
finally:
if self._session.debug_mode and temp_file:
try:
Path(temp_file.name).unlink()
except OSError:
pass
rm_file(temp_file.name)
# call post installation callback
requirements_manager.post_install()
# mark as successful installation
@@ -1627,7 +1662,7 @@ class Worker(ServiceCommandSection):
)
def install_virtualenv(self, venv_dir=None, requested_python_version=None, standalone_mode=False):
# type: (str, str) -> Tuple[Path, RequirementsManager]
# type: (str, str, bool) -> Tuple[Path, RequirementsManager]
"""
Install a new python virtual environment, removing the old one if exists
:return: virtualenv directory and requirements manager to use with task
@@ -1640,9 +1675,16 @@ class Worker(ServiceCommandSection):
requested_python_version[max(requested_python_version.find('python'), 0):].replace('python', '')
executable_name = 'python'
else:
executable_version, executable_version_suffix, executable_name = self.find_python_executable_for_version(
requested_python_version
)
try:
executable_version, executable_version_suffix, executable_name = \
self.find_python_executable_for_version(requested_python_version)
except Exception:
def_python_version = Text(self._session.config.get("agent.python_binary", None)) or \
Text(self._session.config.get("agent.default_python", None))
print('Warning: could not locate requested Python version {}, reverting to version {}'.format(
requested_python_version, def_python_version))
executable_version, executable_version_suffix, executable_name = \
self.find_python_executable_for_version(def_python_version)
self._session.config.put("agent.default_python", executable_version)
self._session.config.put("agent.python_binary", executable_name)
@@ -1811,7 +1853,7 @@ class Worker(ServiceCommandSection):
host_cache=host_cache, mounted_cache=mounted_cache_dir,
host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
standalone_mode=self._standalone_mode)
standalone_mode=self._standalone_mode, force_current_version=self._force_current_version)
return temp_config, partial(docker_cmd_functor, docker_cmd)
@staticmethod
@@ -1823,15 +1865,25 @@ class Worker(ServiceCommandSection):
host_cache, mounted_cache,
host_pip_dl, mounted_pip_dl,
host_vcs_cache, mounted_vcs_cache,
standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None):
standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None,
force_current_version=None):
docker = 'docker'
base_cmd = [docker, 'run', '-t']
update_scheme = ""
dockers_nvidia_visible_devices = 'all'
gpu_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES', None)
if gpu_devices is None or gpu_devices.lower().strip() == 'all':
base_cmd += ['--gpus', 'all', ]
if os.environ.get('TRAINS_DOCKER_SKIP_GPUS_FLAG', None):
dockers_nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES') or \
dockers_nvidia_visible_devices
else:
base_cmd += ['--gpus', 'all', ]
elif gpu_devices.strip() and gpu_devices.strip() != 'none':
base_cmd += ['--gpus', 'device='+gpu_devices, ]
if os.environ.get('TRAINS_DOCKER_SKIP_GPUS_FLAG', None):
dockers_nvidia_visible_devices = gpu_devices
else:
base_cmd += ['--gpus', 'device='+gpu_devices, ]
# We are using --gpu, so we should not pass NVIDIA_VISIBLE_DEVICES, I think.
# base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES=' + gpu_devices, ]
@@ -1845,10 +1897,51 @@ class Worker(ServiceCommandSection):
if isinstance(extra_docker_arguments, six.string_types) else extra_docker_arguments
base_cmd += [str(a) for a in extra_docker_arguments if a]
base_cmd += ['-e', 'TRAINS_WORKER_ID='+worker_id, ]
# check if running inside a kubernetes
if os.environ.get('KUBERNETES_SERVICE_HOST') and os.environ.get('KUBERNETES_PORT'):
# map network to sibling docker, unless we have other network argument
if not any(a.strip().startswith('--network') for a in base_cmd):
try:
network_mode = get_bash_output(
'docker inspect --format=\'{{.HostConfig.NetworkMode}}\' $(basename $(cat /proc/1/cpuset))')
base_cmd += ['--network', network_mode]
except:
pass
base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES={}'.format(dockers_nvidia_visible_devices)]
if host_ssh_cache:
base_cmd += ['-v', host_ssh_cache+':/root/.ssh', ]
# check if we need to map host folders
if os.environ.get(ENV_K8S_HOST_MOUNT):
# expect TRAINS_AGENT_K8S_HOST_MOUNT = '/mnt/host/data:/root/.trains'
k8s_node_mnt, _, k8s_pod_mnt = os.environ.get(ENV_K8S_HOST_MOUNT).partition(':')
# search and replace all the host folders with the k8s
host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache]
for i, m in enumerate(host_mounts):
if k8s_pod_mnt not in m:
print('Warning: K8S mount missing, ignoring cached folder {}'.format(m))
host_mounts[i] = None
else:
host_mounts[i] = m.replace(k8s_pod_mnt, k8s_node_mnt)
host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache = host_mounts
# copy the configuration file into the mounted folder
new_conf_file = os.path.join(k8s_pod_mnt, '.trains_agent.{}.cfg'.format(quote(worker_id, safe="")))
try:
rm_file(new_conf_file)
shutil.copy(conf_file, new_conf_file)
conf_file = new_conf_file.replace(k8s_pod_mnt, k8s_node_mnt)
except Exception:
raise ValueError('Error: could not copy configuration file into: {}'.format(new_conf_file))
if host_ssh_cache:
new_ssh_cache = os.path.join(k8s_pod_mnt, '.trains_agent.{}.ssh'.format(quote(worker_id, safe="")))
try:
rm_tree(new_ssh_cache)
shutil.copytree(host_ssh_cache, new_ssh_cache)
host_ssh_cache = new_ssh_cache.replace(k8s_pod_mnt, k8s_node_mnt)
except Exception:
raise ValueError('Error: could not copy .ssh directory into: {}'.format(new_ssh_cache))
base_cmd += ['-e', 'TRAINS_WORKER_ID='+worker_id, ]
# if we are running a RC version, install the same version in the docker
# because the default latest, will be a release version (not RC)
@@ -1856,15 +1949,13 @@ class Worker(ServiceCommandSection):
try:
from trains_agent.version import __version__
_version_parts = __version__.split('.')
if 'rc' in _version_parts[-1].lower() or 'rc' in _version_parts[-2].lower():
if force_current_version or 'rc' in _version_parts[-1].lower() or 'rc' in _version_parts[-2].lower():
specify_version = '=={}'.format(__version__)
except:
pass
if standalone_mode:
update_scheme = ""
else:
update_scheme = \
if not standalone_mode:
update_scheme += \
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
"chown -R root /root/.cache/pip ; " \
"apt-get update ; " \
@@ -1875,21 +1966,86 @@ class Worker(ServiceCommandSection):
python=python_version, pip_version=PackageManager.get_pip_version(),
specify_version=specify_version)
base_cmd += [
'-v', conf_file+':/root/trains.conf',
'-v', host_apt_cache+':/var/cache/apt/archives',
'-v', host_pip_cache+':/root/.cache/pip',
'-v', host_pip_dl+':'+mounted_pip_dl,
'-v', host_cache+':'+mounted_cache,
'-v', host_vcs_cache+':'+mounted_vcs_cache,
'--rm', docker_image, 'bash', '-c',
update_scheme +
extra_shell_script +
"NVIDIA_VISIBLE_DEVICES=all {python} -u -m trains_agent ".format(python=python_version)
]
base_cmd += (
['-v', conf_file+':/root/trains.conf'] +
(['-v', host_ssh_cache+':/root/.ssh'] if host_ssh_cache else []) +
(['-v', host_apt_cache+':/var/cache/apt/archives'] if host_apt_cache else []) +
(['-v', host_pip_cache+':/root/.cache/pip'] if host_pip_cache else []) +
(['-v', host_pip_dl+':'+mounted_pip_dl] if host_pip_dl else []) +
(['-v', host_cache+':'+mounted_cache] if host_cache else []) +
(['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) +
['--rm', docker_image, 'bash', '-c',
update_scheme +
extra_shell_script +
"NVIDIA_VISIBLE_DEVICES={nv_visible} {python} -u -m trains_agent ".format(
nv_visible=dockers_nvidia_visible_devices, python=python_version)
])
return base_cmd
def _run_as_user_patch(self, command, script_dir, venv_folder, sdk_cache_folder, user_uid):
    """
    Rewrite the task execution command so it runs as a different (non-root) user.

    The virtual environment is moved into a freshly created home folder, its
    files (and the SDK cache files, if given) are made accessible to everyone,
    and the command is wrapped in a ``bash -c`` invocation that switches
    gid/uid right before exec.

    :param command: original command object; must provide ``serialize()``
    :param script_dir: script working directory (string), patched to the new venv location
    :param venv_folder: current virtual environment folder (object providing ``as_posix()``)
    :param sdk_cache_folder: optional SDK cache folder whose content is made accessible
    :param user_uid: value passed to ``pwd.getpwnam`` to resolve both the uid and the gid,
        i.e. a user login name
    :return: tuple of (patched command, patched script_dir)
    """
    class RunasArgv(Argv):
        """Argv variant that drops to a specific uid/gid in the child process."""

        def __init__(self, *args):
            super(RunasArgv, self).__init__(*args)
            self.uid = 0
            self.gid = 0

        def call_subprocess(self, func, censor_password=False, *args, **kwargs):
            self._log.debug("running: %s: %s", func.__name__, list(self))
            with self.normalize_exception(censor_password):
                # preexec_fn runs in the child, just before the command is executed
                return func(list(self), *args, preexec_fn=self._change_uid, **kwargs)

        def set_uid(self, user_uid, user_gid):
            from pwd import getpwnam
            self.uid = getpwnam(user_uid).pw_uid
            self.gid = getpwnam(user_gid).pw_gid

        def _change_uid(self):
            # group first: once the uid is dropped we may no longer be allowed to change gid
            os.setgid(self.gid)
            os.setuid(self.uid)

    def _allow_everyone(root):
        # best effort chmod of everything below root; entries we cannot change are left as is
        for entry in Path(root).rglob('*'):
            try:
                entry.chmod(0o0777)
            except Exception:
                pass

    # create a home folder for our user (fall back to /home if the root folder cannot be used)
    try:
        home_folder = '/trains_agent_home'
        rm_tree(home_folder)
        Path(home_folder).mkdir(parents=True, exist_ok=True)
    except Exception:
        home_folder = '/home/trains_agent_home'
        rm_tree(home_folder)
        Path(home_folder).mkdir(parents=True, exist_ok=True)

    # move our entire venv into the new home
    venv_folder = venv_folder.as_posix()
    if not venv_folder.endswith(os.path.sep):
        venv_folder += os.path.sep
    new_venv_folder = os.path.join(home_folder, 'venv/')
    shutil.move(venv_folder, new_venv_folder)

    # allow everyone to access it
    _allow_everyone(new_venv_folder)

    # make sure we will be able to access the cache folder (we assume we have the ability change mod)
    if sdk_cache_folder:
        sdk_cache_folder = Path(os.path.expandvars(sdk_cache_folder)).expanduser().absolute()
        _allow_everyone(sdk_cache_folder)

    # patch venv folder to new location
    script_dir = script_dir.replace(venv_folder, new_venv_folder)

    # New command line execution: HOME and PATH point at the relocated environment
    command = RunasArgv('bash', '-c', 'HOME=\"{}\" PATH=\"{}\" {}'.format(
        home_folder,
        os.environ.get('PATH', '').replace(venv_folder, new_venv_folder),
        command.serialize().replace(venv_folder, new_venv_folder)))
    command.set_uid(user_uid=user_uid, user_gid=user_uid)

    return command, script_dir
def _singleton(self):
# ensure singleton
worker_id = self._session.config["agent.worker_id"]
@@ -1903,7 +2059,8 @@ class Worker(ServiceCommandSection):
else:
worker_name = '{}:cpu'.format(worker_name)
self.worker_id, worker_slot = Singleton.register_instance(unique_worker_id=worker_id, worker_name=worker_name)
self.worker_id, worker_slot = Singleton.register_instance(unique_worker_id=worker_id, worker_name=worker_name,
api_client=self._session.api_client)
if self.worker_id is None:
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
exit(1)

View File

@@ -73,6 +73,12 @@ ENVIRONMENT_CONFIG = {
"agent.cpu_only": EnvironmentConfig(
"TRAINS_CPU_ONLY", "ALG_CPU_ONLY", "CPU_ONLY", type=bool
),
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
"sdk.aws.s3.secret": EnvironmentConfig("AWS_SECRET_ACCESS_KEY"),
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
"sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
'account_key': EnvironmentConfig("AZURE_STORAGE_KEY")},
"sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
}
CONFIG_FILE_ENV = EnvironmentConfig("ALG_CONFIG_FILE")
@@ -114,6 +120,8 @@ DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache")
PIP_EXTRA_INDICES = [
]
DEFAULT_PIP_DOWNLOAD_CACHE = normalize_path(CONFIG_DIR, "pip-download-cache")
ENV_TASK_EXECUTE_AS_USER = 'TRAINS_AGENT_EXEC_USER'
ENV_K8S_HOST_MOUNT = 'TRAINS_AGENT_K8S_HOST_MOUNT'
class FileBuffering(IntEnum):

169
trains_agent/glue/k8s.py Normal file
View File

@@ -0,0 +1,169 @@
from __future__ import print_function, division, unicode_literals
import logging
import os
import subprocess
from time import sleep
from typing import Text, List
from pyhocon import HOCONConverter
from trains_agent.commands.events import Events
from trains_agent.commands.worker import Worker
from trains_agent.helper.process import get_bash_output
from trains_agent.helper.resource_monitor import ResourceMonitor
class K8sIntegration(Worker):
    """
    Glue service between the trains task queues and a Kubernetes cluster.

    Tasks are pulled from the given queues, pushed into a dedicated "pending"
    queue for visibility, and then launched as pods with ``kubectl run``.
    """
    K8S_PENDING_QUEUE = "k8s_scheduler"

    # Pod names must be valid DNS names (lowercase alphanumerics, '-' or '.'),
    # underscores are rejected by Kubernetes, hence the dashes in the name.
    KUBECTL_RUN_CMD = "kubectl run trains-id-{task_id} " \
                      "--image {docker_image} " \
                      "--restart=Never --replicas=1 " \
                      "--generator=run-pod/v1"

    # remove pods created by this service that are neither pending nor running
    KUBECTL_DELETE_CMD = "kubectl delete pods " \
                         "--selector=TRAINS=agent " \
                         "--field-selector=status.phase!=Pending,status.phase!=Running"

    # executed inside the pod; "{}" is replaced with the task id
    CONTAINER_BASH_SCRIPT = "apt-get install -y git python-pip && " \
                            "pip install trains-agent && " \
                            "python -u -m trains_agent execute --full-monitoring --require-queue --id {}"

    def __init__(self, k8s_pending_queue_name=None, kubectl_cmd=None, container_bash_script=None, debug=False):
        """
        Initialize the k8s integration glue layer daemon

        :param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
        :param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
            example: "task={task_id} image={docker_image} queue_id={queue_id}"
            or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data)
        :param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
        :param bool debug: Switch logging on
        """
        super(K8sIntegration, self).__init__()
        self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
        self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
        self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
        # Always use system packages, because we will be running inside a docker
        self._session.config.put("agent.package_manager.system_site_packages", True)
        # Add debug logging
        if debug:
            self.log.logger.disabled = False
            self.log.logger.setLevel(logging.INFO)

    def run_one_task(self, queue: Text, task_id: Text, worker_args=None):
        """
        Schedule a single task for execution on the k8s cluster.

        The task is first pushed into the k8s pending queue; if that fails the
        error is logged and nothing is launched. Otherwise a ``kubectl``
        command is built and executed, and anything it writes to stderr is
        logged as an error.

        :param queue: queue the task was pulled from (passed to the kubectl command as queue_id)
        :param task_id: ID of the task to launch
        :param worker_args: not used by this implementation
        """
        task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]

        # push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
        try:
            self._session.api_client.tasks.enqueue(task_id, queue=self.k8s_pending_queue_name,
                                                   status_reason='k8s pending scheduler')
        except Exception as e:
            self.log.error("ERROR: Could not push back task [{}] to k8s pending queue [{}], error: {}".format(
                task_id, self.k8s_pending_queue_name, e))
            return

        if task_data.execution.docker_cmd:
            docker_image = task_data.execution.docker_cmd
        else:
            docker_image = str(os.environ.get("TRAINS_DOCKER_IMAGE") or
                               self._session.config.get("agent.default_docker.image", "nvidia/cuda"))

        # take the first part, this is the docker image name (not arguments)
        docker_image = docker_image.split()[0]

        # NOTE(review): the configuration is embedded between single quotes, a single quote
        # inside the configuration text would break the shell command - confirm this cannot happen
        create_trains_conf = "echo '{}' >> ~/trains.conf && ".format(
            HOCONConverter.to_hocon(self._session.config._config))

        if callable(self.kubectl_cmd):
            kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
        else:
            kubectl_cmd = self.kubectl_cmd.format(task_id=task_id, docker_image=docker_image, queue_id=queue)

        # make sure we have a list
        if isinstance(kubectl_cmd, str):
            kubectl_cmd = kubectl_cmd.split()

        # label the pod (used by KUBECTL_DELETE_CMD) and append the in-container script
        kubectl_cmd += ["--labels=TRAINS=agent", "--command", "--", "/bin/sh", "-c",
                        create_trains_conf + self.container_bash_script.format(task_id)]
        process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = process.communicate()
        self.log.info("K8s scheduling experiment task id={}".format(task_id))
        if error:
            self.log.error("Running kubectl encountered an error: {}".format(
                error if isinstance(error, str) else error.decode()))

    def run_tasks_loop(self, queues: List[Text], worker_params):
        """
        :summary: Pull and run tasks from queues.
        :description: 1. Go through ``queues`` by order.
                      2. Try getting the next task for each and run the first one that returns.
                      3. Go to step 1
        :param queues: IDs of queues to pull tasks from
        :type queues: list of ``Text``
        :param worker_params: Worker command line arguments
        :type worker_params: ``trains_agent.helper.process.WorkerParams``
        """
        events_service = self.get_service(Events)

        # make sure we have a k8s pending queue (creation failure is ignored, e.g. it already exists)
        try:
            self._session.api_client.queues.create(self.k8s_pending_queue_name)
        except Exception:
            pass
        # get queue id
        self.k8s_pending_queue_name = self._resolve_name(self.k8s_pending_queue_name, "queues")

        _last_machine_update_ts = 0
        while True:
            # iterate over queues (priority style, queues[0] is highest)
            for queue in queues:
                # delete old completed / failed pods
                get_bash_output(self.KUBECTL_DELETE_CMD)

                # get next task in queue
                try:
                    response = self._session.api_client.queues.get_next_task(queue=queue)
                except Exception as e:
                    print("Warning: Could not access task queue [{}], error: {}".format(queue, e))
                    continue
                else:
                    try:
                        task_id = response.entry.task
                    except AttributeError:
                        print("No tasks in queue {}".format(queue))
                        continue
                    events_service.send_log_events(
                        self.worker_id,
                        task_id=task_id,
                        lines="task {} pulled from {} by worker {}".format(
                            task_id, queue, self.worker_id
                        ),
                        level="INFO",
                    )

                    self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
                    self.run_one_task(queue, task_id, worker_params)
                    self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
                    # a task was scheduled: restart from the highest priority queue
                    break
            else:
                # no queue returned a task: sleep and retry polling
                print("No tasks in Queues, sleeping for {:.1f} seconds".format(self._polling_interval))
                sleep(self._polling_interval)

            if self._session.config["agent.reload_config"]:
                self.reload_config()

    def k8s_daemon(self, queues):
        """
        Start the k8s Glue service.
        This service will be pulling tasks from *queues* and scheduling them for execution using kubectl.
        Notice all scheduled tasks are pushed back into K8S_PENDING_QUEUE,
        and popped when execution actually starts. This creates full visibility into the k8s scheduler.
        Manually popping a task from the K8S_PENDING_QUEUE,
        will cause the k8s scheduler to skip the execution once the scheduled task needs to be executed

        :param list(str) queues: List of queue names to pull from
        """
        return self.daemon(queues=queues, log_level=logging.INFO, foreground=True, docker=False)

View File

@@ -463,6 +463,17 @@ def rm_tree(root): # type: (Union[Path, Text]) -> None
return shutil.rmtree(os.path.expanduser(os.path.expandvars(Text(root))), onerror=on_error)
def rm_file(filename):  # type: (Union[Path, Text]) -> bool
    """
    A version of os.unlink that will not raise error

    ``~`` and environment variables in the path are expanded before removal.

    :param filename: path of the file to remove
    :return: True if the file was removed, False if the removal failed for any reason
    """
    try:
        os.unlink(os.path.expanduser(os.path.expandvars(Text(filename))))
    except Exception:
        # deliberate best effort: the caller only learns whether the removal succeeded
        return False
    return True
def is_conda(config):
return config['agent.package_manager.type'].lower() == 'conda'

View File

@@ -22,6 +22,18 @@ def print_text(text, newline=True):
sys.stdout.write(data)
def decode_binary_lines(binary_lines, encoding='utf-8'):
    """
    Decode a list of binary lines into text lines, one at a time.

    Undecodable bytes are substituted (``errors='replace'``), carriage returns are
    converted into newlines, and every non-empty line is guaranteed to end with
    a newline. A line that cannot be decoded at all yields an empty string, so the
    result always has the same length as the input.

    :param binary_lines: iterable of binary (bytes) lines
    :param encoding: encoding used for decoding (default: utf-8)
    :return: list of decoded text lines
    """
    lines = []
    for raw in binary_lines:
        try:
            text = raw.decode(encoding=encoding, errors='replace').replace('\r', '\n')
        except Exception:
            # keep the slot so line counts stay aligned with the input
            text = ''
        if text and not text.endswith('\n'):
            text += '\n'
        lines.append(text)
    return lines
def ensure_text(s, encoding='utf-8', errors='strict'):
"""Coerce *s* to six.text_type.
For Python 2:

View File

@@ -112,7 +112,7 @@ class CondaAPI(PackageManager):
return self.pip.bin
def upgrade_pip(self):
return self.pip.upgrade_pip()
return self._install("pip" + self.pip.get_pip_version())
def create(self):
"""

View File

@@ -6,14 +6,14 @@ from .requirements import SimpleSubstitution
class CythonRequirement(SimpleSubstitution):
name = "cython"
name = ("cython", "numpy", )
def __init__(self, *args, **kwargs):
super(CythonRequirement, self).__init__(*args, **kwargs)
def match(self, req):
# match both Cython & cython
return req.name and self.name == req.name.lower()
return req.name and req.name.lower() in self.name
def replace(self, req):
"""

View File

@@ -82,9 +82,13 @@ class PoetryConfig:
def initialize(self, cwd=None):
if not self._initialized:
self._initialized = True
self._config("--local", "virtualenvs.in-project", "true", cwd=cwd)
# self._config("repositories.{}".format(self.REPO_NAME), PYTHON_INDEX)
# self._config("http-basic.{}".format(self.REPO_NAME), *PYTHON_INDEX_CREDENTIALS)
try:
self._config("--local", "virtualenvs.in-project", "true", cwd=cwd)
# self._config("repositories.{}".format(self.REPO_NAME), PYTHON_INDEX)
# self._config("http-basic.{}".format(self.REPO_NAME), *PYTHON_INDEX_CREDENTIALS)
except Exception as ex:
print("Exception: {}\nError: Failed configuring Poetry virtualenvs.in-project".format(ex))
raise
def get_api(self, path):
# type: (Path) -> PoetryAPI

View File

@@ -456,7 +456,7 @@ class Git(VCS):
)
def pull(self):
self.call("fetch", "--all", cwd=self.location)
self.call("fetch", "--all", "--recurse-submodules", cwd=self.location)
info_commands = dict(
url=Argv(executable_name, "ls-remote", "--get-url", "origin"),

View File

@@ -4,11 +4,12 @@ from time import sleep
from glob import glob
from tempfile import gettempdir, NamedTemporaryFile
from trains_agent.definitions import ENV_K8S_HOST_MOUNT
from trains_agent.helper.base import warning
class Singleton(object):
prefix = 'trainsagent'
prefix = '.trainsagent'
sep = '_'
ext = '.tmp'
worker_id = None
@@ -19,7 +20,7 @@ class Singleton(object):
_lock_timeout = 10
@classmethod
def register_instance(cls, unique_worker_id=None, worker_name=None):
def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
"""
# Exit the process if another instance of us is using the same worker_id
@@ -28,7 +29,7 @@ class Singleton(object):
:return: (str worker_id, int slot_number) Return None value on instance already running
"""
# try to lock file
lock_file = os.path.join(gettempdir(), cls._lock_file_name)
lock_file = os.path.join(cls._get_temp_folder(), cls._lock_file_name)
timeout = 0
while os.path.exists(lock_file):
if timeout > cls._lock_timeout:
@@ -46,7 +47,8 @@ class Singleton(object):
f.write(bytes(os.getpid()))
f.flush()
try:
ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name)
ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name,
api_client=api_client)
except:
ret = None, None
@@ -58,12 +60,12 @@ class Singleton(object):
return ret
@classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None):
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
if cls.worker_id:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
instance_num = 0
temp_folder = gettempdir()
temp_folder = cls._get_temp_folder()
files = glob(os.path.join(temp_folder, cls.prefix + cls.sep + '*' + cls.ext))
slots = {}
for file in files:
@@ -73,8 +75,24 @@ class Singleton(object):
except Exception:
# something is wrong, use non existing pid and delete the file
pid = -1
uid, slot = None, None
try:
with open(file, 'r') as f:
uid, slot = str(f.read()).split('\n')
slot = int(slot)
except Exception:
pass
worker = None
if api_client and os.environ.get(ENV_K8S_HOST_MOUNT) and uid:
try:
worker = [w for w in api_client.workers.get_all() if w.id == uid]
except Exception:
worker = None
# count active instances and delete dead files
if not psutil.pid_exists(pid):
if not worker and not psutil.pid_exists(pid):
# delete the file
try:
os.remove(os.path.join(file))
@@ -83,11 +101,7 @@ class Singleton(object):
continue
instance_num += 1
try:
with open(file, 'r') as f:
uid, slot = str(f.read()).split('\n')
slot = int(slot)
except Exception:
if slot is None:
continue
if uid == unique_worker_id:
@@ -110,10 +124,16 @@ class Singleton(object):
unique_worker_id = worker_name + cls.worker_name_sep + str(cls.instance_slot)
# create lock
cls._pid_file = NamedTemporaryFile(dir=gettempdir(), prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep,
suffix=cls.ext)
cls._pid_file = NamedTemporaryFile(dir=cls._get_temp_folder(),
prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep, suffix=cls.ext)
cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
cls._pid_file.flush()
cls.worker_id = unique_worker_id
return cls.worker_id, cls.instance_slot
@classmethod
def _get_temp_folder(cls):
    """
    Return the folder in which lock / pid files are kept.

    When the ENV_K8S_HOST_MOUNT environment variable is set to a non-empty value,
    the text after its last ':' is used as the folder; otherwise the system
    temporary directory is returned.
    """
    host_mount = os.environ.get(ENV_K8S_HOST_MOUNT)
    if not host_mount:
        return gettempdir()
    # keep only the part after the last ':' separator
    return host_mount.split(':')[-1]

View File

@@ -1,3 +1,4 @@
import itertools
from functools import partial
from importlib import import_module
import argparse
@@ -24,8 +25,16 @@ def get_parser():
from .worker import COMMANDS
subparsers = top_parser.add_subparsers(dest='command')
for c in COMMANDS:
parser = subparsers.add_parser(name=c, help=COMMANDS[c]['help'])
for a in COMMANDS[c].get('args', {}).keys():
parser.add_argument(a, **COMMANDS[c]['args'][a])
parser = subparsers.add_parser(name=c, help=COMMANDS[c]["help"])
groups = itertools.groupby(
sorted(
COMMANDS[c].get("args", {}).items(), key=lambda x: x[1].get("group", "")
),
key=lambda x: x[1].pop("group", ""),
)
for group_name, group in groups:
p = parser if not group_name else parser.add_argument_group(group_name)
for key, value in group:
p.add_argument(key, **value)
return top_parser

View File

@@ -37,21 +37,29 @@ DAEMON_ARGS = dict({
'help': 'Pipe full log to stdout/stderr, should not be used if running in background',
'action': 'store_true',
},
'--gpus': {
'help': 'Specify active GPUs for the daemon to use (docker / virtual environment), '
'Equivalent to setting NVIDIA_VISIBLE_DEVICES '
'Examples: --gpus 0 or --gpu 0,1,2 or --gpus all',
},
'--cpu-only': {
'help': 'Disable GPU access for the daemon, only use CPU in either docker or virtual environment',
'action': 'store_true',
},
'--docker': {
'help': 'Run execution task inside a docker (v19.03 and above). Optional args <image> <arguments> or '
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments'
'use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for docker',
'nargs': '*',
'default': False,
'group': 'Docker support',
},
'--gpus': {
'help': 'Specify active GPUs for the daemon to use (docker / virtual environment), '
'Equivalent to setting NVIDIA_VISIBLE_DEVICES '
'Examples: --gpus 0 or --gpu 0,1,2 or --gpus all',
'group': 'Docker support',
},
'--cpu-only': {
'help': 'Disable GPU access for the daemon, only use CPU in either docker or virtual environment',
'action': 'store_true',
'group': 'Docker support',
},
'--force-current-version': {
'help': 'Force trains-agent to use the current trains-agent version when running in the docker',
'action': 'store_true',
'group': 'Docker support',
},
'--queue': {
'help': 'Queue ID(s)/Name(s) to pull tasks from (\'default\' queue)',

View File

@@ -15,7 +15,7 @@ from pyhocon import ConfigFactory, HOCONConverter, ConfigTree
from trains_agent.backend_api.session import Session as _Session, Request
from trains_agent.backend_api.session.client import APIClient
from trains_agent.backend_config.defs import LOCAL_CONFIG_FILE_OVERRIDE_VAR, LOCAL_CONFIG_FILES
from trains_agent.definitions import ENVIRONMENT_CONFIG
from trains_agent.definitions import ENVIRONMENT_CONFIG, ENV_TASK_EXECUTE_AS_USER
from trains_agent.errors import APIError
from trains_agent.helper.base import HOCONEncoder
from trains_agent.helper.process import Argv
@@ -75,7 +75,8 @@ class Session(_Session):
cpu_only = kwargs.get('cpu_only')
if cpu_only:
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = 'none'
if kwargs.get('gpus'):
if kwargs.get('gpus') and not os.environ.get('KUBERNETES_SERVICE_HOST') \
and not os.environ.get('KUBERNETES_PORT'):
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
if kwargs.get('only_load_config'):
from trains_agent.backend_api.config import load
@@ -111,6 +112,17 @@ class Session(_Session):
# override with environment variables
# cuda_version & cudnn_version are overridden with os.environ here, and normalized in the next section
for config_key, env_config in ENVIRONMENT_CONFIG.items():
# check if the property is a list:
if config_key.endswith('.0'):
if all(not i.get() for i in env_config.values()):
continue
parent = config_key.partition('.0')[0]
if not self.config[parent]:
self.config.put(parent, [])
self.config.put(parent, self.config[parent] + [ConfigTree((k, v.get()) for k, v in env_config.items())])
continue
value = env_config.get()
if not value:
continue
@@ -165,7 +177,11 @@ class Session(_Session):
folder_keys = ('agent.venvs_dir', 'agent.vcs_cache.path',
'agent.pip_download_cache.path',
'agent.docker_pip_cache', 'agent.docker_apt_cache')
singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path',)
singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path', 'agent.docker_apt_cache')
if os.environ.get(ENV_TASK_EXECUTE_AS_USER):
folder_keys = tuple(list(folder_keys) + ['sdk.storage.cache.default_base_dir'])
singleton_folders = tuple(list(singleton_folders) + ['sdk.storage.cache.default_base_dir'])
for key in folder_keys:
folder_key = ConfigValue(self.config, key)

View File

@@ -1 +1 @@
__version__ = '0.13.1'
__version__ = '0.13.3'