Mirror of https://github.com/clearml/clearml-agent (synced 2025-06-26 18:16:15 +00:00)

Compare commits

18 Commits
- 121dec2a62
- 4aacf9005e
- 6b333202e9
- ce6831368f
- e4111c830b
- 52c1772b04
- 699d13bbb3
- 2c8d7d3d9a
- b13cc1e8e7
- 17d2bf2a3e
- 94997f9c88
- c6d998c4df
- f8ea445339
- 712efa208b
- 09b6b6a9de
- 98ff9a50e6
- 1f4d358316
- f693fa165c
@@ -5,8 +5,17 @@ WORKDIR /usr/agent

COPY . /usr/agent

ENV LC_ALL=en_US.UTF-8
ENV LANG=en_US.UTF-8
ENV LANGUAGE=en_US.UTF-8
ENV PYTHONIOENCODING=UTF-8

RUN apt-get update
RUN apt-get dist-upgrade -y
RUN apt-get install -y locales

RUN locale-gen en_US.UTF-8

RUN apt-get install -y curl python3-pip git
RUN curl -sSL https://get.docker.com/ | sh
RUN python3 -m pip install -U pip
@@ -11,4 +11,4 @@ TRAINS_API_HOST=${TRAINS_API_HOST:-"http://$TRAINS_HOST_IP:8008"}

echo $TRAINS_FILES_HOST $TRAINS_WEB_HOST $TRAINS_API_HOST 1>&2

python3 -m pip install -q -U "trains-agent${TRAINS_AGENT_UPDATE_VERSION}"
-trains-agent daemon --services-mode --queue services --create-queue --docker $TRAINS_AGENT_DEFAULT_BASE_DOCKER --cpu-only $TRAINS_AGENT_EXTRA_ARGS
+trains-agent daemon --services-mode --queue services --create-queue --docker "$TRAINS_AGENT_DEFAULT_BASE_DOCKER" --cpu-only $TRAINS_AGENT_EXTRA_ARGS
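The only functional change here is the quoting added around $TRAINS_AGENT_DEFAULT_BASE_DOCKER. A minimal sketch of why that matters, using Python's shlex to mimic shell word splitting (the variable value below is illustrative):

```python
import shlex

# Hypothetical value: a base image plus extra docker arguments in one variable.
value = 'nvidia/cuda:10.1-runtime-ubuntu18.04 --env FOO=1'

# Unquoted expansion word-splits the value into several argv tokens ...
print(shlex.split('trains-agent daemon --docker ' + value))
# ... while quoting keeps it as a single --docker argument.
print(shlex.split('trains-agent daemon --docker ' + shlex.quote(value)))
```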
@@ -106,7 +106,7 @@ agent {

    default_docker: {
        # default docker image to use when running in docker mode
-       image: "nvidia/cuda"
+       image: "nvidia/cuda:10.1-runtime-ubuntu18.04"

        # optional arguments to pass to docker image
        # arguments: ["--ipc=host"]
@@ -13,7 +13,6 @@ pyjwt>=1.6.4
PyYAML>=3.12
requests-file>=1.4.2
requests>=2.20.0
-requirements_parser>=0.2.0
six>=1.11.0
tqdm>=4.19.5
typing>=3.6.4
@@ -92,12 +92,23 @@

    default_docker: {
        # default docker image to use when running in docker mode
-       image: "nvidia/cuda"
+       image: "nvidia/cuda:10.1-runtime-ubuntu18.04"

        # optional arguments to pass to docker image
        # arguments: ["--ipc=host", ]
    }

    # set the initial bash script to execute at the startup of any docker.
    # all lines will be executed regardless of their exit code.
    # {python_single_digit} is translated to 'python3' or 'python2' according to requested python version
    # docker_init_bash_script = [
    #     "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
    #     "chown -R root /root/.cache/pip",
    #     "apt-get update",
    #     "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0",
    #     "(which {python_single_digit} && {python_single_digit} -m pip --version) || apt-get install -y {python_single_digit}-pip",
    # ]

    # cuda versions used for solving pytorch wheel packages
    # should be detected automatically. Override with os environment CUDA_VERSION / CUDNN_VERSION
    # cuda_version: 10.1
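The commented-out docker_init_bash_script list above is flattened into a single shell command by the worker code later in this changeset, which joins the lines with " ; " so every line runs regardless of exit codes. A small sketch of that flattening, with an illustrative override:

```python
# Illustrative override of agent.docker_init_bash_script.
bash_script = [
    "echo 'hello from docker init'",
    "apt-get update",
    "(which python3 && python3 -m pip --version) || apt-get install -y python3-pip",
]

# The same join performed in the worker.py hunk below, before the agent
# update commands are appended to the scheme.
docker_bash_script = " ; ".join(bash_script) if not isinstance(bash_script, str) else bash_script
print(docker_bash_script)
```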
@@ -37,6 +37,9 @@
        quality: 87
        subsampling: 0
    }

    # Support plot-per-graph fully matching Tensorboard behavior (i.e. if this is set to true, each series should have its own graph)
    tensorboard_single_series_per_graph: false
}

network {
@@ -117,11 +120,11 @@

log {
    # debugging feature: set this to true to make null log propagate messages to root logger (so they appear in stdout)
-   null_log_propagate: False
+   null_log_propagate: false
    task_log_buffer_capacity: 66

    # disable urllib info and lower levels
-   disable_urllib3_info: True
+   disable_urllib3_info: true
}

development {
@@ -131,14 +134,30 @@
    task_reuse_time_window_in_hours: 72.0

    # Run VCS repository detection asynchronously
-   vcs_repo_detect_async: True
+   vcs_repo_detect_async: true

    # Store uncommitted git/hg source code diff in experiment manifest when training in development mode
    # This stores "git diff" or "hg diff" into the experiment's "script.requirements.diff" section
-   store_uncommitted_code_diff_on_train: True
+   store_uncommitted_code_diff: true

    # Support stopping an experiment in case it was externally stopped, status was changed or task was reset
-   support_stopping: True
+   support_stopping: true

    # Default Task output_uri. if output_uri is not provided to Task.init, default_output_uri will be used instead.
    default_output_uri: ""

    # Default auto generated requirements optimize for smaller requirements
    # If True, analyze the entire repository regardless of the entry point.
    # If False, first analyze the entry point script; if it does not refer to other local files,
    # do not analyze the entire repository.
    force_analyze_entire_repo: false

    # If set to true, the *trains* update message will not be printed to the console
    # this value can be overwritten with os environment variable TRAINS_SUPPRESS_UPDATE_MESSAGE=1
    suppress_update_message: false

    # If this flag is true (default is false), instead of analyzing the code with Pigar, analyze with `pip freeze`
    detect_with_pip_freeze: false

    # Development mode worker
    worker {
@@ -149,7 +168,11 @@
        ping_period_sec: 30

        # Log all stdout & stderr
-       log_stdout: True
+       log_stdout: true

        # compatibility feature, report memory usage for the entire machine
        # default (false), report only on the running process and its sub-processes
        report_global_mem_used: false
    }
}
}
}
@@ -40,6 +40,7 @@ class Session(TokenManager):
    _session_requests = 0
    _session_initial_timeout = (3.0, 10.)
    _session_timeout = (10.0, 30.)
+   _session_initial_connect_retry = 4
    _write_session_data_size = 15000
    _write_session_timeout = (30.0, 30.)
@@ -96,7 +97,7 @@ class Session(TokenManager):
        else:
            self.config = load()
            if initialize_logging:
-               self.config.initialize_logging()
+               self.config.initialize_logging(debug=kwargs.get('debug', False))

        token_expiration_threshold_sec = self.config.get(
            "auth.token_expiration_threshold_sec", 60
@@ -134,7 +135,6 @@ class Session(TokenManager):
            "api.http.retries", ConfigTree()
        ).as_plain_ordered_dict()
        http_retries_config["status_forcelist"] = self._retry_codes
-       self.__http_session = get_http_session_with_retry(**http_retries_config)

        self.__worker = worker or gethostname()
@@ -144,7 +144,14 @@ class Session(TokenManager):

        self.client = client or "api-{}".format(__version__)

+       # limit the reconnect retries, so we get an error if we are starting the session
+       http_no_retries_config = dict(**http_retries_config)
+       http_no_retries_config['connect'] = self._session_initial_connect_retry
+       self.__http_session = get_http_session_with_retry(**http_no_retries_config)
+       # try to connect with the server
+       self.refresh_token()
+       # create the default session with many retries
+       self.__http_session = get_http_session_with_retry(**http_retries_config)

        # update api version from server response
        try:
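The pattern here is a two-phase HTTP session bootstrap: first a session whose connect retries are capped (so a misconfigured server fails fast), then, once refresh_token() succeeds, a long-lived session with the full retry budget. A minimal sketch of the same idea with plain requests/urllib3 (get_http_session_with_retry is a Trains internal; everything below is illustrative):

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def session_with_retry(connect_retries, total_retries=10):
    # One adapter for both schemes; retry on common transient status codes.
    retry = Retry(total=total_retries, connect=connect_retries,
                  backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
    session = requests.Session()
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session


# Phase 1: few connect retries, so a bad api_server errors out quickly.
bootstrap = session_with_retry(connect_retries=4)
# bootstrap.get(api_server + '/auth.login')  # would raise ConnectionError fast

# Phase 2: after login succeeds, switch to a patient session for normal traffic.
steady = session_with_retry(connect_retries=10)
```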
@@ -546,6 +553,9 @@ class Session(TokenManager):
            else:
                raise LoginError("Response data mismatch: No 'token' in 'data' value from res, receive : {}, "
                                 "exception: {}".format(res, ex))
+       except requests.ConnectionError as ex:
+           raise ValueError('Connection Error: it seems *api_server* is misconfigured. '
+                            'Is this the TRAINS API server {} ?'.format('/'.join(ex.request.url.split('/')[:3])))
        except Exception as ex:
            raise LoginError('Unrecognized Authentication Error: {} {}'.format(type(ex), ex))
@@ -190,7 +190,7 @@ class Config(object):
    def reload(self):
        self.replace(self._reload())

-   def initialize_logging(self):
+   def initialize_logging(self, debug=False):
        logging_config = self._config.get("logging", None)
        if not logging_config:
            return False
@@ -217,6 +217,8 @@ class Config(object):
        )
        for logger in loggers:
            handlers = logger.get("handlers", None)
+           if debug:
+               logger['level'] = 'DEBUG'
            if not handlers:
                continue
            logger["handlers"] = [h for h in handlers if h not in deleted]
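The debug flag simply promotes every configured logger to DEBUG before the configuration is applied. A self-contained sketch of the same override on a standard logging dictConfig dictionary (the logger names are made up):

```python
import logging.config

config = {
    'version': 1,
    'loggers': {
        'trains_agent': {'level': 'INFO'},
        'urllib3': {'level': 'WARNING'},
    },
}

debug = True
if debug:
    # Mirrors the hunk above: force DEBUG on every logger when requested.
    for logger in config['loggers'].values():
        logger['level'] = 'DEBUG'

logging.config.dictConfig(config)
```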
@@ -142,6 +142,7 @@ def main():
    with open(str(conf_file), 'wt') as f:
        header = '# TRAINS-AGENT configuration file\n' \
                 'api {\n' \
                 '    # Notice: \'host\' is the api server (default port 8008), not the web server.\n' \
                 '    api_server: %s\n' \
                 '    web_server: %s\n' \
+                '    files_server: %s\n' \
@@ -17,7 +17,7 @@ from datetime import datetime
from distutils.spawn import find_executable
from functools import partial
from itertools import chain
-from tempfile import mkdtemp, gettempdir
+from tempfile import mkdtemp, NamedTemporaryFile
from time import sleep, time
from typing import Text, Optional, Any, Tuple
@@ -66,7 +66,7 @@ from trains_agent.helper.base import (
    get_python_path,
    is_linux_platform,
    rm_file,
-   add_python_path)
+   add_python_path, safe_remove_tree, )
from trains_agent.helper.console import ensure_text, print_text, decode_binary_lines
from trains_agent.helper.os.daemonize import daemonize_process
from trains_agent.helper.package.base import PackageManager
@@ -89,7 +89,7 @@ from trains_agent.helper.process import (
    get_bash_output,
    shutdown_docker_process,
    get_docker_id,
-   commit_docker
+   commit_docker, terminate_process,
)
from trains_agent.helper.package.cython_req import CythonRequirement
from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
@@ -104,6 +104,7 @@ log = logging.getLogger(__name__)
DOCKER_ROOT_CONF_FILE = "/root/trains.conf"
+DOCKER_DEFAULT_CONF_FILE = "/root/default_trains.conf"


@attr.s
class LiteralScriptManager(object):
    """
@@ -212,6 +213,7 @@ class TaskStopSignal(object):
        statuses.stopped,
        statuses.failed,
        statuses.published,
+       statuses.queued,
    ]
    default = TaskStopReason.no_stop
    stopping_message = "stopping"
@@ -317,6 +319,10 @@ class Worker(ServiceCommandSection):
    # last message before passing control to the actual task
    _task_logging_pass_control_message = "Running task id [{}]:"

+   _run_as_user_home = '/trains_agent_home'
+   _docker_fixed_user_cache = '/trains_agent_cache'
+   _temp_cleanup_list = []

    @property
    def service(self):
        """ Worker command service endpoint """
@@ -329,6 +335,8 @@ class Worker(ServiceCommandSection):
    @staticmethod
    def register_signal_handler():
        def handler(*_):
+           for f in Worker._temp_cleanup_list + [Singleton.get_pid_file()]:
+               safe_remove_tree(f)
            raise Sigterm()

        signal.signal(signal.SIGTERM, handler)
@@ -385,6 +393,7 @@ class Worker(ServiceCommandSection):
        self._standalone_mode = None
        self._services_mode = None
        self._force_current_version = None
+       self._redirected_stdout_file_no = None

    @classmethod
    def _verify_command_states(cls, kwargs):
@@ -430,7 +439,7 @@ class Worker(ServiceCommandSection):
        pass

    def run_one_task(self, queue, task_id, worker_args, docker=None):
-       # type: (Text, Text, WorkerParams) -> ()
+       # type: (Text, Text, WorkerParams, Optional[Text]) -> ()
        """
        Run one task pulled from queue.
        :param queue: ID of queue that task was pulled from
@@ -566,12 +575,12 @@ class Worker(ServiceCommandSection):
        else:
            self.handle_task_termination(task_id, status, stop_signal_status)
        # remove temp files after we sent everything to the backend
-       safe_remove_file(temp_stdout_name)
-       safe_remove_file(temp_stderr_name)
        if self.docker_image_func:
            shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))
+       safe_remove_file(temp_stdout_name)
+       safe_remove_file(temp_stderr_name)

-   def run_tasks_loop(self, queues, worker_params):
+   def run_tasks_loop(self, queues, worker_params, priority_order=True):
        """
        :summary: Pull and run tasks from queues.
        :description: 1. Go through ``queues`` by order.
@@ -581,6 +590,9 @@ class Worker(ServiceCommandSection):
        :type queues: list of ``Text``
        :param worker_params: Worker command line arguments
        :type worker_params: ``trains_agent.helper.process.WorkerParams``
+       :param priority_order: If True, pull in priority order: always start from the first queue.
+           If False, pull from each queue once, in round-robin order.
+       :type priority_order: bool
        """

        if not self._daemon_foreground:
@@ -611,6 +623,16 @@ class Worker(ServiceCommandSection):
                print("No tasks in queue {}".format(queue))
                continue

+           # clear output log if we start a new Task
+           if not worker_params.debug and self._redirected_stdout_file_no is not None and \
+                   self._redirected_stdout_file_no > 2:
+               # noinspection PyBroadException
+               try:
+                   os.lseek(self._redirected_stdout_file_no, 0, 0)
+                   os.ftruncate(self._redirected_stdout_file_no, 0)
+               except:
+                   pass

            self.send_logs(
                task_id=task_id,
                lines=["task {} pulled from {} by worker {}\n".format(task_id, queue, self.worker_id)],
@@ -619,7 +641,11 @@ class Worker(ServiceCommandSection):
            self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
            self.run_one_task(queue, task_id, worker_params)
            self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
-           break

+           # if we are using priority, always start pulling from the first queue again;
+           # if we are doing round-robin, pull from the next one
+           if priority_order:
+               break
        else:
            # sleep and retry polling
            if self._daemon_foreground or worker_params.debug:
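Together with the for/else, this loop implements two queue-polling policies: priority (after every task, restart from the first queue) and round-robin (keep walking the queue list, pausing after each full pass). A stripped-down, self-contained sketch of the control flow (queue names and task ids are made up):

```python
import time


def run_tasks_loop(queues, priority_order=True):
    """Drain all queues; mirrors the for/else flow in the hunk above."""
    backlog = {'high': ['t1', 't2'], 'low': ['t3', 't4']}
    while any(backlog.values()):
        for queue in queues:
            if not backlog[queue]:
                continue
            task = backlog[queue].pop(0)
            print('running', task, 'from', queue)
            # Priority mode: break so the outer loop re-polls queues[0] first.
            if priority_order:
                break
        else:
            # Round-robin reaches here after a full pass (or when idle): pause, then re-poll.
            time.sleep(0.01)


run_tasks_loop(['high', 'low'], priority_order=True)   # t1 t2 t3 t4 (drains 'high' first)
run_tasks_loop(['high', 'low'], priority_order=False)  # t1 t3 t2 t4 (alternates queues)
```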
@@ -671,7 +697,7 @@ class Worker(ServiceCommandSection):

        self._session.print_configuration()

-   def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, **kwargs):
+   def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
        # if we do not need to create queues, make sure they are valid
        # match previous behaviour when we validated queue names before everything else
        queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
@@ -683,6 +709,11 @@ class Worker(ServiceCommandSection):
        kwargs = self._verify_command_states(kwargs)
        docker = docker or kwargs.get('docker')

+       # We are not running a daemon, we are killing one:
+       # find the pid, send a termination signal and leave
+       if kwargs.get('stop', False):
+           return 1 if not self._kill_daemon() else 0

        # make sure we only have a single instance,
        # also make sure we set worker_id properly and cache folders
        self._singleton()
@@ -708,9 +739,8 @@ class Worker(ServiceCommandSection):
        self._register(queues)

        # create temp config file with current configuration
-       self.temp_config_path = safe_mkstemp(
-           suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
-       )
+       self.temp_config_path = NamedTemporaryFile(
+           suffix=".cfg", prefix=".trains_agent.", mode='w+t').name

        # print docker image
        if docker is not False and docker is not None:
@@ -750,6 +780,9 @@ class Worker(ServiceCommandSection):
            )
        )

+       if not self._session.debug_mode:
+           self._temp_cleanup_list.append(name)

        if not detached:
            # redirect std out/err to new file
            sys.stdout = sys.stderr = out_file
@@ -757,6 +790,7 @@ class Worker(ServiceCommandSection):
            # in detached mode
            # fully detach stdin/stdout/stderr and leave the main process, running in the background
            daemonize_process(out_file.fileno())
+           self._redirected_stdout_file_no = out_file.fileno()
            # make sure we update the singleton lock file to the new pid
            Singleton.update_pid_file()
            # reprint headers to std file (we are now inside the daemon process)
@@ -776,6 +810,7 @@ class Worker(ServiceCommandSection):
                    debug=self._session.debug_mode,
                    trace=self._session.trace,
                ),
+               priority_order=not order_fairness,
            )
        except Exception:
            tb = six.text_type(traceback.format_exc())
@@ -1171,7 +1206,7 @@ class Worker(ServiceCommandSection):
                clone=("--clone" if entry_point == "clone_task" else ""),
            )
        else:
-           change = 'ENTRYPOINT bash'
+           change = 'ENTRYPOINT []'

        print('Committing docker container to: {}'.format(target))
        print(commit_docker(container_name=target, docker_id=docker_id, apply_change=change))
@@ -1998,7 +2033,7 @@ class Worker(ServiceCommandSection):
        print("Running in Docker {} mode (v19.03 and above) - using default docker image: {} running {}\n".format(
            '*standalone*' if self._standalone_mode else '', docker_image, python_version))
        temp_config = deepcopy(self._session.config)
-       mounted_cache_dir = '/root/.trains/cache'
+       mounted_cache_dir = self._docker_fixed_user_cache  # '/root/.trains/cache'
        mounted_pip_dl_dir = '/root/.trains/pip-download-cache'
        mounted_vcs_cache = '/root/.trains/vcs-cache'
        mounted_venv_dir = '/root/.trains/venvs-builds'
@@ -2022,6 +2057,7 @@ class Worker(ServiceCommandSection):
        host_pip_cache = Path(os.path.expandvars(self._session.config.get(
            "agent.docker_pip_cache", '~/.trains/pip-cache'))).expanduser().as_posix()
        host_ssh_cache = mkdtemp(prefix='trains_agent.ssh.')
+       self._temp_cleanup_list.append(host_ssh_cache)

        # make sure all folders are valid
        Path(host_apt_cache).mkdir(parents=True, exist_ok=True)
@@ -2043,12 +2079,10 @@ class Worker(ServiceCommandSection):
            pass

        # check if the .git credentials exist:
-       host_git_credentials = Path('~/.git-credentials').expanduser()
        try:
-           if host_git_credentials.is_file():
-               host_git_credentials = host_git_credentials.as_posix()
-           else:
-               host_git_credentials = None
+           host_git_credentials = [
+               f.as_posix() for f in [Path('~/.git-credentials').expanduser(), Path('~/.gitconfig').expanduser()]
+               if f.is_file()]
        except Exception:
            host_git_credentials = None
@@ -2063,6 +2097,8 @@ class Worker(ServiceCommandSection):
            cmds = [cmds]
        extra_shell_script_str = " ; ".join(map(str, cmds)) + " ; "

+       bash_script = self._session.config.get("agent.docker_init_bash_script", None)

        self.temp_config_path = self.temp_config_path or safe_mkstemp(
            suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
        )
@@ -2079,7 +2115,10 @@ class Worker(ServiceCommandSection):
            host_cache=host_cache, mounted_cache=mounted_cache_dir,
            host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
            host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
-           standalone_mode=self._standalone_mode, force_current_version=self._force_current_version)
+           standalone_mode=self._standalone_mode,
+           force_current_version=self._force_current_version,
+           bash_script=bash_script,
+       )
        return temp_config, partial(docker_cmd_functor, docker_cmd, temp_config)

    @staticmethod
@@ -2092,7 +2131,7 @@ class Worker(ServiceCommandSection):
            host_pip_dl, mounted_pip_dl,
            host_vcs_cache, mounted_vcs_cache,
            standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None,
-           force_current_version=None, host_git_credentials=None):
+           force_current_version=None, host_git_credentials=None, bash_script=None):
        docker = 'docker'

        base_cmd = [docker, 'run', '-t']
@@ -2193,20 +2232,32 @@ class Worker(ServiceCommandSection):
        trains_agent_wheel = 'trains-agent{specify_version}'.format(specify_version=specify_version)

        if not standalone_mode:
-           update_scheme += \
-               "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
-               "chown -R root /root/.cache/pip ; " \
-               "apt-get update ; " \
-               "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 {python_single_digit}-pip ; " \
-               "{python} -m pip install -U \"pip{pip_version}\" ; " \
-               "{python} -m pip install -U {trains_agent_wheel} ; ".format(
-                   python_single_digit=python_version.split('.')[0],
-                   python=python_version, pip_version=PackageManager.get_pip_version(),
-                   trains_agent_wheel=trains_agent_wheel)
+           if not bash_script:
+               bash_script = [
+                   "echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
+                   "chown -R root /root/.cache/pip",
+                   "apt-get update",
+                   "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0",
+                   "(which {python_single_digit} && {python_single_digit} -m pip --version) || " +
+                   "apt-get install -y {python_single_digit}-pip",
+               ]
+
+           docker_bash_script = " ; ".join(bash_script) if not isinstance(bash_script, str) else bash_script
+
+           update_scheme += (
+               docker_bash_script + " ; " +
+               "{python} -m pip install -U \"pip{pip_version}\" ; " +
+               "{python} -m pip install -U {trains_agent_wheel} ; ").format(
+                   python_single_digit=python_version.split('.')[0],
+                   python=python_version, pip_version=PackageManager.get_pip_version(),
+                   trains_agent_wheel=trains_agent_wheel)

+       if host_git_credentials:
+           for git_credentials in host_git_credentials:
+               base_cmd += ['-v', '{}:/root/{}'.format(git_credentials, Path(git_credentials).name)]

        base_cmd += (
            ['-v', conf_file+':'+DOCKER_ROOT_CONF_FILE] +
-           (['-v', host_git_credentials+':/root/.git-credentials'] if host_git_credentials else []) +
            (['-v', host_ssh_cache+':/root/.ssh'] if host_ssh_cache else []) +
            (['-v', host_apt_cache+':/var/cache/apt/archives'] if host_apt_cache else []) +
            (['-v', host_pip_cache+':/root/.cache/pip'] if host_pip_cache else []) +
@@ -2248,13 +2299,13 @@ class Worker(ServiceCommandSection):
        os.setuid(self.uid)

        # create a home folder for our user
-       trains_agent_home = 'trains_agent_home{}'.format('.'+str(Singleton.get_slot()) if Singleton.get_slot() else '')
+       trains_agent_home = self._run_as_user_home + '{}'.format('.'+str(Singleton.get_slot()) if Singleton.get_slot() else '')
        try:
-           home_folder = '/trains_agent_home'
+           home_folder = self._run_as_user_home
            rm_tree(home_folder)
            Path(home_folder).mkdir(parents=True, exist_ok=True)
        except:
-           home_folder = '/home/trains_agent_home'
+           home_folder = os.path.join('/home', self._run_as_user_home)
            rm_tree(home_folder)
            Path(home_folder).mkdir(parents=True, exist_ok=True)
@@ -2273,6 +2324,10 @@ class Worker(ServiceCommandSection):
        # make sure we will be able to access the cache folder (we assume we have the ability to change mod)
        if sdk_cache_folder:
            sdk_cache_folder = Path(os.path.expandvars(sdk_cache_folder)).expanduser().absolute()
+           try:
+               sdk_cache_folder.chmod(0o0777)
+           except:
+               pass
            for f in sdk_cache_folder.rglob('*'):
                try:
                    f.chmod(0o0777)
@@ -2300,8 +2355,41 @@ class Worker(ServiceCommandSection):

        return command, script_dir

+   def _kill_daemon(self):
+       worker_id, worker_name = self._generate_worker_id_name()
+       # Iterate over all running processes
+       for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''):
+           # either we have a match for the worker_id or we just pick the first one
+           if pid >= 0 and (
+                   (worker_id and uid == worker_id) or
+                   (not worker_id and uid.startswith('{}:'.format(worker_name)))):
+               # this is us, kill it
+               print('Terminating trains-agent worker_id={} pid={}'.format(uid, pid))
+               if not terminate_process(pid, timeout=10):
+                   error('Could not terminate process pid={}'.format(pid))
+               return True
+       print('Could not find a running trains-agent instance with worker_name={} worker_id={}'.format(
+           worker_name, worker_id))
+       return False

    def _singleton(self):
        # ensure singleton
        worker_id, worker_name = self._generate_worker_id_name()

        # if we are running in services mode, we allow double registration since
        # docker-compose will kill instances before they clean up
        self.worker_id, worker_slot = Singleton.register_instance(
            unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client,
            allow_double=bool(ENV_DOCKER_HOST_MOUNT.get())  # and bool(self._services_mode),
        )

        if self.worker_id is None:
            error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
            exit(1)
        # update folders based on free slot
        self._session.create_cache_folders(slot_index=worker_slot)

    def _generate_worker_id_name(self):
        worker_id = self._session.config["agent.worker_id"]
        worker_name = self._session.config["agent.worker_name"]
        if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
@@ -2312,18 +2400,7 @@ class Worker(ServiceCommandSection):
            pass
        else:
            worker_name = '{}:cpu'.format(worker_name)

-       # if we are running in services mode, we allow double register since
-       # docker-compose will kill instances before they cleanup
-       self.worker_id, worker_slot = Singleton.register_instance(
-           unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client,
-           allow_double=bool(self._services_mode) and bool(ENV_DOCKER_HOST_MOUNT.get()))
-
-       if self.worker_id is None:
-           error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
-           exit(1)
-       # update folders based on free slot
-       self._session.create_cache_folders(slot_index=worker_slot)
        return worker_id, worker_name

    def _resolve_queue_names(self, queues, create_if_missing=False):
        if not queues:
trains_agent/external/__init__.py (vendored, new file, 0 lines)

trains_agent/external/requirements_parser/__init__.py (vendored, new file, 22 lines)
@@ -0,0 +1,22 @@
from .parser import parse  # noqa

_MAJOR = 0
_MINOR = 2
_PATCH = 0


def version_tuple():
    '''
    Returns a 3-tuple of ints that represent the version
    '''
    return (_MAJOR, _MINOR, _PATCH)


def version():
    '''
    Returns a string representation of the version
    '''
    return '%d.%d.%d' % (version_tuple())


__version__ = version()
trains_agent/external/requirements_parser/fragment.py (vendored, new file, 44 lines)
@@ -0,0 +1,44 @@
import re

# Copied from pip
# https://github.com/pypa/pip/blob/281eb61b09d87765d7c2b92f6982b3fe76ccb0af/pip/index.py#L947
HASH_ALGORITHMS = set(['sha1', 'sha224', 'sha384', 'sha256', 'sha512', 'md5'])

extras_require_search = re.compile(
    r'(?P<name>.+)\[(?P<extras>[^\]]+)\]').search


def parse_fragment(fragment_string):
    """Takes a fragment string and returns a dict of the components"""
    fragment_string = fragment_string.lstrip('#')

    try:
        return dict(
            key_value_string.split('=')
            for key_value_string in fragment_string.split('&')
        )
    except ValueError:
        raise ValueError(
            'Invalid fragment string {fragment_string}'.format(
                fragment_string=fragment_string
            )
        )


def get_hash_info(d):
    """Returns the first matching hashlib name and value from a dict"""
    for key in d.keys():
        if key.lower() in HASH_ALGORITHMS:
            return key, d[key]

    return None, None


def parse_extras_require(egg):
    if egg is not None:
        match = extras_require_search(egg)
        if match is not None:
            name = match.group('name')
            extras = match.group('extras')
            return name, [extra.strip() for extra in extras.split(',')]
    return egg, []
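A quick usage sketch of these helpers on a typical pip URL fragment (package name, extras and hash are made up):

```python
from trains_agent.external.requirements_parser.fragment import (
    get_hash_info, parse_extras_require, parse_fragment)

frag = parse_fragment('#egg=mypkg[extra1,extra2]&sha256=0123abcd')
print(frag)                               # {'egg': 'mypkg[extra1,extra2]', 'sha256': '0123abcd'}
print(get_hash_info(frag))                # ('sha256', '0123abcd')
print(parse_extras_require(frag['egg']))  # ('mypkg', ['extra1', 'extra2'])
```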
trains_agent/external/requirements_parser/parser.py (vendored, new file, 50 lines)
@@ -0,0 +1,50 @@
import os
import warnings

from .requirement import Requirement


def parse(reqstr):
    """
    Parse a requirements file into a list of Requirements

    See: pip/req.py:parse_requirements()

    :param reqstr: a string or file like object containing requirements
    :returns: a *generator* of Requirement objects
    """
    filename = getattr(reqstr, 'name', None)
    try:
        # Python 2.x compatibility
        if not isinstance(reqstr, basestring):
            reqstr = reqstr.read()
    except NameError:
        # Python 3.x only
        if not isinstance(reqstr, str):
            reqstr = reqstr.read()

    for line in reqstr.splitlines():
        line = line.strip()
        if line == '':
            continue
        elif not line or line.startswith('#'):
            # comments are lines that start with # only
            continue
        elif line.startswith('-r') or line.startswith('--requirement'):
            _, new_filename = line.split()
            new_file_path = os.path.join(os.path.dirname(filename or '.'),
                                         new_filename)
            with open(new_file_path) as f:
                for requirement in parse(f):
                    yield requirement
        elif line.startswith('-f') or line.startswith('--find-links') or \
                line.startswith('-i') or line.startswith('--index-url') or \
                line.startswith('--extra-index-url') or \
                line.startswith('--no-index'):
            warnings.warn('Private repos not supported. Skipping.')
            continue
        elif line.startswith('-Z') or line.startswith('--always-unzip'):
            warnings.warn('Unused option --always-unzip. Skipping.')
            continue
        else:
            yield Requirement.parse(line)
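A short sketch of feeding parse() a requirements string; it accepts either a string or a file-like object and yields Requirement instances lazily (the requirement lines are illustrative):

```python
from trains_agent.external.requirements_parser import parse

reqstr = """
six>=1.11.0
# a comment line
mypkg[extra1]>=1.0,<2.0
"""

for req in parse(reqstr):
    print(req.name, req.extras, req.specs)
# six [] [('>=', '1.11.0')]
# mypkg ['extra1'] [('>=', '1.0'), ('<', '2.0')]  (spec order may vary)
```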
trains_agent/external/requirements_parser/requirement.py (vendored, new file, 236 lines)
@@ -0,0 +1,236 @@
from __future__ import unicode_literals
import re
from pkg_resources import Requirement as Req

from .fragment import get_hash_info, parse_fragment, parse_extras_require
from .vcs import VCS, VCS_SCHEMES


URI_REGEX = re.compile(
    r'^(?P<scheme>https?|file|ftps?)://(?P<path>[^#]+)'
    r'(#(?P<fragment>\S+))?'
)

VCS_REGEX = re.compile(
    r'^(?P<scheme>{0})://'.format(r'|'.join(
        [scheme.replace('+', r'\+') for scheme in VCS_SCHEMES])) +
    r'((?P<login>[^/@]+)@)?'
    r'(?P<path>[^#@]+)'
    r'(@(?P<revision>[^#]+))?'
    r'(#(?P<fragment>\S+))?'
)

# This matches just about everything
LOCAL_REGEX = re.compile(
    r'^((?P<scheme>file)://)?'
    r'(?P<path>[^#]+)' +
    r'(#(?P<fragment>\S+))?'
)


class Requirement(object):
    """
    Represents a single requirement

    Typically instances of this class are created with ``Requirement.parse``.
    For local file requirements, there's no verification that the file
    exists. This class attempts to be *dict-like*.

    See: http://www.pip-installer.org/en/latest/logic.html

    **Members**:

    * ``line`` - the actual requirement line being parsed
    * ``editable`` - a boolean whether this requirement is "editable"
    * ``local_file`` - a boolean whether this requirement is a local file/path
    * ``specifier`` - a boolean whether this requirement used a requirement
      specifier (eg. "django>=1.5" or "requirements")
    * ``vcs`` - a string specifying the version control system
    * ``revision`` - a version control system specifier
    * ``name`` - the name of the requirement
    * ``uri`` - the URI if this requirement was specified by URI
    * ``subdirectory`` - the subdirectory fragment of the URI
    * ``path`` - the local path to the requirement
    * ``hash_name`` - the type of hashing algorithm indicated in the line
    * ``hash`` - the hash value indicated by the requirement line
    * ``extras`` - a list of extras for this requirement
      (eg. "mymodule[extra1, extra2]")
    * ``specs`` - a list of specs for this requirement
      (eg. "mymodule>1.5,<1.6" => [('>', '1.5'), ('<', '1.6')])
    """

    def __init__(self, line):
        # Do not call this private method
        self.line = line
        self.editable = False
        self.local_file = False
        self.specifier = False
        self.vcs = None
        self.name = None
        self.subdirectory = None
        self.uri = None
        self.path = None
        self.revision = None
        self.hash_name = None
        self.hash = None
        self.extras = []
        self.specs = []

    def __repr__(self):
        return '<Requirement: "{0}">'.format(self.line)

    def __getitem__(self, key):
        return getattr(self, key)

    def keys(self):
        return self.__dict__.keys()

    @classmethod
    def parse_editable(cls, line):
        """
        Parses a Requirement from an "editable" requirement which is either
        a local project path or a VCS project URI.

        See: pip/req.py:from_editable()

        :param line: an "editable" requirement
        :returns: a Requirement instance for the given line
        :raises: ValueError on an invalid requirement
        """

        req = cls('-e {0}'.format(line))
        req.editable = True
        vcs_match = VCS_REGEX.match(line)
        local_match = LOCAL_REGEX.match(line)

        if vcs_match is not None:
            groups = vcs_match.groupdict()
            if groups.get('login'):
                req.uri = '{scheme}://{login}@{path}'.format(**groups)
            else:
                req.uri = '{scheme}://{path}'.format(**groups)
            req.revision = groups['revision']
            if groups['fragment']:
                fragment = parse_fragment(groups['fragment'])
                egg = fragment.get('egg')
                req.name, req.extras = parse_extras_require(egg)
                req.hash_name, req.hash = get_hash_info(fragment)
                req.subdirectory = fragment.get('subdirectory')
            for vcs in VCS:
                if req.uri.startswith(vcs):
                    req.vcs = vcs
        else:
            assert local_match is not None, 'This should match everything'
            groups = local_match.groupdict()
            req.local_file = True
            if groups['fragment']:
                fragment = parse_fragment(groups['fragment'])
                egg = fragment.get('egg')
                req.name, req.extras = parse_extras_require(egg)
                req.hash_name, req.hash = get_hash_info(fragment)
                req.subdirectory = fragment.get('subdirectory')
            req.path = groups['path']

        return req

    @classmethod
    def parse_line(cls, line):
        """
        Parses a Requirement from a non-editable requirement.

        See: pip/req.py:from_line()

        :param line: a "non-editable" requirement
        :returns: a Requirement instance for the given line
        :raises: ValueError on an invalid requirement
        """

        req = cls(line)

        vcs_match = VCS_REGEX.match(line)
        uri_match = URI_REGEX.match(line)
        local_match = LOCAL_REGEX.match(line)

        if vcs_match is not None:
            groups = vcs_match.groupdict()
            if groups.get('login'):
                req.uri = '{scheme}://{login}@{path}'.format(**groups)
            else:
                req.uri = '{scheme}://{path}'.format(**groups)
            req.revision = groups['revision']
            if groups['fragment']:
                fragment = parse_fragment(groups['fragment'])
                egg = fragment.get('egg')
                req.name, req.extras = parse_extras_require(egg)
                req.hash_name, req.hash = get_hash_info(fragment)
                req.subdirectory = fragment.get('subdirectory')
            for vcs in VCS:
                if req.uri.startswith(vcs):
                    req.vcs = vcs
        elif uri_match is not None:
            groups = uri_match.groupdict()
            req.uri = '{scheme}://{path}'.format(**groups)
            if groups['fragment']:
                fragment = parse_fragment(groups['fragment'])
                egg = fragment.get('egg')
                req.name, req.extras = parse_extras_require(egg)
                req.hash_name, req.hash = get_hash_info(fragment)
                req.subdirectory = fragment.get('subdirectory')
            if groups['scheme'] == 'file':
                req.local_file = True
        elif '#egg=' in line:
            # Assume a local file match
            assert local_match is not None, 'This should match everything'
            groups = local_match.groupdict()
            req.local_file = True
            if groups['fragment']:
                fragment = parse_fragment(groups['fragment'])
                egg = fragment.get('egg')
                name, extras = parse_extras_require(egg)
                req.name = fragment.get('egg')
                req.hash_name, req.hash = get_hash_info(fragment)
                req.subdirectory = fragment.get('subdirectory')
            req.path = groups['path']
        else:
            # This is a requirement specifier.
            # Delegate to pkg_resources and hope for the best
            req.specifier = True
            pkg_req = Req.parse(line)
            req.name = pkg_req.unsafe_name
            req.extras = list(pkg_req.extras)
            req.specs = pkg_req.specs
        return req

    @classmethod
    def parse(cls, line):
        """
        Parses a Requirement from a line of a requirement file.

        :param line: a line of a requirement file
        :returns: a Requirement instance for the given line
        :raises: ValueError on an invalid requirement
        """
        line = line.lstrip()
        if line.startswith('-e') or line.startswith('--editable'):
            # Editable installs are either a local project path
            # or a VCS project URI
            return cls.parse_editable(
                re.sub(r'^(-e|--editable=?)\s*', '', line))
        elif '@' in line:
            # Allegro bug fix: support 'name @ git+' entries
            name, vcs = line.split('@', 1)
            name = name.strip()
            vcs = vcs.strip()
            # noinspection PyBroadException
            try:
                # check if the name is valid & parsed
                Req.parse(name)
                # if we are here, name is a valid package name, check if the vcs part is valid
                if VCS_REGEX.match(vcs):
                    req = cls.parse_line(vcs)
                    req.name = name
                    return req
            except Exception:
                pass

        return cls.parse_line(line)
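The '@' branch above is the Allegro-added fix for PEP 508 style direct references ("name @ git+..."). A quick sketch of what it enables (package name and repository URL are made up):

```python
from trains_agent.external.requirements_parser.requirement import Requirement

req = Requirement.parse('mypkg @ git+https://github.com/example/mypkg.git@v1.2#egg=mypkg')
print(req.name)      # mypkg
print(req.vcs)       # git
print(req.uri)       # git+https://github.com/example/mypkg.git
print(req.revision)  # v1.2
```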
trains_agent/external/requirements_parser/vcs.py (vendored, new file, 30 lines)
@@ -0,0 +1,30 @@
from __future__ import unicode_literals

VCS = [
    'git',
    'hg',
    'svn',
    'bzr',
]

VCS_SCHEMES = [
    'git',
    'git+https',
    'git+ssh',
    'git+git',
    'hg+http',
    'hg+https',
    'hg+static-http',
    'hg+ssh',
    'svn',
    'svn+svn',
    'svn+http',
    'svn+https',
    'svn+ssh',
    'bzr+http',
    'bzr+https',
    'bzr+ssh',
    'bzr+sftp',
    'bzr+ftp',
    'bzr+lp',
]
@@ -173,13 +173,30 @@ def normalize_path(*paths):


def safe_remove_file(filename, error_message=None):
    # noinspection PyBroadException
    try:
-       os.remove(filename)
+       if filename:
+           os.remove(filename)
    except Exception:
        if error_message:
            print(error_message)


def safe_remove_tree(filename):
    if not filename:
        return
    # noinspection PyBroadException
    try:
        shutil.rmtree(filename, ignore_errors=True)
    except Exception:
        pass
    # noinspection PyBroadException
    try:
        os.remove(filename)
    except Exception:
        pass


def get_python_path(script_dir, entry_point, package_api):
    try:
        python_path_sep = ';' if is_windows_platform() else ':'
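Both helpers are deliberately tolerant: None, missing paths, files and directories are all safe inputs. A small usage sketch (assuming the helpers above are importable from trains_agent.helper.base, as the worker.py import hunk earlier suggests):

```python
import os
import tempfile

from trains_agent.helper.base import safe_remove_file, safe_remove_tree

tmp_dir = tempfile.mkdtemp(prefix='trains_agent.demo.')
safe_remove_tree(tmp_dir)   # removes a directory tree ...
safe_remove_tree(tmp_dir)   # ... and is a no-op the second time
safe_remove_tree(None)      # None short-circuits immediately

fd, tmp_file = tempfile.mkstemp()
os.close(fd)
safe_remove_file(tmp_file)  # removes a plain file
safe_remove_file(None)      # swallowed by the new `if filename:` guard
```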
@@ -200,24 +200,30 @@ class GPUStatCollection(object):
                GPUStatCollection.global_processes[nv_process.pid] = \
                    psutil.Process(pid=nv_process.pid)
            ps_process = GPUStatCollection.global_processes[nv_process.pid]
-           process['username'] = ps_process.username()
-           # cmdline returns full path;
-           # as in `ps -o comm`, get short cmdnames.
-           _cmdline = ps_process.cmdline()
-           if not _cmdline:
-               # sometimes, zombie or unknown (e.g. [kworker/8:2H])
-               process['command'] = '?'
-               process['full_command'] = ['?']
-           else:
-               process['command'] = os.path.basename(_cmdline[0])
-               process['full_command'] = _cmdline
-           # Bytes to MBytes
-           process['gpu_memory_usage'] = nv_process.usedGpuMemory // MB
-           process['cpu_percent'] = ps_process.cpu_percent()
-           process['cpu_memory_usage'] = \
-               round((ps_process.memory_percent() / 100.0) *
-                     psutil.virtual_memory().total)
            process['pid'] = nv_process.pid
+           # noinspection PyBroadException
+           try:
+               # we do not actually use these, so no point in collecting them
+               # process['username'] = ps_process.username()
+               # # cmdline returns full path;
+               # # as in `ps -o comm`, get short cmdnames.
+               # _cmdline = ps_process.cmdline()
+               # if not _cmdline:
+               #     # sometimes, zombie or unknown (e.g. [kworker/8:2H])
+               #     process['command'] = '?'
+               #     process['full_command'] = ['?']
+               # else:
+               #     process['command'] = os.path.basename(_cmdline[0])
+               #     process['full_command'] = _cmdline
+               # process['cpu_percent'] = ps_process.cpu_percent()
+               # process['cpu_memory_usage'] = \
+               #     round((ps_process.memory_percent() / 100.0) *
+               #           psutil.virtual_memory().total)
+               # Bytes to MBytes
+               process['gpu_memory_usage'] = nv_process.usedGpuMemory // MB
+           except Exception:
+               # insufficient permissions
+               pass
            return process

        if not GPUStatCollection._gpu_device_info.get(index):
@@ -285,12 +291,13 @@ class GPUStatCollection(object):
                # e.g. nvidia-smi reset or reboot the system
                pass

-       # TODO: Do not block if full process info is not requested
-       time.sleep(0.1)
-       for process in processes:
-           pid = process['pid']
-           cache_process = GPUStatCollection.global_processes[pid]
-           process['cpu_percent'] = cache_process.cpu_percent()
+       # we do not actually use these, so no point in collecting them
+       # # TODO: Do not block if full process info is not requested
+       # time.sleep(0.1)
+       # for process in processes:
+       #     pid = process['pid']
+       #     cache_process = GPUStatCollection.global_processes[pid]
+       #     process['cpu_percent'] = cache_process.cpu_percent()

        index = N.nvmlDeviceGetIndex(handle)
        gpu_info = {
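The dropped time.sleep(0.1) existed because psutil's cpu_percent() is interval-based: the first call for a process returns 0.0 and only primes the counter, so a later call is needed to report usage. A tiny standalone demonstration of that behavior (not agent code):

```python
import time

import psutil

p = psutil.Process()                 # current process
print(p.cpu_percent())               # first call: 0.0, it only primes the counter
sum(i * i for i in range(10 ** 6))   # burn a little CPU
time.sleep(0.1)
print(p.cpu_percent())               # now reports usage since the previous call
```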
@@ -14,8 +14,8 @@ import yaml
from time import time
from attr import attrs, attrib, Factory
from pathlib2 import Path
-from requirements import parse
-from requirements.requirement import Requirement
+from trains_agent.external.requirements_parser import parse
+from trains_agent.external.requirements_parser.requirement import Requirement

from trains_agent.errors import CommandFailedError
from trains_agent.helper.base import rm_tree, NonStrictAttrs, select_for_platform, is_windows_platform
@@ -1,3 +1,4 @@
+import re
from collections import OrderedDict
from typing import Text
@@ -33,6 +34,9 @@ class ExternalRequirements(SimpleSubstitution):
            freeze_base = ''

            req_line = req.tostr(markers=False)
+           if req_line.strip().startswith('-e ') or req_line.strip().startswith('--editable'):
+               req_line = re.sub(r'^(-e|--editable=?)\s*', '', req_line, count=1)

            if req.req.vcs and req_line.startswith('git+'):
                try:
                    url_no_frag = furl(req_line)
@@ -47,9 +51,10 @@ class ExternalRequirements(SimpleSubstitution):
                    vcs._set_ssh_url()
                    new_req_line = 'git+{}{}'.format(vcs.url_with_auth, fragment)
                    if new_req_line != req_line:
-                       url_pass = furl(new_req_line).password
+                       furl_line = furl(new_req_line)
                        print('Replacing original pip vcs \'{}\' with \'{}\''.format(
-                           req_line, new_req_line.replace(url_pass, '****', 1) if url_pass else new_req_line))
+                           req_line,
+                           furl_line.set(password='xxxxxx').tostr() if furl_line.password else new_req_line))
                        req_line = new_req_line
                except Exception:
                    print('WARNING: Failed parsing pip git install, using original line {}'.format(req_line))
@@ -12,15 +12,15 @@ from typing import Text, List, Type, Optional, Tuple, Dict

from pathlib2 import Path
from pyhocon import ConfigTree
-from requirements import parse
-# noinspection PyPackageRequirements
-from requirements.requirement import Requirement

import six
from trains_agent.definitions import PIP_EXTRA_INDICES
from trains_agent.helper.base import warning, is_conda, which, join_lines, is_windows_platform
from trains_agent.helper.process import Argv, PathLike
from trains_agent.session import Session, normalize_cuda_version
+from trains_agent.external.requirements_parser import parse
+from trains_agent.external.requirements_parser.requirement import Requirement

from .translator import RequirementsTranslator
@@ -57,7 +57,7 @@ class MarkerRequirement(object):
        elif self.vcs:
            # leave the line as is, let pip handle it
            if self.line:
-               parts = [self.line]
+               return self.line
        else:
            # let's build the line manually
            parts = [
@@ -11,6 +11,7 @@ from copy import deepcopy
from distutils.spawn import find_executable
from itertools import chain, repeat, islice
from os.path import devnull
+from time import sleep
from typing import Union, Text, Sequence, Any, TypeVar, Callable

import psutil
@@ -41,6 +42,30 @@ def get_bash_output(cmd, strip=False, stderr=subprocess.STDOUT, stdin=False):
    return output if not strip or not output else output.strip()


def terminate_process(pid, timeout=10.):
    # noinspection PyBroadException
    try:
        proc = psutil.Process(pid)
        proc.terminate()
        cnt = 0
        while proc.is_running() and cnt < timeout:
            sleep(1.)
            cnt += 1
        proc.terminate()
        cnt = 0
        while proc.is_running() and cnt < timeout:
            sleep(1.)
            cnt += 1
        proc.kill()
    except Exception:
        pass
    # noinspection PyBroadException
    try:
        return not psutil.Process(pid).is_running()
    except Exception:
        return True


def kill_all_child_processes(pid=None):
    # get current process if pid not provided
    include_parent = True
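A minimal usage sketch of the new helper: SIGTERM first, escalate to kill() if the process survives, and return whether it is gone (assuming terminate_process is importable from trains_agent.helper.process, matching the worker.py import hunk above; POSIX-only because of the sleep command):

```python
import subprocess

from trains_agent.helper.process import terminate_process

proc = subprocess.Popen(['sleep', '60'])
stopped = terminate_process(proc.pid, timeout=5.)
print('stopped:', stopped)  # True once the process is no longer running
proc.wait()                 # reap the child so it does not linger as a zombie
```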
@@ -37,6 +37,10 @@ class Singleton(object):
        except:
            pass

+   @classmethod
+   def get_lock_filename(cls):
+       return os.path.join(cls._get_temp_folder(), cls._lock_file_name)

    @classmethod
    def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
        """
@@ -47,7 +51,7 @@ class Singleton(object):
        :return: (str worker_id, int slot_number) Return None value on instance already running
        """
        # try to lock file
-       lock_file = os.path.join(cls._get_temp_folder(), cls._lock_file_name)
+       lock_file = cls.get_lock_filename()
        timeout = 0
        while os.path.exists(lock_file):
            if timeout > cls._lock_timeout:
@@ -79,30 +83,41 @@ class Singleton(object):
        return ret

    @classmethod
-   def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
-       if cls.worker_id:
-           return cls.worker_id, cls.instance_slot
-       # make sure we have a unique name
-       instance_num = 0
+   def get_running_pids(cls):
+       temp_folder = cls._get_temp_folder()
+       files = glob(os.path.join(temp_folder, cls.prefix + cls.sep + '*' + cls.ext))
-       slots = {}
+       pids = []
        for file in files:
            parts = file.split(cls.sep)
            # noinspection PyBroadException
            try:
                pid = int(parts[1])
                if not psutil.pid_exists(pid):
                    pid = -1
            except Exception:
                # something is wrong, use a non-existent pid and delete the file
                pid = -1

            uid, slot = None, None
            # noinspection PyBroadException
            try:
                with open(file, 'r') as f:
                    uid, slot = str(f.read()).split('\n')
                    slot = int(slot)
            except Exception:
                pass
            pids.append((pid, uid, slot, file))

        return pids

    @classmethod
    def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
        if cls.worker_id:
            return cls.worker_id, cls.instance_slot
        # make sure we have a unique name
        instance_num = 0
        slots = {}
        for pid, uid, slot, file in cls.get_running_pids():
            worker = None
            if api_client and ENV_DOCKER_HOST_MOUNT.get() and uid:
                try:
@@ -111,7 +126,7 @@ class Singleton(object):
                worker = None

        # count active instances and delete dead files
-       if not worker and not psutil.pid_exists(pid):
+       if not worker and pid < 0:
            # delete the file
            try:
                os.remove(os.path.join(file))
@@ -165,3 +180,9 @@ class Singleton(object):
    @classmethod
    def get_slot(cls):
        return cls.instance_slot or 0

+   @classmethod
+   def get_pid_file(cls):
+       if not cls._pid_file:
+           return None
+       return cls._pid_file.name
@@ -68,6 +68,10 @@ DAEMON_ARGS = dict({
        'dest': 'queues',
        'type': foreign_object_id('queues'),
    },
+   '--order-fairness': {
+       'help': 'Pull from each queue in a round-robin order, instead of priority order.',
+       'action': 'store_true',
+   },
    '--standalone-mode': {
        'help': 'Do not use any network connects, assume everything is pre-installed',
        'action': 'store_true',
@@ -85,6 +89,10 @@ DAEMON_ARGS = dict({
        'action': 'store_true',
        'aliases': ['-d'],
    },
+   '--stop': {
+       'help': 'Stop the running agent (based on the same set of arguments)',
+       'action': 'store_true',
+   },

}, **WORKER_ARGS)
@@ -73,9 +73,11 @@ class Session(_Session):
            os.environ[LOCAL_CONFIG_FILE_OVERRIDE_VAR] = config_file
            if not Path(config_file).is_file():
                raise ValueError("Could not open configuration file: {}".format(config_file))

+       cpu_only = kwargs.get('cpu_only')
+       if cpu_only:
+           os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = 'none'

        if kwargs.get('gpus') and not os.environ.get('KUBERNETES_SERVICE_HOST') \
                and not os.environ.get('KUBERNETES_PORT'):
            # CUDA_VISIBLE_DEVICES does not support 'all'
@@ -84,6 +86,7 @@ class Session(_Session):
                os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
            else:
                os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')

        if kwargs.get('only_load_config'):
            from trains_agent.backend_api.config import load
            self.config = load()
@@ -1 +1 @@
-__version__ = '0.15.1'
+__version__ = '0.16.0'