Compare commits


18 Commits

Author SHA1 Message Date
allegroai
121dec2a62 Version bump to v0.16.0 2020-08-10 17:28:00 +03:00
allegroai
4aacf9005e Fix GPU monitoring support on Windows (Trains Issue #177) 2020-08-10 08:07:51 +03:00
allegroai
6b333202e9 Sync generated conf file with latest Trains 2020-08-08 14:44:45 +03:00
allegroai
ce6831368f Fix GPU monitoring on Windows machines 2020-08-08 14:43:25 +03:00
allegroai
e4111c830b Fix GIT user/pass in requirements and support for '-e git+http' lines 2020-07-30 14:30:23 +03:00
allegroai
52c1772b04 Vendor requirements_parser into trains-agent instead of requiring it as a dependency. Fix requirements_parser to support 'package @ git+http' lines 2020-07-30 14:29:37 +03:00
allegroai
699d13bbb3 Fix: a task status change to 'queued' should never happen during Task runtime 2020-07-14 23:42:11 +03:00
allegroai
2c8d7d3d9a Fix --debug to set all specified loggers to DEBUG
Add set_urllib_log_level; in debug mode, set the urllib log level to DEBUG
2020-07-11 01:45:46 +03:00
allegroai
b13cc1e8e7 Add error message when Trains API Server is not accessible on startup 2020-07-11 01:44:45 +03:00
allegroai
17d2bf2a3e Change daemon --stop without any specific flag to terminate agents in lexicographic order of worker ID 2020-07-11 01:43:54 +03:00
allegroai
94997f9c88 Add daemon --order-fairness for round-robin queue pulling
Add daemon --stop to terminate a running agent (assuming all other arguments are the same)
Clean up all log files on termination unless executed with --debug
2020-07-11 01:42:56 +03:00
allegroai
c6d998c4df Add terminate process and rmtree utilities 2020-07-11 01:40:50 +03:00
allegroai
f8ea445339 Fix docker to use UTF-8 encoding, so prints won't break it 2020-07-11 01:40:14 +03:00
allegroai
712efa208b version bump 2020-07-06 21:09:21 +03:00
allegroai
09b6b6a9de Fix non-root docker image usage
Fix broken trains-agent build
Improve support for dockers with preinstalled conda env
Improve trains-agent-docker spinning
2020-07-06 21:09:11 +03:00
allegroai
98ff9a50e6 Changed agent.docker_init_bash_script default value in comment 2020-07-06 21:05:55 +03:00
allegroai
1f4d358316 Changed default docker image from nvidia/cuda to "nvidia/cuda:10.1-runtime-ubuntu18.04" to support cudnn frameworks (TF) 2020-07-02 01:35:57 +03:00
allegroai
f693fa165c Fix .git-credentials and .gitconfig mapping into docker
Add agent.docker_init_bash_script to allow finer control over the docker startup script
2020-07-02 01:33:13 +03:00
26 changed files with 702 additions and 102 deletions

View File

@@ -5,8 +5,17 @@ WORKDIR /usr/agent
COPY . /usr/agent
ENV LC_ALL=en_US.UTF-8
ENV LANG=en_US.UTF-8
ENV LANGUAGE=en_US.UTF-8
ENV PYTHONIOENCODING=UTF-8
RUN apt-get update
RUN apt-get dist-upgrade -y
RUN apt-get install -y locales
RUN locale-gen en_US.UTF-8
RUN apt-get install -y curl python3-pip git
RUN curl -sSL https://get.docker.com/ | sh
RUN python3 -m pip install -U pip

View File

@@ -11,4 +11,4 @@ TRAINS_API_HOST=${TRAINS_API_HOST:-"http://$TRAINS_HOST_IP:8008"}
echo $TRAINS_FILES_HOST $TRAINS_WEB_HOST $TRAINS_API_HOST 1>&2
python3 -m pip install -q -U "trains-agent${TRAINS_AGENT_UPDATE_VERSION}"
trains-agent daemon --services-mode --queue services --create-queue --docker $TRAINS_AGENT_DEFAULT_BASE_DOCKER --cpu-only $TRAINS_AGENT_EXTRA_ARGS
trains-agent daemon --services-mode --queue services --create-queue --docker "$TRAINS_AGENT_DEFAULT_BASE_DOCKER" --cpu-only $TRAINS_AGENT_EXTRA_ARGS

View File

@@ -106,7 +106,7 @@ agent {
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda"
image: "nvidia/cuda:10.1-runtime-ubuntu18.04"
# optional arguments to pass to docker image
# arguments: ["--ipc=host"]

View File

@@ -13,7 +13,6 @@ pyjwt>=1.6.4
PyYAML>=3.12
requests-file>=1.4.2
requests>=2.20.0
requirements_parser>=0.2.0
six>=1.11.0
tqdm>=4.19.5
typing>=3.6.4

View File

@@ -92,12 +92,23 @@
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda"
image: "nvidia/cuda:10.1-runtime-ubuntu18.04"
# optional arguments to pass to docker image
# arguments: ["--ipc=host", ]
}
# set the initial bash script to execute at the startup of any docker container.
# all lines will be executed regardless of their exit code.
# {python_single_digit} is translated to 'python3' or 'python2' according to the requested python version
# docker_init_bash_script = [
# "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
# "chown -R root /root/.cache/pip",
# "apt-get update",
# "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0",
# "(which {python_single_digit} && {python_single_digit} -m pip --version) || apt-get install -y {python_single_digit}-pip",
# ]
# cuda versions used for solving pytorch wheel packages
# should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
# cuda_version: 10.1
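
The list form of agent.docker_init_bash_script is joined into a single shell command (mirroring the join logic in the Worker diff further below). A minimal sketch of that behavior, assuming a list value:

# Sketch: a docker_init_bash_script list becomes one startup command.
# A plain string is used as-is; a list is joined with ' ; ' so every
# line runs regardless of the previous line's exit code.
bash_script = [
    "apt-get update",
    "apt-get install -y git",
]
docker_bash_script = " ; ".join(bash_script) if not isinstance(bash_script, str) else bash_script
print(docker_bash_script)  # apt-get update ; apt-get install -y git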

View File

@@ -37,6 +37,9 @@
quality: 87
subsampling: 0
}
# Support plot-per-graph fully matching Tensorboard behavior (i.e. if this is set to true, each series should have its own graph)
tensorboard_single_series_per_graph: false
}
network {
@@ -117,11 +120,11 @@
log {
# debugging feature: set this to true to make null log propagate messages to root logger (so they appear in stdout)
null_log_propagate: False
null_log_propagate: false
task_log_buffer_capacity: 66
# disable urllib info and lower levels
disable_urllib3_info: True
disable_urllib3_info: true
}
development {
@@ -131,14 +134,30 @@
task_reuse_time_window_in_hours: 72.0
# Run VCS repository detection asynchronously
vcs_repo_detect_async: True
vcs_repo_detect_async: true
# Store uncommitted git/hg source code diff in experiment manifest when training in development mode
# This stores "git diff" or "hg diff" into the experiment's "script.requirements.diff" section
store_uncommitted_code_diff_on_train: True
store_uncommitted_code_diff: true
# Support stopping an experiment in case it was externally stopped, status was changed or task was reset
support_stopping: True
support_stopping: true
# Default Task output_uri. If output_uri is not provided to Task.init, default_output_uri will be used instead.
default_output_uri: ""
# By default, auto-generated requirements are optimized for a smaller requirements list.
# If True, analyze the entire repository regardless of the entry point.
# If False, first analyze the entry point script; if it does not reference other local files,
# do not analyze the entire repository.
force_analyze_entire_repo: false
# If set to true, the *trains* update message will not be printed to the console.
# This value can be overridden with the OS environment variable TRAINS_SUPPRESS_UPDATE_MESSAGE=1
suppress_update_message: false
# If this flag is true (default is false), instead of analyzing the code with Pigar, analyze with `pip freeze`
detect_with_pip_freeze: false
# Development mode worker
worker {
@@ -149,7 +168,11 @@
ping_period_sec: 30
# Log all stdout & stderr
log_stdout: True
log_stdout: true
# compatibility feature, report memory usage for the entire machine
# default (false), report only on the running process and its sub-processes
report_global_mem_used: false
}
}
}
}

View File

@@ -40,6 +40,7 @@ class Session(TokenManager):
_session_requests = 0
_session_initial_timeout = (3.0, 10.)
_session_timeout = (10.0, 30.)
_session_initial_connect_retry = 4
_write_session_data_size = 15000
_write_session_timeout = (30.0, 30.)
@@ -96,7 +97,7 @@ class Session(TokenManager):
else:
self.config = load()
if initialize_logging:
self.config.initialize_logging()
self.config.initialize_logging(debug=kwargs.get('debug', False))
token_expiration_threshold_sec = self.config.get(
"auth.token_expiration_threshold_sec", 60
@@ -134,7 +135,6 @@ class Session(TokenManager):
"api.http.retries", ConfigTree()
).as_plain_ordered_dict()
http_retries_config["status_forcelist"] = self._retry_codes
self.__http_session = get_http_session_with_retry(**http_retries_config)
self.__worker = worker or gethostname()
@@ -144,7 +144,14 @@ class Session(TokenManager):
self.client = client or "api-{}".format(__version__)
# limit the reconnect retries, so we get an error if we are starting the session
http_no_retries_config = dict(**http_retries_config)
http_no_retries_config['connect'] = self._session_initial_connect_retry
self.__http_session = get_http_session_with_retry(**http_no_retries_config)
# try to connect with the server
self.refresh_token()
# create the default session with many retries
self.__http_session = get_http_session_with_retry(**http_retries_config)
# update api version from server response
try:
@@ -546,6 +553,9 @@ class Session(TokenManager):
else:
raise LoginError("Response data mismatch: No 'token' in 'data' value from res, receive : {}, "
"exception: {}".format(res, ex))
except requests.ConnectionError as ex:
raise ValueError('Connection Error: it seems *api_server* is misconfigured. '
'Is this the TRAINS API server {} ?'.format('/'.join(ex.request.url.split('/')[:3])))
except Exception as ex:
raise LoginError('Unrecognized Authentication Error: {} {}'.format(type(ex), ex))
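
The session change above connects twice: first with a limited-retry session so a misconfigured api_server fails fast, then with the fully retrying session for normal operation. A minimal sketch of that pattern with requests/urllib3; get_http_session_with_retry is the real helper, this stand-in only illustrates the idea:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def session_with_retry(connect_retries, total_retries=5):
    # build a requests session whose transport retries failed connections
    session = requests.Session()
    retry = Retry(total=total_retries, connect=connect_retries, backoff_factor=0.5)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

# first contact: fail fast (4 connect retries, per _session_initial_connect_retry)
bootstrap = session_with_retry(connect_retries=4)
# once refresh_token() succeeds, a patient session built from the
# configured api.http.retries settings replaces it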

View File

@@ -190,7 +190,7 @@ class Config(object):
def reload(self):
self.replace(self._reload())
def initialize_logging(self):
def initialize_logging(self, debug=False):
logging_config = self._config.get("logging", None)
if not logging_config:
return False
@@ -217,6 +217,8 @@ class Config(object):
)
for logger in loggers:
handlers = logger.get("handlers", None)
if debug:
logger['level'] = 'DEBUG'
if not handlers:
continue
logger["handlers"] = [h for h in handlers if h not in deleted]

View File

@@ -142,6 +142,7 @@ def main():
with open(str(conf_file), 'wt') as f:
header = '# TRAINS-AGENT configuration file\n' \
'api {\n' \
' # Notice: \'host\' is the api server (default port 8008), not the web server.\n' \
' api_server: %s\n' \
' web_server: %s\n' \
' files_server: %s\n' \

View File

@@ -17,7 +17,7 @@ from datetime import datetime
from distutils.spawn import find_executable
from functools import partial
from itertools import chain
from tempfile import mkdtemp, gettempdir
from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
from typing import Text, Optional, Any, Tuple
@@ -66,7 +66,7 @@ from trains_agent.helper.base import (
get_python_path,
is_linux_platform,
rm_file,
add_python_path)
add_python_path, safe_remove_tree, )
from trains_agent.helper.console import ensure_text, print_text, decode_binary_lines
from trains_agent.helper.os.daemonize import daemonize_process
from trains_agent.helper.package.base import PackageManager
@@ -89,7 +89,7 @@ from trains_agent.helper.process import (
get_bash_output,
shutdown_docker_process,
get_docker_id,
commit_docker
commit_docker, terminate_process,
)
from trains_agent.helper.package.cython_req import CythonRequirement
from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
@@ -104,6 +104,7 @@ log = logging.getLogger(__name__)
DOCKER_ROOT_CONF_FILE = "/root/trains.conf"
DOCKER_DEFAULT_CONF_FILE = "/root/default_trains.conf"
@attr.s
class LiteralScriptManager(object):
"""
@@ -212,6 +213,7 @@ class TaskStopSignal(object):
statuses.stopped,
statuses.failed,
statuses.published,
statuses.queued,
]
default = TaskStopReason.no_stop
stopping_message = "stopping"
@@ -317,6 +319,10 @@ class Worker(ServiceCommandSection):
# last message before passing control to the actual task
_task_logging_pass_control_message = "Running task id [{}]:"
_run_as_user_home = '/trains_agent_home'
_docker_fixed_user_cache = '/trains_agent_cache'
_temp_cleanup_list = []
@property
def service(self):
""" Worker command service endpoint """
@@ -329,6 +335,8 @@ class Worker(ServiceCommandSection):
@staticmethod
def register_signal_handler():
def handler(*_):
for f in Worker._temp_cleanup_list + [Singleton.get_pid_file()]:
safe_remove_tree(f)
raise Sigterm()
signal.signal(signal.SIGTERM, handler)
@@ -385,6 +393,7 @@ class Worker(ServiceCommandSection):
self._standalone_mode = None
self._services_mode = None
self._force_current_version = None
self._redirected_stdout_file_no = None
@classmethod
def _verify_command_states(cls, kwargs):
@@ -430,7 +439,7 @@ class Worker(ServiceCommandSection):
pass
def run_one_task(self, queue, task_id, worker_args, docker=None):
# type: (Text, Text, WorkerParams) -> ()
# type: (Text, Text, WorkerParams, Optional[Text]) -> ()
"""
Run one task pulled from queue.
:param queue: ID of queue that task was pulled from
@@ -566,12 +575,12 @@ class Worker(ServiceCommandSection):
else:
self.handle_task_termination(task_id, status, stop_signal_status)
# remove temp files after we sent everything to the backend
safe_remove_file(temp_stdout_name)
safe_remove_file(temp_stderr_name)
if self.docker_image_func:
shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))
safe_remove_file(temp_stdout_name)
safe_remove_file(temp_stderr_name)
def run_tasks_loop(self, queues, worker_params):
def run_tasks_loop(self, queues, worker_params, priority_order=True):
"""
:summary: Pull and run tasks from queues.
:description: 1. Go through ``queues`` by order.
@@ -581,6 +590,9 @@ class Worker(ServiceCommandSection):
:type queues: list of ``Text``
:param worker_params: Worker command line arguments
:type worker_params: ``trains_agent.helper.process.WorkerParams``
:param priority_order: If True, pull in priority order: always pull from the first queue first.
If False, pull from each queue once, in round-robin order
:type priority_order: bool
"""
if not self._daemon_foreground:
@@ -611,6 +623,16 @@ class Worker(ServiceCommandSection):
print("No tasks in queue {}".format(queue))
continue
# clear output log if we start a new Task
if not worker_params.debug and self._redirected_stdout_file_no is not None and \
self._redirected_stdout_file_no > 2:
# noinspection PyBroadException
try:
os.lseek(self._redirected_stdout_file_no, 0, 0)
os.ftruncate(self._redirected_stdout_file_no, 0)
except:
pass
self.send_logs(
task_id=task_id,
lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)],
@@ -619,7 +641,11 @@ class Worker(ServiceCommandSection):
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
self.run_one_task(queue, task_id, worker_params)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
break
# if we are using priority order, always start pulling from the first queue;
# if we are doing round-robin, pull from the next queue
if priority_order:
break
else:
# sleep and retry polling
if self._daemon_foreground or worker_params.debug:
@@ -671,7 +697,7 @@ class Worker(ServiceCommandSection):
self._session.print_configuration()
def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, **kwargs):
def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
@@ -683,6 +709,11 @@ class Worker(ServiceCommandSection):
kwargs = self._verify_command_states(kwargs)
docker = docker or kwargs.get('docker')
# We are not running a daemon, we are killing one:
# find the pid, send a termination signal, and leave
if kwargs.get('stop', False):
return 1 if not self._kill_daemon() else 0
# make sure we only have a single instance,
# also make sure we set worker_id properly and cache folders
self._singleton()
@@ -708,9 +739,8 @@ class Worker(ServiceCommandSection):
self._register(queues)
# create temp config file with current configuration
self.temp_config_path = safe_mkstemp(
suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
)
self.temp_config_path = NamedTemporaryFile(
suffix=".cfg", prefix=".trains_agent.", mode='w+t').name
# print docker image
if docker is not False and docker is not None:
@@ -750,6 +780,9 @@ class Worker(ServiceCommandSection):
)
)
if not self._session.debug_mode:
self._temp_cleanup_list.append(name)
if not detached:
# redirect std out/err to new file
sys.stdout = sys.stderr = out_file
@@ -757,6 +790,7 @@ class Worker(ServiceCommandSection):
# in detached mode
# fully detach stdin/stdout/stderr and leave the main process running in the background
daemonize_process(out_file.fileno())
self._redirected_stdout_file_no = out_file.fileno()
# make sure we update the singleton lock file to the new pid
Singleton.update_pid_file()
# reprint headers to std file (we are now inside the daemon process)
@@ -776,6 +810,7 @@ class Worker(ServiceCommandSection):
debug=self._session.debug_mode,
trace=self._session.trace,
),
priority_order=not order_fairness,
)
except Exception:
tb = six.text_type(traceback.format_exc())
@@ -1171,7 +1206,7 @@ class Worker(ServiceCommandSection):
clone=("--clone" if entry_point == "clone_task" else ""),
)
else:
change = 'ENTRYPOINT bash'
change = 'ENTRYPOINT []'
print('Committing docker container to: {}'.format(target))
print(commit_docker(container_name=target, docker_id=docker_id, apply_change=change))
@@ -1998,7 +2033,7 @@ class Worker(ServiceCommandSection):
print("Running in Docker {} mode (v19.03 and above) - using default docker image: {} running {}\n".format(
'*standalone*' if self._standalone_mode else '', docker_image, python_version))
temp_config = deepcopy(self._session.config)
mounted_cache_dir = '/root/.trains/cache'
mounted_cache_dir = self._docker_fixed_user_cache # '/root/.trains/cache'
mounted_pip_dl_dir = '/root/.trains/pip-download-cache'
mounted_vcs_cache = '/root/.trains/vcs-cache'
mounted_venv_dir = '/root/.trains/venvs-builds'
@@ -2022,6 +2057,7 @@ class Worker(ServiceCommandSection):
host_pip_cache = Path(os.path.expandvars(self._session.config.get(
"agent.docker_pip_cache", '~/.trains/pip-cache'))).expanduser().as_posix()
host_ssh_cache = mkdtemp(prefix='trains_agent.ssh.')
self._temp_cleanup_list.append(host_ssh_cache)
# make sure all folders are valid
Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
@@ -2043,12 +2079,10 @@ class Worker(ServiceCommandSection):
pass
# check if the .git credentials exist:
host_git_credentials = Path('~/.git-credentials').expanduser()
try:
if host_git_credentials.is_file():
host_git_credentials = host_git_credentials.as_posix()
else:
host_git_credentials = None
host_git_credentials = [
f.as_posix() for f in [Path('~/.git-credentials').expanduser(), Path('~/.gitconfig').expanduser()]
if f.is_file()]
except Exception:
host_git_credentials = None
@@ -2063,6 +2097,8 @@ class Worker(ServiceCommandSection):
cmds = [cmds]
extra_shell_script_str = " ; ".join(map(str, cmds)) + " ; "
bash_script = self._session.config.get("agent.docker_init_bash_script", None)
self.temp_config_path = self.temp_config_path or safe_mkstemp(
suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
)
@@ -2079,7 +2115,10 @@ class Worker(ServiceCommandSection):
host_cache=host_cache, mounted_cache=mounted_cache_dir,
host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
standalone_mode=self._standalone_mode, force_current_version=self._force_current_version)
standalone_mode=self._standalone_mode,
force_current_version=self._force_current_version,
bash_script=bash_script,
)
return temp_config, partial(docker_cmd_functor, docker_cmd, temp_config)
@staticmethod
@@ -2092,7 +2131,7 @@ class Worker(ServiceCommandSection):
host_pip_dl, mounted_pip_dl,
host_vcs_cache, mounted_vcs_cache,
standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None,
force_current_version=None, host_git_credentials=None):
force_current_version=None, host_git_credentials=None, bash_script=None):
docker = 'docker'
base_cmd = [docker, 'run', '-t']
@@ -2193,20 +2232,32 @@ class Worker(ServiceCommandSection):
trains_agent_wheel = 'trains-agent{specify_version}'.format(specify_version=specify_version)
if not standalone_mode:
update_scheme += \
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
"chown -R root /root/.cache/pip ; " \
"apt-get update ; " \
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 {python_single_digit}-pip ; " \
"{python} -m pip install -U \"pip{pip_version}\" ; " \
"{python} -m pip install -U {trains_agent_wheel} ; ".format(
python_single_digit=python_version.split('.')[0],
python=python_version, pip_version=PackageManager.get_pip_version(),
trains_agent_wheel=trains_agent_wheel)
if not bash_script:
bash_script = [
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
"chown -R root /root/.cache/pip",
"apt-get update",
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0",
"(which {python_single_digit} && {python_single_digit} -m pip --version) || " +
"apt-get install -y {python_single_digit}-pip",
]
docker_bash_script = " ; ".join(bash_script) if not isinstance(bash_script, str) else bash_script
update_scheme += (
docker_bash_script + " ; " +
"{python} -m pip install -U \"pip{pip_version}\" ; " +
"{python} -m pip install -U {trains_agent_wheel} ; ").format(
python_single_digit=python_version.split('.')[0],
python=python_version, pip_version=PackageManager.get_pip_version(),
trains_agent_wheel=trains_agent_wheel)
if host_git_credentials:
for git_credentials in host_git_credentials:
base_cmd += ['-v', '{}:/root/{}'.format(git_credentials, Path(git_credentials).name)]
base_cmd += (
['-v', conf_file+':'+DOCKER_ROOT_CONF_FILE] +
(['-v', host_git_credentials+':/root/.git-credentials'] if host_git_credentials else []) +
(['-v', host_ssh_cache+':/root/.ssh'] if host_ssh_cache else []) +
(['-v', host_apt_cache+':/var/cache/apt/archives'] if host_apt_cache else []) +
(['-v', host_pip_cache+':/root/.cache/pip'] if host_pip_cache else []) +
@@ -2248,13 +2299,13 @@ class Worker(ServiceCommandSection):
os.setuid(self.uid)
# create a home folder for our user
trains_agent_home = 'trains_agent_home{}'.format('.'+str(Singleton.get_slot()) if Singleton.get_slot() else '')
trains_agent_home = self._run_as_user_home + '{}'.format('.'+str(Singleton.get_slot()) if Singleton.get_slot() else '')
try:
home_folder = '/trains_agent_home'
home_folder = self._run_as_user_home
rm_tree(home_folder)
Path(home_folder).mkdir(parents=True, exist_ok=True)
except:
home_folder = '/home/trains_agent_home'
home_folder = os.path.join('/home', self._run_as_user_home)
rm_tree(home_folder)
Path(home_folder).mkdir(parents=True, exist_ok=True)
@@ -2273,6 +2324,10 @@ class Worker(ServiceCommandSection):
# make sure we will be able to access the cache folder (we assume we have the ability to change its mode)
if sdk_cache_folder:
sdk_cache_folder = Path(os.path.expandvars(sdk_cache_folder)).expanduser().absolute()
try:
sdk_cache_folder.chmod(0o0777)
except:
pass
for f in sdk_cache_folder.rglob('*'):
try:
f.chmod(0o0777)
@@ -2300,8 +2355,41 @@ class Worker(ServiceCommandSection):
return command, script_dir
def _kill_daemon(self):
worker_id, worker_name = self._generate_worker_id_name()
# Iterate over all running processes
for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''):
# either we have a match for the worker_id, or we just pick the first one
if pid >= 0 and (
(worker_id and uid == worker_id) or
(not worker_id and uid.startswith('{}:'.format(worker_name)))):
# this is us, kill it
print('Terminating trains-agent worker_id={} pid={}'.format(uid, pid))
if not terminate_process(pid, timeout=10):
error('Could not terminate process pid={}'.format(pid))
return True
print('Could not find a running trains-agent instance with worker_name={} worker_id={}'.format(
worker_name, worker_id))
return False
def _singleton(self):
# ensure singleton
worker_id, worker_name = self._generate_worker_id_name()
# if we are running in services mode, we allow double register since
# docker-compose will kill instances before they cleanup
self.worker_id, worker_slot = Singleton.register_instance(
unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client,
allow_double=bool(ENV_DOCKER_HOST_MOUNT.get()) # and bool(self._services_mode),
)
if self.worker_id is None:
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
exit(1)
# update folders based on free slot
self._session.create_cache_folders(slot_index=worker_slot)
def _generate_worker_id_name(self):
worker_id = self._session.config["agent.worker_id"]
worker_name = self._session.config["agent.worker_name"]
if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
@@ -2312,18 +2400,7 @@ class Worker(ServiceCommandSection):
pass
else:
worker_name = '{}:cpu'.format(worker_name)
# if we are running in services mode, we allow double register since
# docker-compose will kill instances before they cleanup
self.worker_id, worker_slot = Singleton.register_instance(
unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client,
allow_double=bool(self._services_mode) and bool(ENV_DOCKER_HOST_MOUNT.get()))
if self.worker_id is None:
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
exit(1)
# update folders based on free slot
self._session.create_cache_folders(slot_index=worker_slot)
return worker_id, worker_name
def _resolve_queue_names(self, queues, create_if_missing=False):
if not queues:

trains_agent/external/__init__.py vendored Normal file
View File

View File

@@ -0,0 +1,22 @@
from .parser import parse # noqa
_MAJOR = 0
_MINOR = 2
_PATCH = 0
def version_tuple():
'''
Returns a 3-tuple of ints that represent the version
'''
return (_MAJOR, _MINOR, _PATCH)
def version():
'''
Returns a string representation of the version
'''
return '%d.%d.%d' % (version_tuple())
__version__ = version()

View File

@@ -0,0 +1,44 @@
import re
# Copied from pip
# https://github.com/pypa/pip/blob/281eb61b09d87765d7c2b92f6982b3fe76ccb0af/pip/index.py#L947
HASH_ALGORITHMS = set(['sha1', 'sha224', 'sha384', 'sha256', 'sha512', 'md5'])
extras_require_search = re.compile(
r'(?P<name>.+)\[(?P<extras>[^\]]+)\]').search
def parse_fragment(fragment_string):
"""Takes a fragment string nd returns a dict of the components"""
fragment_string = fragment_string.lstrip('#')
try:
return dict(
key_value_string.split('=')
for key_value_string in fragment_string.split('&')
)
except ValueError:
raise ValueError(
'Invalid fragment string {fragment_string}'.format(
fragment_string=fragment_string
)
)
def get_hash_info(d):
"""Returns the first matching hashlib name and value from a dict"""
for key in d.keys():
if key.lower() in HASH_ALGORITHMS:
return key, d[key]
return None, None
def parse_extras_require(egg):
if egg is not None:
match = extras_require_search(egg)
if match is not None:
name = match.group('name')
extras = match.group('extras')
return name, [extra.strip() for extra in extras.split(',')]
return egg, []
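
Assumed usage of the fragment helpers above (module path per the vendored package):

from trains_agent.external.requirements_parser.fragment import (
    parse_fragment, parse_extras_require)

print(parse_fragment('#egg=mypkg&sha256=abc123'))
# {'egg': 'mypkg', 'sha256': 'abc123'}
print(parse_extras_require('mymodule[extra1, extra2]'))
# ('mymodule', ['extra1', 'extra2'])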

View File

@@ -0,0 +1,50 @@
import os
import warnings
from .requirement import Requirement
def parse(reqstr):
"""
Parse a requirements file into a list of Requirements
See: pip/req.py:parse_requirements()
:param reqstr: a string or file like object containing requirements
:returns: a *generator* of Requirement objects
"""
filename = getattr(reqstr, 'name', None)
try:
# Python 2.x compatibility
if not isinstance(reqstr, basestring):
reqstr = reqstr.read()
except NameError:
# Python 3.x only
if not isinstance(reqstr, str):
reqstr = reqstr.read()
for line in reqstr.splitlines():
line = line.strip()
if line == '':
continue
elif not line or line.startswith('#'):
# comments are lines that start with # only
continue
elif line.startswith('-r') or line.startswith('--requirement'):
_, new_filename = line.split()
new_file_path = os.path.join(os.path.dirname(filename or '.'),
new_filename)
with open(new_file_path) as f:
for requirement in parse(f):
yield requirement
elif line.startswith('-f') or line.startswith('--find-links') or \
line.startswith('-i') or line.startswith('--index-url') or \
line.startswith('--extra-index-url') or \
line.startswith('--no-index'):
warnings.warn('Private repos not supported. Skipping.')
continue
elif line.startswith('-Z') or line.startswith('--always-unzip'):
warnings.warn('Unused option --always-unzip. Skipping.')
continue
else:
yield Requirement.parse(line)
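
Assumed usage of the vendored parser, exercising both a plain specifier and the new 'package @ git+' form:

from trains_agent.external.requirements_parser import parse

reqstr = "six>=1.11.0\n# a comment\nmypkg @ git+https://github.com/org/repo@v1.0\n"
for r in parse(reqstr):
    print(r.name, r.specs, r.vcs)
# six [('>=', '1.11.0')] None
# mypkg [] git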

View File

@@ -0,0 +1,236 @@
from __future__ import unicode_literals
import re
from pkg_resources import Requirement as Req
from .fragment import get_hash_info, parse_fragment, parse_extras_require
from .vcs import VCS, VCS_SCHEMES
URI_REGEX = re.compile(
r'^(?P<scheme>https?|file|ftps?)://(?P<path>[^#]+)'
r'(#(?P<fragment>\S+))?'
)
VCS_REGEX = re.compile(
r'^(?P<scheme>{0})://'.format(r'|'.join(
[scheme.replace('+', r'\+') for scheme in VCS_SCHEMES])) +
r'((?P<login>[^/@]+)@)?'
r'(?P<path>[^#@]+)'
r'(@(?P<revision>[^#]+))?'
r'(#(?P<fragment>\S+))?'
)
# This matches just about everything
LOCAL_REGEX = re.compile(
r'^((?P<scheme>file)://)?'
r'(?P<path>[^#]+)' +
r'(#(?P<fragment>\S+))?'
)
class Requirement(object):
"""
Represents a single requirement
Typically instances of this class are created with ``Requirement.parse``.
For local file requirements, there's no verification that the file
exists. This class attempts to be *dict-like*.
See: http://www.pip-installer.org/en/latest/logic.html
**Members**:
* ``line`` - the actual requirement line being parsed
* ``editable`` - a boolean whether this requirement is "editable"
* ``local_file`` - a boolean whether this requirement is a local file/path
* ``specifier`` - a boolean whether this requirement used a requirement
specifier (eg. "django>=1.5" or "requirements")
* ``vcs`` - a string specifying the version control system
* ``revision`` - a version control system specifier
* ``name`` - the name of the requirement
* ``uri`` - the URI if this requirement was specified by URI
* ``subdirectory`` - the subdirectory fragment of the URI
* ``path`` - the local path to the requirement
* ``hash_name`` - the type of hashing algorithm indicated in the line
* ``hash`` - the hash value indicated by the requirement line
* ``extras`` - a list of extras for this requirement
(eg. "mymodule[extra1, extra2]")
* ``specs`` - a list of specs for this requirement
(eg. "mymodule>1.5,<1.6" => [('>', '1.5'), ('<', '1.6')])
"""
def __init__(self, line):
# Do not call this private method
self.line = line
self.editable = False
self.local_file = False
self.specifier = False
self.vcs = None
self.name = None
self.subdirectory = None
self.uri = None
self.path = None
self.revision = None
self.hash_name = None
self.hash = None
self.extras = []
self.specs = []
def __repr__(self):
return '<Requirement: "{0}">'.format(self.line)
def __getitem__(self, key):
return getattr(self, key)
def keys(self):
return self.__dict__.keys()
@classmethod
def parse_editable(cls, line):
"""
Parses a Requirement from an "editable" requirement which is either
a local project path or a VCS project URI.
See: pip/req.py:from_editable()
:param line: an "editable" requirement
:returns: a Requirement instance for the given line
:raises: ValueError on an invalid requirement
"""
req = cls('-e {0}'.format(line))
req.editable = True
vcs_match = VCS_REGEX.match(line)
local_match = LOCAL_REGEX.match(line)
if vcs_match is not None:
groups = vcs_match.groupdict()
if groups.get('login'):
req.uri = '{scheme}://{login}@{path}'.format(**groups)
else:
req.uri = '{scheme}://{path}'.format(**groups)
req.revision = groups['revision']
if groups['fragment']:
fragment = parse_fragment(groups['fragment'])
egg = fragment.get('egg')
req.name, req.extras = parse_extras_require(egg)
req.hash_name, req.hash = get_hash_info(fragment)
req.subdirectory = fragment.get('subdirectory')
for vcs in VCS:
if req.uri.startswith(vcs):
req.vcs = vcs
else:
assert local_match is not None, 'This should match everything'
groups = local_match.groupdict()
req.local_file = True
if groups['fragment']:
fragment = parse_fragment(groups['fragment'])
egg = fragment.get('egg')
req.name, req.extras = parse_extras_require(egg)
req.hash_name, req.hash = get_hash_info(fragment)
req.subdirectory = fragment.get('subdirectory')
req.path = groups['path']
return req
@classmethod
def parse_line(cls, line):
"""
Parses a Requirement from a non-editable requirement.
See: pip/req.py:from_line()
:param line: a "non-editable" requirement
:returns: a Requirement instance for the given line
:raises: ValueError on an invalid requirement
"""
req = cls(line)
vcs_match = VCS_REGEX.match(line)
uri_match = URI_REGEX.match(line)
local_match = LOCAL_REGEX.match(line)
if vcs_match is not None:
groups = vcs_match.groupdict()
if groups.get('login'):
req.uri = '{scheme}://{login}@{path}'.format(**groups)
else:
req.uri = '{scheme}://{path}'.format(**groups)
req.revision = groups['revision']
if groups['fragment']:
fragment = parse_fragment(groups['fragment'])
egg = fragment.get('egg')
req.name, req.extras = parse_extras_require(egg)
req.hash_name, req.hash = get_hash_info(fragment)
req.subdirectory = fragment.get('subdirectory')
for vcs in VCS:
if req.uri.startswith(vcs):
req.vcs = vcs
elif uri_match is not None:
groups = uri_match.groupdict()
req.uri = '{scheme}://{path}'.format(**groups)
if groups['fragment']:
fragment = parse_fragment(groups['fragment'])
egg = fragment.get('egg')
req.name, req.extras = parse_extras_require(egg)
req.hash_name, req.hash = get_hash_info(fragment)
req.subdirectory = fragment.get('subdirectory')
if groups['scheme'] == 'file':
req.local_file = True
elif '#egg=' in line:
# Assume a local file match
assert local_match is not None, 'This should match everything'
groups = local_match.groupdict()
req.local_file = True
if groups['fragment']:
fragment = parse_fragment(groups['fragment'])
egg = fragment.get('egg')
name, extras = parse_extras_require(egg)
req.name = fragment.get('egg')
req.hash_name, req.hash = get_hash_info(fragment)
req.subdirectory = fragment.get('subdirectory')
req.path = groups['path']
else:
# This is a requirement specifier.
# Delegate to pkg_resources and hope for the best
req.specifier = True
pkg_req = Req.parse(line)
req.name = pkg_req.unsafe_name
req.extras = list(pkg_req.extras)
req.specs = pkg_req.specs
return req
@classmethod
def parse(cls, line):
"""
Parses a Requirement from a line of a requirement file.
:param line: a line of a requirement file
:returns: a Requirement instance for the given line
:raises: ValueError on an invalid requirement
"""
line = line.lstrip()
if line.startswith('-e') or line.startswith('--editable'):
# Editable installs are either a local project path
# or a VCS project URI
return cls.parse_editable(
re.sub(r'^(-e|--editable=?)\s*', '', line))
elif '@' in line:
# Allegro bug fix: support 'name @ git+' entries
name, vcs = line.split('@', 1)
name = name.strip()
vcs = vcs.strip()
# noinspection PyBroadException
try:
# check if the name is valid & parsed
Req.parse(name)
# if we are here, name is a valid package name, check if the vcs part is valid
if VCS_REGEX.match(vcs):
req = cls.parse_line(vcs)
req.name = name
return req
except Exception:
pass
return cls.parse_line(line)
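
A quick check of the 'name @ git+' fix above (package name and URL are illustrative):

from trains_agent.external.requirements_parser.requirement import Requirement

req = Requirement.parse('mypkg @ git+https://github.com/org/repo@v1.0')
print(req.name)      # mypkg
print(req.uri)       # git+https://github.com/org/repo
print(req.revision)  # v1.0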

View File

@@ -0,0 +1,30 @@
from __future__ import unicode_literals
VCS = [
'git',
'hg',
'svn',
'bzr',
]
VCS_SCHEMES = [
'git',
'git+https',
'git+ssh',
'git+git',
'hg+http',
'hg+https',
'hg+static-http',
'hg+ssh',
'svn',
'svn+svn',
'svn+http',
'svn+https',
'svn+ssh',
'bzr+http',
'bzr+https',
'bzr+ssh',
'bzr+sftp',
'bzr+ftp',
'bzr+lp',
]

View File

@@ -173,13 +173,30 @@ def normalize_path(*paths):
def safe_remove_file(filename, error_message=None):
# noinspection PyBroadException
try:
os.remove(filename)
if filename:
os.remove(filename)
except Exception:
if error_message:
print(error_message)
def safe_remove_tree(filename):
if not filename:
return
# noinspection PyBroadException
try:
shutil.rmtree(filename, ignore_errors=True)
except Exception:
pass
# noinspection PyBroadException
try:
os.remove(filename)
except Exception:
pass
def get_python_path(script_dir, entry_point, package_api):
try:
python_path_sep = ';' if is_windows_platform() else ':'

View File

@@ -200,24 +200,30 @@ class GPUStatCollection(object):
GPUStatCollection.global_processes[nv_process.pid] = \
psutil.Process(pid=nv_process.pid)
ps_process = GPUStatCollection.global_processes[nv_process.pid]
process['username'] = ps_process.username()
# cmdline returns full path;
# as in `ps -o comm`, get short cmdnames.
_cmdline = ps_process.cmdline()
if not _cmdline:
# sometimes, zombie or unknown (e.g. [kworker/8:2H])
process['command'] = '?'
process['full_command'] = ['?']
else:
process['command'] = os.path.basename(_cmdline[0])
process['full_command'] = _cmdline
# Bytes to MBytes
process['gpu_memory_usage'] = nv_process.usedGpuMemory // MB
process['cpu_percent'] = ps_process.cpu_percent()
process['cpu_memory_usage'] = \
round((ps_process.memory_percent() / 100.0) *
psutil.virtual_memory().total)
process['pid'] = nv_process.pid
# noinspection PyBroadException
try:
# we do not actually use these, so no point in collecting them
# process['username'] = ps_process.username()
# # cmdline returns full path;
# # as in `ps -o comm`, get short cmdnames.
# _cmdline = ps_process.cmdline()
# if not _cmdline:
# # sometimes, zombie or unknown (e.g. [kworker/8:2H])
# process['command'] = '?'
# process['full_command'] = ['?']
# else:
# process['command'] = os.path.basename(_cmdline[0])
# process['full_command'] = _cmdline
# process['cpu_percent'] = ps_process.cpu_percent()
# process['cpu_memory_usage'] = \
# round((ps_process.memory_percent() / 100.0) *
# psutil.virtual_memory().total)
# Bytes to MBytes
process['gpu_memory_usage'] = nv_process.usedGpuMemory // MB
except Exception:
# insufficient permissions
pass
return process
if not GPUStatCollection._gpu_device_info.get(index):
@@ -285,12 +291,13 @@ class GPUStatCollection(object):
# e.g. nvidia-smi reset or reboot the system
pass
# TODO: Do not block if full process info is not requested
time.sleep(0.1)
for process in processes:
pid = process['pid']
cache_process = GPUStatCollection.global_processes[pid]
process['cpu_percent'] = cache_process.cpu_percent()
# we do not actually use these, so no point in collecting them
# # TODO: Do not block if full process info is not requested
# time.sleep(0.1)
# for process in processes:
# pid = process['pid']
# cache_process = GPUStatCollection.global_processes[pid]
# process['cpu_percent'] = cache_process.cpu_percent()
index = N.nvmlDeviceGetIndex(handle)
gpu_info = {

View File

@@ -14,8 +14,8 @@ import yaml
from time import time
from attr import attrs, attrib, Factory
from pathlib2 import Path
from requirements import parse
from requirements.requirement import Requirement
from trains_agent.external.requirements_parser import parse
from trains_agent.external.requirements_parser.requirement import Requirement
from trains_agent.errors import CommandFailedError
from trains_agent.helper.base import rm_tree, NonStrictAttrs, select_for_platform, is_windows_platform

View File

@@ -1,3 +1,4 @@
import re
from collections import OrderedDict
from typing import Text
@@ -33,6 +34,9 @@ class ExternalRequirements(SimpleSubstitution):
freeze_base = ''
req_line = req.tostr(markers=False)
if req_line.strip().startswith('-e ') or req_line.strip().startswith('--editable'):
req_line = re.sub(r'^(-e|--editable=?)\s*', '', req_line, count=1)
if req.req.vcs and req_line.startswith('git+'):
try:
url_no_frag = furl(req_line)
@@ -47,9 +51,10 @@ class ExternalRequirements(SimpleSubstitution):
vcs._set_ssh_url()
new_req_line = 'git+{}{}'.format(vcs.url_with_auth, fragment)
if new_req_line != req_line:
url_pass = furl(new_req_line).password
furl_line = furl(new_req_line)
print('Replacing original pip vcs \'{}\' with \'{}\''.format(
req_line, new_req_line.replace(url_pass, '****', 1) if url_pass else new_req_line))
req_line,
furl_line.set(password='xxxxxx').tostr() if furl_line.password else new_req_line))
req_line = new_req_line
except Exception:
print('WARNING: Failed parsing pip git install, using original line {}'.format(req_line))
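
The masking change above uses furl to blank the password field instead of a plain string replace; a small sketch under the same assumption (furl installed, credentials illustrative):

from furl import furl

new_req_line = 'git+https://user:secret@github.com/org/repo.git'
furl_line = furl(new_req_line)
# mask the password only if one is embedded in the URL
masked = furl_line.set(password='xxxxxx').tostr() if furl_line.password else new_req_line
print(masked)  # git+https://user:xxxxxx@github.com/org/repo.git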

View File

@@ -12,15 +12,15 @@ from typing import Text, List, Type, Optional, Tuple, Dict
from pathlib2 import Path
from pyhocon import ConfigTree
from requirements import parse
# noinspection PyPackageRequirements
from requirements.requirement import Requirement
import six
from trains_agent.definitions import PIP_EXTRA_INDICES
from trains_agent.helper.base import warning, is_conda, which, join_lines, is_windows_platform
from trains_agent.helper.process import Argv, PathLike
from trains_agent.session import Session, normalize_cuda_version
from trains_agent.external.requirements_parser import parse
from trains_agent.external.requirements_parser.requirement import Requirement
from .translator import RequirementsTranslator
@@ -57,7 +57,7 @@ class MarkerRequirement(object):
elif self.vcs:
# leave the line as is, let pip handle it
if self.line:
parts = [self.line]
return self.line
else:
# let's build the line manually
parts = [

View File

@@ -11,6 +11,7 @@ from copy import deepcopy
from distutils.spawn import find_executable
from itertools import chain, repeat, islice
from os.path import devnull
from time import sleep
from typing import Union, Text, Sequence, Any, TypeVar, Callable
import psutil
@@ -41,6 +42,30 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
return output if not strip or not output else output.strip()
def terminate_process(pid, timeout=10.):
# noinspection PyBroadException
try:
proc = psutil.Process(pid)
proc.terminate()
cnt = 0
while proc.is_running() and cnt < timeout:
sleep(1.)
cnt += 1
proc.terminate()
cnt = 0
while proc.is_running() and cnt < timeout:
sleep(1.)
cnt += 1
proc.kill()
except Exception:
pass
# noinspection PyBroadException
try:
return not psutil.Process(pid).is_running()
except Exception:
return True
def kill_all_child_processes(pid=None):
# get current process if pid not provided
include_parent = True
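
Assumed usage of the graceful-termination helper above (the PID is illustrative): terminate, escalate to kill if needed, and report whether the process is gone:

# returns True once the process is no longer running
if not terminate_process(pid=12345, timeout=10):
    print('Could not terminate process pid=12345')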

View File

@@ -37,6 +37,10 @@ class Singleton(object):
except:
pass
@classmethod
def get_lock_filename(cls):
return os.path.join(cls._get_temp_folder(), cls._lock_file_name)
@classmethod
def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
"""
@@ -47,7 +51,7 @@ class Singleton(object):
:return: (str worker_id, int slot_number) Return None value on instance already running
"""
# try to lock file
lock_file = os.path.join(cls._get_temp_folder(), cls._lock_file_name)
lock_file = cls.get_lock_filename()
timeout = 0
while os.path.exists(lock_file):
if timeout > cls._lock_timeout:
@@ -79,30 +83,41 @@ class Singleton(object):
return ret
@classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
if cls.worker_id:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
instance_num = 0
def get_running_pids(cls):
temp_folder = cls._get_temp_folder()
files = glob(os.path.join(temp_folder, cls.prefix + cls.sep + '*' + cls.ext))
slots = {}
pids = []
for file in files:
parts = file.split(cls.sep)
# noinspection PyBroadException
try:
pid = int(parts[1])
if not psutil.pid_exists(pid):
pid = -1
except Exception:
# something is wrong, use a non-existing pid and delete the file
pid = -1
uid, slot = None, None
# noinspection PyBroadException
try:
with open(file, 'r') as f:
uid, slot = str(f.read()).split('\n')
slot = int(slot)
except Exception:
pass
pids.append((pid, uid, slot, file))
return pids
@classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
if cls.worker_id:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
instance_num = 0
slots = {}
for pid, uid, slot, file in cls.get_running_pids():
worker = None
if api_client and ENV_DOCKER_HOST_MOUNT.get() and uid:
try:
@@ -111,7 +126,7 @@ class Singleton(object):
worker = None
# count active instances and delete dead files
if not worker and not psutil.pid_exists(pid):
if not worker and pid < 0:
# delete the file
try:
os.remove(os.path.join(file))
@@ -165,3 +180,9 @@ class Singleton(object):
@classmethod
def get_slot(cls):
return cls.instance_slot or 0
@classmethod
def get_pid_file(cls):
if not cls._pid_file:
return None
return cls._pid_file.name

View File

@@ -68,6 +68,10 @@ DAEMON_ARGS = dict({
'dest': 'queues',
'type': foreign_object_id('queues'),
},
'--order-fairness': {
'help': 'Pull from each queue in a round-robin order, instead of priority order.',
'action': 'store_true',
},
'--standalone-mode': {
'help': 'Do not use any network connections, assume everything is pre-installed',
'action': 'store_true',
@@ -85,6 +89,10 @@ DAEMON_ARGS = dict({
'action': 'store_true',
'aliases': ['-d'],
},
'--stop': {
'help': 'Stop the running agent (based on the same set of arguments)',
'action': 'store_true',
},
}, **WORKER_ARGS)
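
Putting the two new flags together, a typical invocation pair might look like this (queue names are illustrative):

# start a detached agent that polls its queues round-robin
trains-agent daemon --queue queue_a queue_b --order-fairness --detached
# later, stop that agent using the same set of arguments
trains-agent daemon --queue queue_a queue_b --stop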

View File

@@ -73,9 +73,11 @@ class Session(_Session):
os.environ[LOCAL_CONFIG_FILE_OVERRIDE_VAR] = config_file
if not Path(config_file).is_file():
raise ValueError("Could not open configuration file: {}".format(config_file))
cpu_only = kwargs.get('cpu_only')
if cpu_only:
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = 'none'
if kwargs.get('gpus') and not os.environ.get('KUBERNETES_SERVICE_HOST') \
and not os.environ.get('KUBERNETES_PORT'):
# CUDA_VISIBLE_DEVICES does not support 'all'
@@ -84,6 +86,7 @@ class Session(_Session):
os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
else:
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
if kwargs.get('only_load_config'):
from trains_agent.backend_api.config import load
self.config = load()

View File

@@ -1 +1 @@
__version__ = '0.15.1'
__version__ = '0.16.0'