mirror of https://github.com/clearml/clearml-agent
synced 2025-06-26 18:16:15 +00:00

Compare commits
24 Commits
| SHA1 |
|---|
| 5446aed9cf |
| b94ec85461 |
| f55f4f7535 |
| c87da3a079 |
| c3590a53a8 |
| a4315722ab |
| c901bd331c |
| df97f170a2 |
| a30a2dad66 |
| 2432f5bb68 |
| 341086d86a |
| 1163c96438 |
| 4c120d7cd0 |
| 966a9758b8 |
| f58071fc74 |
| 8712c5e636 |
| a51f9bed49 |
| 531e514003 |
| 2cd9e706c8 |
| e3e6a1dda8 |
| 92b5ce61a0 |
| 36073ad488 |
| d89d0f9ff5 |
| 14c48d0a78 |
@@ -60,6 +60,8 @@ It is a zero configuration fire-and-forget execution agent, providing a full ML/
 ### Kubernetes Integration (Optional)
 
 We think Kubernetes is awesome, but it should be a choice.
 We designed `clearml-agent` so you can run bare-metal or inside a pod with any mix that fits your environment.
 
+Find Dockerfiles in the [docker](./docker) dir and a helm Chart in https://github.com/allegroai/clearml-helm-charts
+
 #### Benefits of integrating existing K8s with ClearML-Agent
 - ClearML-Agent adds the missing scheduling capabilities to K8s
 - Allowing for more flexible automation from code
@@ -12,7 +12,7 @@ from clearml_agent.definitions import FileBuffering, CONFIG_FILE
 from clearml_agent.helper.base import reverse_home_folder_expansion, chain_map, named_temporary_file
 from clearml_agent.helper.process import ExitStatus
 from . import interface, session, definitions, commands
-from .errors import ConfigFileNotFound, Sigterm, APIError
+from .errors import ConfigFileNotFound, Sigterm, APIError, CustomBuildScriptFailed
 from .helper.trace import PackageTrace
 from .interface import get_parser
@@ -44,6 +44,8 @@ def run_command(parser, args, command_name):
         debug = command._session.debug_mode
         func = getattr(command, command_name)
         return func(**args_dict)
+    except CustomBuildScriptFailed as e:
+        command_class.exit(e.message, e.errno)
     except ConfigFileNotFound:
         message = 'Cannot find configuration file in "{}".\n' \
                   'To create a configuration file, run:\n' \
@@ -11,7 +11,11 @@
 # Set GIT user/pass credentials (if user/pass are set, GIT protocol will be set to https)
 # leave blank for GIT SSH credentials (set force_git_ssh_protocol=true to force SSH protocol)
-# Notice: GitHub personal token is equivalent to password, you can put it directly into `git_pass`
+# **Notice**: GitHub personal token is equivalent to password, you can put it directly into `git_pass`
+# To learn how to generate git token GitHub/Bitbucket/GitLab:
+# https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token
+# https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/
+# https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html
 # git_user: ""
 # git_pass: ""
 # git_host: ""
@@ -35,6 +39,11 @@
 # default false, only the working directory will be added to the PYTHONPATH
 # force_git_root_python_path: false
 
+# in docker mode, if the container's entrypoint automatically activated a virtual environment
+# use the activated virtual environment and install everything there
+# set to False to disable, and always create a new venv inheriting from the system_site_packages
+# docker_use_activated_venv: true
+
 # select python package manager:
 # currently supported: pip, conda and poetry
 # if "pip" or "conda" are used, the agent installs the required packages
@@ -269,4 +278,34 @@
 #     target_format: json
 #   }
 # }
 
+# Specifies a custom environment setup script to be executed instead of installing a virtual environment.
+# If provided, this script is executed following Git cloning. The script command may include environment variables and
+# will be expanded before execution (e.g. "$CLEARML_GIT_ROOT/script.sh").
+# The script can also be specified using the CLEARML_AGENT_CUSTOM_BUILD_SCRIPT environment variable.
+#
+# When running the script, the following environment variables will be set:
+# - CLEARML_CUSTOM_BUILD_TASK_CONFIG_JSON: specifies a path to a temporary file containing the complete task
+#   contents in JSON format
+# - CLEARML_TASK_SCRIPT_ENTRY: task entrypoint script as defined in the task's script section
+# - CLEARML_TASK_WORKING_DIR: task working directory as defined in the task's script section
+# - CLEARML_VENV_PATH: path to the agent's default virtual environment path (as defined in the configuration)
+# - CLEARML_GIT_ROOT: path to the cloned Git repository
+# - CLEARML_CUSTOM_BUILD_OUTPUT: a path to a non-existing file that may be created by the script. If created,
+#   this file must be in the following JSON format:
+#   ```json
+#   {
+#     "binary": "/absolute/path/to/python-executable",
+#     "entry_point": "/absolute/path/to/task-entrypoint-script",
+#     "working_dir": "/absolute/path/to/task-working/dir"
+#   }
+#   ```
+# If provided, the agent will use these instead of the predefined task script section to execute the task and will
+# skip virtual environment creation.
+#
+# In case the custom script returns with a non-zero exit code, the agent will fail with the same exit code.
+# In case the custom script is specified but does not exist, or if the custom script does not write valid content
+# into the file specified in CLEARML_CUSTOM_BUILD_OUTPUT, the agent will emit a warning and continue with the
+# standard flow.
+custom_build_script: ""
 }
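To make the contract above concrete, here is a minimal sketch of a custom build script. It is hypothetical (not part of this repository): it assumes an interpreter that the script itself provisioned at `/usr/bin/python3`, reuses the task's original entry point and Git checkout, and writes the required JSON hand-off into the file named by `CLEARML_CUSTOM_BUILD_OUTPUT`.

```python
#!/usr/bin/env python3
# Hypothetical sketch of a custom build script (not part of this repo).
# It consumes the env vars documented above and writes the JSON hand-off
# file the agent expects in CLEARML_CUSTOM_BUILD_OUTPUT.
import json
import os

output_file = os.environ["CLEARML_CUSTOM_BUILD_OUTPUT"]
git_root = os.environ["CLEARML_GIT_ROOT"]
entry_point = os.environ["CLEARML_TASK_SCRIPT_ENTRY"]
working_dir = os.environ.get("CLEARML_TASK_WORKING_DIR") or "."

# ... provision the environment here (e.g. install wheels into a prebuilt venv) ...

with open(output_file, "wt") as f:
    json.dump(
        {
            # assumption: this script provisioned /usr/bin/python3 with all packages
            "binary": "/usr/bin/python3",
            "entry_point": os.path.join(git_root, working_dir, entry_point),
            "working_dir": os.path.join(git_root, working_dir),
        },
        f,
    )
```

Per the documentation above, a non-zero exit code from such a script aborts the task with the same code, while a missing or malformed output file only logs a warning and falls back to the standard venv flow.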
@@ -15,6 +15,8 @@ ENV_NO_DEFAULT_SERVER = EnvEntry("CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO_DEFAULT
 ENV_DISABLE_VAULT_SUPPORT = EnvEntry('CLEARML_AGENT_DISABLE_VAULT_SUPPORT', type=bool)
 ENV_ENABLE_ENV_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_ENV_CONFIG_SECTION', type=bool)
 ENV_ENABLE_FILES_CONFIG_SECTION = EnvEntry('CLEARML_AGENT_ENABLE_FILES_CONFIG_SECTION', type=bool)
+ENV_VENV_CONFIGURED = EnvEntry('VIRTUAL_ENV', type=str)
+ENV_PROPAGATE_EXITCODE = EnvEntry("CLEARML_AGENT_PROPAGATE_EXITCODE", type=bool, default=False)
 ENV_INITIAL_CONNECT_RETRY_OVERRIDE = EnvEntry(
     'CLEARML_AGENT_INITIAL_CONNECT_RETRY_OVERRIDE', default=True, converter=safe_text_to_bool
 )
@@ -206,7 +206,7 @@ class Session(TokenManager):
         http_retries_config = dict(**http_retries_config)
         http_retries_config['connect'] = connect_retries
 
-        return http_retries_config, get_http_session_with_retry(**http_retries_config)
+        return http_retries_config, get_http_session_with_retry(config=self.config or None, **http_retries_config)
 
     def load_vaults(self):
         if not self.check_min_api_version("2.15") or self.feature_set == "basic":
@@ -12,7 +12,7 @@ from clearml_agent.backend_config.defs import LOCAL_CONFIG_FILES
 
 description = """
 Please create new clearml credentials through the settings page in your `clearml-server` web app,
-or create a free account at https://app.clear.ml/webapp-configuration
+or create a free account at https://app.clear.ml/settings/webapp-configuration
 
 In the settings > workspace page, press "Create new credentials", then press "Copy to clipboard".
@@ -27,9 +27,9 @@ except Exception:
 
 host_description = """
 Editing configuration file: {CONFIG_FILE}
-Enter the url of the clearml-server's Web service, for example: {HOST}
+Enter the url of the clearml-server's Web service, for example: {HOST} or https://app.clear.ml
 """.format(
-    CONFIG_FILE=LOCAL_CONFIG_FILES[0],
+    CONFIG_FILE=LOCAL_CONFIG_FILES[-1],
     HOST=def_host,
 )
@@ -84,7 +84,7 @@ def main():
         host = input_url('API Host', api_server)
     else:
         print(host_description)
-        host = input_url('WEB Host', '')
+        host = input_url('WEB Host', 'https://app.clear.ml')
 
     parsed_host = verify_url(host)
     api_host, files_host, web_host = parse_host(parsed_host, allow_input=True)
@@ -116,9 +116,15 @@ def main():
     print('Enter git username for repository cloning (leave blank for SSH key authentication): [] ', end='')
     git_user = input()
     if git_user.strip():
-        print('Enter password for user \'{}\': '.format(git_user), end='')
+        print(
+            "Git personal token is equivalent to a password, to learn how to generate a token:\n"
+            "  GitHub: https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token\n"  # noqa
+            "  Bitbucket: https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/\n"
+            "  GitLab: https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html\n"
+        )
+        print('Enter git password token for user \'{}\': '.format(git_user), end='')
         git_pass = input()
-        print('Git repository cloning will be using user={} password={}'.format(git_user, git_pass))
+        print('Git repository cloning will be using user={} token={}'.format(git_user, git_pass))
     else:
         git_user = None
         git_pass = None
@@ -21,6 +21,7 @@ from distutils.spawn import find_executable
 from distutils.util import strtobool
 from functools import partial
 from itertools import chain
+from os.path import basename
 from tempfile import mkdtemp, NamedTemporaryFile
 from time import sleep, time
 from typing import Text, Optional, Any, Tuple, List
@@ -38,7 +39,9 @@ from clearml_agent.backend_api.services import queues as queues_api
 from clearml_agent.backend_api.services import tasks as tasks_api
 from clearml_agent.backend_api.services import workers as workers_api
 from clearml_agent.backend_api.session import CallResult
-from clearml_agent.backend_api.session.defs import ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION
+from clearml_agent.backend_api.session.defs import (
+    ENV_ENABLE_ENV_CONFIG_SECTION, ENV_ENABLE_FILES_CONFIG_SECTION,
+    ENV_VENV_CONFIGURED, ENV_PROPAGATE_EXITCODE, )
 from clearml_agent.backend_config.defs import UptimeConf
 from clearml_agent.backend_config.utils import apply_environment, apply_files
 from clearml_agent.commands.base import resolve_names, ServiceCommandSection
@@ -64,10 +67,17 @@ from clearml_agent.definitions import (
     ENV_SSH_AUTH_SOCK,
     ENV_AGENT_SKIP_PIP_VENV_INSTALL,
     ENV_EXTRA_DOCKER_ARGS,
+    ENV_CUSTOM_BUILD_SCRIPT, ENV_AGENT_SKIP_PYTHON_ENV_INSTALL, WORKING_STANDALONE_DIR,
 )
 from clearml_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
-from clearml_agent.errors import APIError, CommandFailedError, Sigterm
+from clearml_agent.errors import (
+    APIError,
+    CommandFailedError,
+    Sigterm,
+    SkippedCustomBuildScript,
+    CustomBuildScriptFailed,
+)
 from clearml_agent.helper.base import (
     return_list,
     print_parameters,
@@ -217,7 +227,7 @@ class LiteralScriptManager(object):
             location = None
         location = location or (repo_info and repo_info.root)
         if not location:
-            location = Path(self.venv_folder, "code")
+            location = Path(self.venv_folder, WORKING_STANDALONE_DIR)
         location.mkdir(exist_ok=True, parents=True)
         log.debug("selected execution directory: %s", location)
         return Text(location), self.write(task, location, execution.entry_point)
@@ -629,7 +639,7 @@ class Worker(ServiceCommandSection):
             pass
 
     def run_one_task(self, queue, task_id, worker_args, docker=None, task_session=None):
-        # type: (Text, Text, WorkerParams, Optional[Text]) -> ()
+        # type: (Text, Text, WorkerParams, Optional[Text]) -> int
         """
         Run one task pulled from queue.
         :param queue: ID of queue that task was pulled from
@@ -637,6 +647,8 @@ class Worker(ServiceCommandSection):
         :param worker_args: Worker command line arguments
         :param task_session: The session for running operations on the passed task
         :param docker: Docker image in which the execution task will run
+
+        :return: exit code (0 is success)
         """
         # start new process and execute task id
         # "Running task '{}'".format(task_id)
@@ -697,6 +709,9 @@ class Worker(ServiceCommandSection):
         )
         if self._impersonate_as_task_owner:
             docker_params["auth_token"] = task_session.token
+        elif self._session.access_key is None or self._session.secret_key is None:
+            # We're using a token right now
+            docker_params["auth_token"] = self._session.token
         if self._worker_tags:
             docker_params["worker_tags"] = self._worker_tags
         if self._services_mode:
@@ -719,7 +734,7 @@ class Worker(ServiceCommandSection):
         else:
             print("Warning: generated docker container name is invalid: {}".format(name))
 
-        full_docker_cmd = self.docker_image_func(**docker_params)
+        full_docker_cmd = self.docker_image_func(env_task_id=task_id, **docker_params)
 
         # if we are using the default docker, update back the Task:
         if default_docker:
@@ -835,6 +850,8 @@ class Worker(ServiceCommandSection):
             # unregister this worker, it was killed
             self._unregister()
 
+        return status
+
     def get_task_session(self, user, company):
         """
         Get task session for the user by cloning the agent session
@@ -1257,6 +1274,7 @@ class Worker(ServiceCommandSection):
         self._session.print_configuration()
 
     def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
+        self._apply_extra_configuration()
 
         # check that we have docker command if we need it
         if docker not in (False, None) and not check_if_command_exists("docker"):
@@ -1291,8 +1309,12 @@ class Worker(ServiceCommandSection):
 
         # We are not running a daemon we are killing one.
         # find the pid send termination signal and leave
-        if kwargs.get('stop', False):
-            return 1 if not self._kill_daemon(dynamic_gpus=dynamic_gpus) else 0
+        if kwargs.get('stop', False) is not False:
+            return_code = 0
+            for worker_id in kwargs.get('stop') or [None]:
+                if not self._kill_daemon(dynamic_gpus=dynamic_gpus, worker_id=worker_id):
+                    return_code = 1
+            return return_code
 
         # if we do not need to create queues, make sure they are valid
         # match previous behaviour when we validated queue names before everything else
@@ -1771,11 +1793,19 @@ class Worker(ServiceCommandSection):
                 "ERROR! Failed applying git diff, see diff above.".format(diff))
 
     def _apply_extra_configuration(self):
+        # store a few things we updated in runtime (TODO: we should list them somewhere)
+        agent_config = self._session.config["agent"].copy()
+        agent_config_keys = ["cuda_version", "cudnn_version", "default_python", "worker_id", "debug"]
         try:
             self._session.load_vaults()
         except Exception as ex:
             print("Error: failed applying extra configuration: {}".format(ex))
 
+        # merge back
+        for restore_key in agent_config_keys:
+            if restore_key in agent_config:
+                self._session.config["agent"][restore_key] = agent_config[restore_key]
+
         config = self._session.config
         default = config.get("agent.apply_environment", False)
         if ENV_ENABLE_ENV_CONFIG_SECTION.get(default=default):
@@ -1828,13 +1858,7 @@ class Worker(ServiceCommandSection):
             requirements = None
 
         if not python_version:
-            try:
-                python_version = current_task.script.binary
-                python_version = python_version.split('/')[-1].replace('python', '')
-                # if we can cast it, we are good
-                python_version = '{:.1f}'.format(float(python_version))
-            except:
-                python_version = None
+            python_version = self._get_task_python_version(current_task)
 
         venv_folder, requirements_manager, is_cached = self.install_virtualenv(
             venv_dir=target, requested_python_version=python_version, execution_info=execution,
@@ -1858,6 +1882,9 @@ class Worker(ServiceCommandSection):
                 base_interpreter=package_api.requirements_manager.get_interpreter(),
                 requirement_substitutions=[OnlyExternalRequirements],
             )
+            # manually update the current state,
+            # for the external git reference change (in the replace callback)
+            package_api.requirements_manager.update_installed_packages_state(package_api.freeze())
             # make sure we run the handlers
             cached_requirements = \
                 {k: package_api.requirements_manager.replace(requirements[k] or '')
@@ -1984,6 +2011,16 @@ class Worker(ServiceCommandSection):
 
             return
 
+    def _get_task_python_version(self, task):
+        # noinspection PyBroadException
+        try:
+            python_ver = task.script.binary
+            python_ver = python_ver.split('/')[-1].replace('python', '')
+            # if we can cast it, we are good
+            return '{:.1f}'.format(float(python_ver))
+        except Exception:
+            pass
+
     @resolve_names
     def execute(
         self,
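The helper added above just peels the version suffix off the task's recorded interpreter path, using `float()` as a validity check. A standalone sketch with a hypothetical binary path shows the intended round trip:

```python
# Standalone sketch of the same parsing (hypothetical binary path):
binary = "/usr/bin/python3.7"
ver = binary.split('/')[-1].replace('python', '')  # -> "3.7"
ver = '{:.1f}'.format(float(ver))                  # float() validates; -> "3.7"
assert ver == "3.7"
```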
@@ -2068,7 +2105,7 @@ class Worker(ServiceCommandSection):
             )
             try:
                 self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
-                self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
+                status = self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
             finally:
                 self.stop_monitor()
                 self._unregister()
@@ -2076,7 +2113,7 @@ class Worker(ServiceCommandSection):
             if full_monitoring and self.temp_config_path:
                 safe_remove_file(self._session.config_file)
                 Singleton.close_pid_file()
-            return
+            return status if ENV_PROPAGATE_EXITCODE.get() else 0
 
         self._apply_extra_configuration()
 
@@ -2096,85 +2133,157 @@ class Worker(ServiceCommandSection):
 
         execution = self.get_execution_info(current_task)
 
-        if self._session.config.get("agent.package_manager.force_repo_requirements_txt", False):
-            requirements = None
-            print("[package_manager.force_repo_requirements_txt=true] "
-                  "Skipping requirements, using repository \"requirements.txt\" ")
-        else:
-            try:
-                requirements = current_task.script.requirements
-            except AttributeError:
-                requirements = None
-
-        try:
-            python_ver = current_task.script.binary
-            python_ver = python_ver.split('/')[-1].replace('python', '')
-            # if we can cast it, we are good
-            python_ver = '{:.1f}'.format(float(python_ver))
-        except:
-            python_ver = None
-
-        venv_folder, requirements_manager, is_cached = self.install_virtualenv(
-            standalone_mode=standalone_mode,
-            requested_python_version=python_ver,
-            execution_info=execution,
-            cached_requirements=requirements,
-        )
-
-        if not is_cached and not standalone_mode:
-            if self._default_pip:
-                self.package_api.install_packages(*self._default_pip)
-
-        print("\n")
-
-        directory, vcs, repo_info = self.get_repo_info(
-            execution, current_task, venv_folder
-        )
-        cwd = vcs.location if vcs and vcs.location else directory
-
-        print("\n")
-        if not standalone_mode:
-            if is_cached:
-                # reinstalling git / local packages
-                package_api = copy(self.package_api)
-                OnlyExternalRequirements.cwd = package_api.cwd = cwd
-                package_api.requirements_manager = self._get_requirements_manager(
-                    base_interpreter=package_api.requirements_manager.get_interpreter(),
-                    requirement_substitutions=[OnlyExternalRequirements]
-                )
-                # make sure we run the handlers
-                cached_requirements = \
-                    {k: package_api.requirements_manager.replace(requirements[k] or '')
-                     for k in requirements}
-                if str(cached_requirements.get('pip', '')).strip() \
-                        or str(cached_requirements.get('conda', '')).strip():
-                    package_api.load_requirements(cached_requirements)
-                # make sure we call the correct freeze
-                requirements_manager = package_api.requirements_manager
-            else:
-                self.install_requirements(
-                    execution,
-                    repo_info,
-                    requirements_manager=requirements_manager,
-                    cached_requirements=requirements,
-                    cwd=cwd,
-                )
-
-        # do not update the task packages if we are using conda,
-        # it will most likely make the task environment unreproducible
-        skip_freeze_update = self.is_conda and not self._session.config.get(
-            "agent.package_manager.conda_full_env_update", False)
-
-        freeze = self.freeze_task_environment(
-            task_id=current_task.id,
-            requirements_manager=requirements_manager,
-            add_venv_folder_cache=venv_folder,
-            execution_info=execution,
-            update_requirements=not skip_freeze_update,
-        )
-        script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
+        python_ver = self._get_task_python_version(current_task)
+
+        freeze = None
+        repo_info = None
+        script_dir = ""
+        venv_folder = ""
+
+        custom_build_script = self._session.config.get("agent.custom_build_script", "") or ENV_CUSTOM_BUILD_SCRIPT.get()
+        if custom_build_script:
+            try:
+                venv_folder = Path(self._session.config["agent.venvs_dir"], python_ver or "3")
+                venv_folder = Path(os.path.expanduser(os.path.expandvars(venv_folder.as_posix())))
+                directory, vcs, repo_info = self.get_repo_info(
+                    execution, current_task, str(venv_folder)
+                )
+                binary, entry_point, working_dir = self.run_custom_build_script(
+                    custom_build_script,
+                    current_task,
+                    execution,
+                    venv_folder=venv_folder,
+                    git_root=vcs.location,
+                )
+
+                execution.entry_point = str(entry_point)
+                execution.working_dir = str(working_dir)
+                script_dir = str(working_dir)
+
+                self.package_api = VirtualenvPip(
+                    session=self._session,
+                    interpreter=str(binary),
+                    python=str(binary),
+                    requirements_manager=RequirementsManager(self._session),
+                    execution_info=execution,
+                    path=venv_folder,
+                )
+
+                self.global_package_api = SystemPip(
+                    session=self._session,
+                    interpreter=str(binary),
+                )
+
+            except SkippedCustomBuildScript as ex:
+                print("Warning: {}".format(str(ex)))
+                custom_build_script = None
+
+        if not custom_build_script:
+            if self._session.config.get("agent.package_manager.force_repo_requirements_txt", False):
+                requirements = None
+                print("\n[package_manager.force_repo_requirements_txt=true] "
+                      "Skipping requirements, using repository \"requirements.txt\" \n")
+            elif self._session.config.get("agent.package_manager.force_original_requirements", False):
+                try:
+                    requirements = current_task.script.requirements
+                    if isinstance(requirements, dict):
+                        if 'org_pip' in requirements:
+                            requirements['pip'] = requirements['org_pip']
+                            print("\n[package_manager.force_original_requirements=true] "
+                                  "Using original requirements: \n{}\n".format(requirements['org_pip']))
+                        if 'org_conda' in requirements:
+                            requirements['conda'] = requirements['org_conda']
+                            print("\n[package_manager.force_original_requirements=true] "
+                                  "Using original requirements: \n{}\n".format(requirements['org_conda']))
+                except AttributeError:
+                    requirements = None
+            else:
+                try:
+                    requirements = current_task.script.requirements
+                except AttributeError:
+                    requirements = None
+
+            alternative_code_folder = None
+            if ENV_AGENT_SKIP_PYTHON_ENV_INSTALL.get():
+                venv_folder, requirements_manager, is_cached = None, None, False
+                # we need to create a folder for the code to be dumped into
+                code_folder = self._session.config.get("agent.venvs_dir")
+                code_folder = Path(os.path.expanduser(os.path.expandvars(code_folder)))
+                # let's make sure it is clear from previous runs
+                rm_tree(normalize_path(code_folder, WORKING_REPOSITORY_DIR))
+                rm_tree(normalize_path(code_folder, WORKING_STANDALONE_DIR))
+                if not code_folder.exists():
+                    code_folder.mkdir(parents=True, exist_ok=True)
+                alternative_code_folder = code_folder.as_posix()
+            else:
+                venv_folder, requirements_manager, is_cached = self.install_virtualenv(
+                    standalone_mode=standalone_mode,
+                    requested_python_version=python_ver,
+                    execution_info=execution,
+                    cached_requirements=requirements,
+                )
+
+                if not is_cached and not standalone_mode:
+                    if self._default_pip:
+                        self.package_api.install_packages(*self._default_pip)
+
+                print("\n")
+
+            # either use the venvs base folder for code or the cwd
+            directory, vcs, repo_info = self.get_repo_info(
+                execution, current_task, str(venv_folder or alternative_code_folder)
+            )
+
+            cwd = vcs.location if vcs and vcs.location else directory
+
+            print("\n")
+            if not standalone_mode:
+                if is_cached:
+                    # reinstalling git / local packages
+                    package_api = copy(self.package_api)
+                    OnlyExternalRequirements.cwd = package_api.cwd = cwd
+                    package_api.requirements_manager = self._get_requirements_manager(
+                        base_interpreter=package_api.requirements_manager.get_interpreter(),
+                        requirement_substitutions=[OnlyExternalRequirements]
+                    )
+                    # manually update the current state,
+                    # for the external git reference change (in the replace callback)
+                    package_api.requirements_manager.update_installed_packages_state(package_api.freeze())
+                    # make sure we run the handlers
+                    cached_requirements = \
+                        {k: package_api.requirements_manager.replace(requirements[k] or '')
+                         for k in requirements}
+                    if str(cached_requirements.get('pip', '')).strip() \
+                            or str(cached_requirements.get('conda', '')).strip():
+                        package_api.load_requirements(cached_requirements)
+                    # make sure we call the correct freeze
+                    requirements_manager = package_api.requirements_manager
+                elif requirements_manager:
+                    self.install_requirements(
+                        execution,
+                        repo_info,
+                        requirements_manager=requirements_manager,
+                        cached_requirements=requirements,
+                        cwd=cwd,
+                    )
+                elif not self.package_api:
+                    # check if we have to manually configure package API, it will be readonly
+                    self.package_api = SystemPip(session=self._session)
+
+            # do not update the task packages if we are using conda,
+            # it will most likely make the task environment unreproducible
+            skip_freeze_update = self.is_conda and not self._session.config.get(
+                "agent.package_manager.conda_full_env_update", False)
+
+            freeze = self.freeze_task_environment(
+                task_id=current_task.id,
+                requirements_manager=requirements_manager,
+                add_venv_folder_cache=venv_folder,
+                execution_info=execution,
+                update_requirements=not skip_freeze_update,
+            )
+            script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
 
         # run code
         # print("Running task id [%s]:" % current_task.id)
@@ -2184,7 +2293,9 @@ class Worker(ServiceCommandSection):
             extra.append(
                 WorkerParams(optimization=optimization).get_optimization_flag()
             )
 
+        # check if this is a module load, then load it.
+        # noinspection PyBroadException
         try:
             if current_task.script.binary and current_task.script.binary.startswith('python') and \
                     execution.entry_point and execution.entry_point.split()[0].strip() == '-m':
@@ -2192,7 +2303,7 @@ class Worker(ServiceCommandSection):
                 extra.extend(shlex.split(execution.entry_point))
             else:
                 extra.append(execution.entry_point)
-        except:
+        except Exception:
             extra.append(execution.entry_point)
 
         command = self.package_api.get_python_command(extra)
@@ -2576,7 +2687,7 @@ class Worker(ServiceCommandSection):
             python_version=getattr(self.package_api, 'python', ''),
             cuda_version=self._session.config.get("agent.cuda_version"),
             source_folder=add_venv_folder_cache,
-            exclude_sub_folders=['task_repository', 'code'])
+            exclude_sub_folders=[WORKING_REPOSITORY_DIR, WORKING_STANDALONE_DIR])
 
         # If do not update back requirements
         if not update_requirements:
@@ -2703,7 +2814,7 @@ class Worker(ServiceCommandSection):
             if self._session.debug_mode and temp_file:
                 rm_file(temp_file.name)
             # call post installation callback
-            requirements_manager.post_install(self._session)
+            requirements_manager.post_install(self._session, package_manager=package_api)
             # mark as successful installation
             repo_requirements_installed = True
 
@@ -2851,28 +2962,122 @@ class Worker(ServiceCommandSection):
             )
         )
 
+    def run_custom_build_script(self, script, task, execution, venv_folder, git_root):
+        # type: (str, tasks_api.Task, ExecutionInfo, Path, str) -> Tuple[Path, Path, Path]
+        """
+        Run a custom env build script
+        :param script:
+        :return: A tuple containing:
+            - a full path to a python executable
+            - a new task entry_point (replacing the entry_point in the task's script section)
+            - a new working directory (replacing the working_dir in the task's script section)
+            - a requirements manager instance
+        """
+        script = os.path.expanduser(os.path.expandvars(script))
+
+        try:
+            if not os.path.isfile(script):
+                raise SkippedCustomBuildScript("Build script {} is not found".format(script))
+        except OSError as ex:
+            raise SkippedCustomBuildScript(str(ex))
+
+        print("Running custom build script {}".format(script))
+
+        script_output_file = NamedTemporaryFile(prefix="custom_build_script", suffix=".json", mode="wt", delete=False)
+
+        os.environ["CLEARML_AGENT_CUSTOM_BUILD_SCRIPT"] = script
+        os.environ["CLEARML_CUSTOM_BUILD_TASK_CONFIG_JSON"] = json.dumps(
+            task.to_dict(), separators=(',', ':'), default=str
+        )
+        os.environ["CLEARML_CUSTOM_BUILD_OUTPUT"] = script_output_file.name
+        os.environ["CLEARML_TASK_SCRIPT_ENTRY"] = execution.entry_point
+        os.environ["CLEARML_TASK_WORKING_DIR"] = execution.working_dir
+        os.environ["CLEARML_VENV_PATH"] = str(venv_folder)
+        os.environ["CLEARML_GIT_ROOT"] = git_root
+
+        try:
+            subprocess.check_call([script])
+        except subprocess.CalledProcessError as ex:
+            raise CustomBuildScriptFailed(
+                message="Custom build script failed with return code {}".format(ex.returncode),
+                errno=ex.returncode
+            )
+
+        output = Path(script_output_file.name).read_text()
+        if not output:
+            raise SkippedCustomBuildScript("Build script {} is not found".format(script))
+
+        try:
+            output = json.loads(output)
+            binary = Path(output["binary"])
+            entry_point = Path(output["entry_point"])
+            working_dir = Path(output["working_dir"])
+        except ValueError as ex:
+            raise SkippedCustomBuildScript(
+                "Failed parsing build script output JSON ({}): {}".format(script_output_file.name, ex)
+            )
+        except KeyError as ex:
+            raise SkippedCustomBuildScript("Build script output missing {} field".format(ex.args[0]))
+
+        try:
+            if not binary.is_file():
+                raise SkippedCustomBuildScript(
+                    "Invalid binary path returned from custom build script: {}".format(binary)
+                )
+            if not entry_point.is_file():
+                raise SkippedCustomBuildScript(
+                    "Invalid entrypoint path returned from custom build script: {}".format(entry_point)
+                )
+            if not working_dir.is_dir():
+                raise SkippedCustomBuildScript(
+                    "Invalid working dir returned from custom build script: {}".format(working_dir)
+                )
+        except OSError as ex:
+            raise SkippedCustomBuildScript(str(ex))
+
+        return binary, entry_point, working_dir
+
+    def _get_skip_pip_venv_install(self, skip_pip_venv_install=None):
+        if skip_pip_venv_install is None:
+            skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get()
+
+        if skip_pip_venv_install:
+            try:
+                skip_pip_venv_install = bool(strtobool(skip_pip_venv_install))
+            except ValueError:
+                pass
+        elif ENV_VENV_CONFIGURED.get() and ENV_DOCKER_IMAGE.get() and \
+                self._session.config.get("agent.docker_use_activated_venv", True) and \
+                self._session.config.get("agent.package_manager.system_site_packages", False):
+            # if we are running inside a container, and a virtual environment is already installed,
+            # we should install directly into it, because we cannot inherit from the system packages
+            skip_pip_venv_install = find_executable("python") or True
+
+            # check if we are running inside a container:
+            print(
+                "Warning! Found python virtual environment [{}] already activated inside the container, "
+                "installing packages into venv (pip does not support inherit/nested venv)".format(
+                    skip_pip_venv_install if isinstance(skip_pip_venv_install, str) else ENV_VENV_CONFIGURED.get())
+            )
+        return skip_pip_venv_install
+
     def install_virtualenv(
             self,
             venv_dir=None,
             requested_python_version=None,
             standalone_mode=False,
             execution_info=None,
             cached_requirements=None,
     ):
         # type: (str, str, bool, ExecutionInfo, dict) -> Tuple[Path, RequirementsManager, bool]
         """
         Install a new python virtual environment, removing the old one if exists
-        If CLEARML_SKIP_PIP_VENV_INSTALL is set then an empty virtual env folder is created
-        and package manager is configured to work with the global python interpreter (the interpreter
-        path itself can be passed in this variable)
+        If skip_pip_venv_install is True or contains a string (or if CLEARML_SKIP_PIP_VENV_INSTALL is set)
+        then an empty virtual env folder is created and package manager is configured to work with the global python
+        interpreter (or using a custom interpreter if an interpreter path is passed in this variable)
         :return: virtualenv directory, requirements manager to use with task, True if there is a cached venv entry
         """
-        skip_pip_venv_install = ENV_AGENT_SKIP_PIP_VENV_INSTALL.get()
+        skip_pip_venv_install = self._get_skip_pip_venv_install()
 
         if self._session.config.get("agent.ignore_requested_python_version", None):
             requested_python_version = ''
@@ -2929,13 +3134,50 @@ class Worker(ServiceCommandSection):
             or not self.is_venv_update
         )
 
+        if not standalone_mode:
+            rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR))
+            rm_tree(normalize_path(venv_dir, WORKING_STANDALONE_DIR))
+
+        call_package_manager_create, requirements_manager = self._setup_package_api(
+            executable_name=executable_name,
+            executable_version_suffix=executable_version_suffix,
+            venv_dir=venv_dir,
+            execution_info=execution_info,
+            standalone_mode=standalone_mode,
+            skip_pip_venv_install=skip_pip_venv_install,
+            first_time=first_time,
+        )
+
+        # check if we have a cached folder
+        if cached_requirements and not skip_pip_venv_install and self.package_api.get_cached_venv(
+            requirements=cached_requirements,
+            docker_cmd=execution_info.docker_cmd if execution_info else None,
+            python_version=self.package_api.python,
+            cuda_version=self._session.config.get("agent.cuda_version"),
+            destination_folder=Path(venv_dir)
+        ):
+            print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache()))
+            return venv_dir, requirements_manager, True
+
+        # create the initial venv
+        if not skip_pip_venv_install:
+            if call_package_manager_create:
+                self.package_api.create()
+        else:
+            if not venv_dir.exists():
+                venv_dir.mkdir(parents=True, exist_ok=True)
+
+        return venv_dir, requirements_manager, False
+
+    def _setup_package_api(
+            self, executable_name, executable_version_suffix, venv_dir, execution_info,
+            standalone_mode, skip_pip_venv_install=False, first_time=False
+    ):
+        # type: (str, str, Path, ExecutionInfo, bool, bool, bool) -> Tuple[bool, RequirementsManager]
         requirements_manager = self._get_requirements_manager(
             base_interpreter=executable_name
         )
 
         if not standalone_mode:
             rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR))
 
         package_manager_params = dict(
             session=self._session,
             python=executable_version_suffix if self.is_conda else executable_name,
@@ -2950,7 +3192,6 @@ class Worker(ServiceCommandSection):
         )
 
         call_package_manager_create = False
-
         if not self.is_conda:
             if standalone_mode or skip_pip_venv_install:
                 # pip with standalone mode
@@ -2958,7 +3199,14 @@ class Worker(ServiceCommandSection):
             if standalone_mode:
                 self.package_api = VirtualenvPip(**package_manager_params)
             else:
-                self.package_api = self.global_package_api
+                if not Path(executable_name).is_file():
+                    executable_name_path = find_executable(executable_name)
+                    print("Interpreter '{}' found at '{}'".format(executable_name, executable_name_path))
+                    executable_name = executable_name_path
+                # we can change it, no one is going to use it anyhow
+                package_manager_params['path'] = None
+                package_manager_params['interpreter'] = executable_name
+                self.package_api = VirtualenvPip(**package_manager_params)
         else:
             if self.is_venv_update:
                 self.package_api = VenvUpdateAPI(
@@ -2996,26 +3244,7 @@ class Worker(ServiceCommandSection):
                 venv_dir = new_venv_folder
             self.package_api = get_conda(path=venv_dir)
 
-        # check if we have a cached folder
-        if cached_requirements and not skip_pip_venv_install and self.package_api.get_cached_venv(
-            requirements=cached_requirements,
-            docker_cmd=execution_info.docker_cmd if execution_info else None,
-            python_version=package_manager_params['python'],
-            cuda_version=self._session.config.get("agent.cuda_version"),
-            destination_folder=Path(venv_dir)
-        ):
-            print('::: Using Cached environment {} :::'.format(self.package_api.get_last_used_entry_cache()))
-            return venv_dir, requirements_manager, True
-
-        # create the initial venv
-        if not skip_pip_venv_install:
-            if call_package_manager_create:
-                self.package_api.create()
-        else:
-            if not venv_dir.exists():
-                venv_dir.mkdir(parents=True, exist_ok=True)
-
-        return venv_dir, requirements_manager, False
+        return call_package_manager_create, requirements_manager
 
     def parse_requirements(self, reqs_file=None, overrides=None):
         os = None
@@ -3265,6 +3494,7 @@ class Worker(ServiceCommandSection):
             worker_tags=None,
             name=None,
             mount_ssh=None, mount_apt_cache=None, mount_pip_cache=None, mount_poetry_cache=None,
+            env_task_id=None,
     ):
         docker = 'docker'
@@ -3358,6 +3588,9 @@ class Worker(ServiceCommandSection):
         # update the docker image, so the system knows where it runs
         base_cmd += ['-e', 'CLEARML_DOCKER_IMAGE={} {}'.format(docker_image, ' '.join(docker_arguments or [])).strip()]
 
+        if env_task_id:
+            base_cmd += ['-e', 'CLEARML_TASK_ID={}'.format(env_task_id), ]
+
         if auth_token:
             # if auth token is passed then put it in the env var
             base_cmd += ['-e', '{}={}'.format(ENV_AGENT_AUTH_TOKEN.vars[0], auth_token)]
@@ -3383,7 +3616,7 @@ class Worker(ServiceCommandSection):
         agent_install_bash_script = []
         if os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'):
             local_wheel = os.path.expanduser(os.environ.get('FORCE_LOCAL_CLEARML_AGENT_WHEEL'))
-            docker_wheel = str(Path('/tmp') / Path(local_wheel).name)
+            docker_wheel = '/tmp/{}'.format(basename(local_wheel))
             base_cmd += ['-v', local_wheel + ':' + docker_wheel]
             clearml_agent_wheel = '\"{}\"'.format(docker_wheel)
         elif os.environ.get('FORCE_CLEARML_AGENT_REPO'):
@@ -3404,11 +3637,11 @@ class Worker(ServiceCommandSection):
                 ' libsm6 libxext6 libxrender-dev libglib2.0-0' if install_opencv_libs else ""),
             "[ ! -z $(which git) ] || export CLEARML_APT_INSTALL=\"$CLEARML_APT_INSTALL git\"",
             "declare LOCAL_PYTHON",
-            "for i in {{10..5}}; do which {python_single_digit}.$i && " +
+            "[ ! -z $LOCAL_PYTHON ] || for i in {{15..5}}; do which {python_single_digit}.$i && " +
             "{python_single_digit}.$i -m pip --version && " +
             "export LOCAL_PYTHON=$(which {python_single_digit}.$i) && break ; done",
             "[ ! -z $LOCAL_PYTHON ] || export CLEARML_APT_INSTALL=\"$CLEARML_APT_INSTALL {python_single_digit}-pip\"",  # noqa
-            "[ -z \"$CLEARML_APT_INSTALL\" ] || (apt-get update && apt-get install -y $CLEARML_APT_INSTALL)",
+            "[ -z \"$CLEARML_APT_INSTALL\" ] || (apt-get update -y ; apt-get install -y $CLEARML_APT_INSTALL)",
         ]
 
         if preprocess_bash_script:
@@ -3437,7 +3670,7 @@ class Worker(ServiceCommandSection):
 
         if docker_bash_setup_script and docker_bash_setup_script.strip('\n '):
             extra_shell_script = (extra_shell_script or '') + \
-                ' ; '.join(line.strip().replace('\"', '\\\"')
+                ' ; '.join(line.strip()
                            for line in docker_bash_setup_script.split('\n') if line.strip()) + \
                 ' ; '
@@ -3549,8 +3782,11 @@ class Worker(ServiceCommandSection):
 
         return command, script_dir
 
-    def _kill_daemon(self, dynamic_gpus=False):
-        worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus)
+    def _kill_daemon(self, dynamic_gpus=False, worker_id=None):
+        if not worker_id:
+            worker_id, worker_name = self._generate_worker_id_name(dynamic_gpus=dynamic_gpus)
+        else:
+            worker_name = worker_id
 
         # Iterate over all running processes
         for pid, uid, slot, file in sorted(Singleton.get_running_pids(), key=lambda x: x[1] or ''):
@@ -126,6 +126,7 @@ DEFAULT_VENV_UPDATE_URL = (
     "https://raw.githubusercontent.com/Yelp/venv-update/v3.2.4/venv_update.py"
 )
 WORKING_REPOSITORY_DIR = "task_repository"
+WORKING_STANDALONE_DIR = "code"
 DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache")
 PIP_EXTRA_INDICES = [
 ]
@@ -134,6 +135,7 @@ ENV_DOCKER_IMAGE = EnvironmentConfig('CLEARML_DOCKER_IMAGE', 'TRAINS_DOCKER_IMAG
 ENV_WORKER_ID = EnvironmentConfig('CLEARML_WORKER_ID', 'TRAINS_WORKER_ID')
 ENV_WORKER_TAGS = EnvironmentConfig('CLEARML_WORKER_TAGS')
 ENV_AGENT_SKIP_PIP_VENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PIP_VENV_INSTALL')
+ENV_AGENT_SKIP_PYTHON_ENV_INSTALL = EnvironmentConfig('CLEARML_AGENT_SKIP_PYTHON_ENV_INSTALL', type=bool)
 ENV_DOCKER_SKIP_GPUS_FLAG = EnvironmentConfig('CLEARML_DOCKER_SKIP_GPUS_FLAG', 'TRAINS_DOCKER_SKIP_GPUS_FLAG')
 ENV_AGENT_GIT_USER = EnvironmentConfig('CLEARML_AGENT_GIT_USER', 'TRAINS_AGENT_GIT_USER')
 ENV_AGENT_GIT_PASS = EnvironmentConfig('CLEARML_AGENT_GIT_PASS', 'TRAINS_AGENT_GIT_PASS')
@@ -147,6 +149,38 @@ ENV_DOCKER_HOST_MOUNT = EnvironmentConfig('CLEARML_AGENT_K8S_HOST_MOUNT', 'CLEAR
 ENV_VENV_CACHE_PATH = EnvironmentConfig('CLEARML_AGENT_VENV_CACHE_PATH')
 ENV_EXTRA_DOCKER_ARGS = EnvironmentConfig('CLEARML_AGENT_EXTRA_DOCKER_ARGS', type=list)
 
+ENV_CUSTOM_BUILD_SCRIPT = EnvironmentConfig('CLEARML_AGENT_CUSTOM_BUILD_SCRIPT')
+"""
+Specifies a custom environment setup script to be executed instead of installing a virtual environment.
+If provided, this script is executed following Git cloning. The script command may include environment variables and
+will be expanded before execution (e.g. "$CLEARML_GIT_ROOT/script.sh").
+The script can also be specified using the `agent.custom_build_script` configuration setting.
+
+When running the script, the following environment variables will be set:
+- CLEARML_CUSTOM_BUILD_TASK_CONFIG_JSON: specifies a path to a temporary file containing the complete task
+  contents in JSON format
+- CLEARML_TASK_SCRIPT_ENTRY: task entrypoint script as defined in the task's script section
+- CLEARML_TASK_WORKING_DIR: task working directory as defined in the task's script section
+- CLEARML_VENV_PATH: path to the agent's default virtual environment path (as defined in the configuration)
+- CLEARML_GIT_ROOT: path to the cloned Git repository
+- CLEARML_CUSTOM_BUILD_OUTPUT: a path to a non-existing file that may be created by the script. If created,
+  this file must be in the following JSON format:
+  ```json
+  {
+    "binary": "/absolute/path/to/python-executable",
+    "entry_point": "/absolute/path/to/task-entrypoint-script",
+    "working_dir": "/absolute/path/to/task-working/dir"
+  }
+  ```
+If provided, the agent will use these instead of the predefined task script section to execute the task and will
+skip virtual environment creation.
+
+In case the custom script returns with a non-zero exit code, the agent will fail with the same exit code.
+In case the custom script is specified but does not exist, or if the custom script does not write valid content
+into the file specified in CLEARML_CUSTOM_BUILD_OUTPUT, the agent will emit a warning and continue with the
+standard flow.
+"""
+
 
 class FileBuffering(IntEnum):
     """
@@ -84,3 +84,13 @@ class MissingPackageError(CommandFailedError):
     def __str__(self):
         return '{self.__class__.__name__}: ' \
                '"{self.name}" package is required. Please run "pip install {self.name}"'.format(self=self)
+
+
+class CustomBuildScriptFailed(CommandFailedError):
+    def __init__(self, errno, *args, **kwargs):
+        super(CustomBuildScriptFailed, self).__init__(*args, **kwargs)
+        self.errno = errno
+
+
+class SkippedCustomBuildScript(CommandFailedError):
+    pass
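The two new error types split responsibility: `CustomBuildScriptFailed` carries the script's return code so the agent can exit with it, while `SkippedCustomBuildScript` only triggers a warning and the standard fallback flow. A self-contained sketch (using a simplified stand-in for `CommandFailedError`, whose real definition lives elsewhere in the package) illustrates the exit-code contract used by the `run_command` handler shown earlier:

```python
# Self-contained sketch; CommandFailedError here is a simplified stand-in.
class CommandFailedError(Exception):
    def __init__(self, message=None, *args, **kwargs):
        super(CommandFailedError, self).__init__(message, *args)
        self.message = message


class CustomBuildScriptFailed(CommandFailedError):
    def __init__(self, errno, *args, **kwargs):
        super(CustomBuildScriptFailed, self).__init__(*args, **kwargs)
        self.errno = errno


try:
    raise CustomBuildScriptFailed(errno=42, message="Custom build script failed with return code 42")
except CustomBuildScriptFailed as e:
    # the agent calls command_class.exit(e.message, e.errno),
    # so the process exits with the build script's own return code
    assert (e.errno, e.message) == (42, "Custom build script failed with return code 42")
```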
@@ -69,7 +69,7 @@ class K8sIntegration(Worker):
         "apt-get update",
         "apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0",
         "declare LOCAL_PYTHON",
-        "for i in {{10..5}}; do which python3.$i && python3.$i -m pip --version && "
+        "[ ! -z $LOCAL_PYTHON ] || for i in {{15..5}}; do which python3.$i && python3.$i -m pip --version && "
         "export LOCAL_PYTHON=$(which python3.$i) && break ; done",
         "[ ! -z $LOCAL_PYTHON ] || apt-get install -y python3-pip",
         "[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
@@ -506,6 +506,38 @@ def is_conda(config):
     return config['agent.package_manager.type'].lower() == 'conda'
 
 
+def convert_cuda_version_to_float_single_digit_str(cuda_version):
+    """
+    Convert a cuda_version (string/float/int) into a float representation, e.g. 11.4
+    Notice returns String Single digit only!
+    :return str:
+    """
+    cuda_version = str(cuda_version or 0)
+    # if we have patch version we parse it here
+    cuda_version_parts = [int(v) for v in cuda_version.split('.')]
+    if len(cuda_version_parts) > 1 or cuda_version_parts[0] < 60:
+        cuda_version = 10 * cuda_version_parts[0]
+        if len(cuda_version_parts) > 1:
+            cuda_version += float(".{:d}".format(cuda_version_parts[1])) * 10
+
+        cuda_version_full = "{:.1f}".format(float(cuda_version) / 10.)
+    else:
+        cuda_version = cuda_version_parts[0]
+        cuda_version_full = "{:.1f}".format(float(cuda_version) / 10.)
+
+    return cuda_version_full
+
+
+def convert_cuda_version_to_int_10_base_str(cuda_version):
+    """
+    Convert a cuda_version (string/float/int) into an integer version, e.g. 112 for cuda 11.2
+    Return string
+    :return str:
+    """
+    cuda_version = convert_cuda_version_to_float_single_digit_str(cuda_version)
+    return str(int(float(cuda_version) * 10))
+
+
 class NonStrictAttrs(object):
 
     @classmethod
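The two helpers accept legacy 10x-encoded values as well as plain version strings: anything below 60 with no dot is treated as a major version, anything with a dot as "major.minor", and larger bare integers as already 10x-encoded. A few illustrative calls, with expected values worked out from the branches above:

```python
# Illustrative round trips (expected values inferred from the conversion logic above):
assert convert_cuda_version_to_float_single_digit_str("11.4") == "11.4"
assert convert_cuda_version_to_float_single_digit_str(112) == "11.2"  # already 10x-encoded
assert convert_cuda_version_to_float_single_digit_str(0) == "0.0"     # missing/unknown CUDA
assert convert_cuda_version_to_int_10_base_str("11.2") == "112"
```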
@@ -19,7 +19,9 @@ from clearml_agent.external.requirements_parser import parse
 from clearml_agent.external.requirements_parser.requirement import Requirement
 
 from clearml_agent.errors import CommandFailedError
-from clearml_agent.helper.base import rm_tree, NonStrictAttrs, select_for_platform, is_windows_platform, ExecutionInfo
+from clearml_agent.helper.base import (
+    rm_tree, NonStrictAttrs, select_for_platform, is_windows_platform, ExecutionInfo,
+    convert_cuda_version_to_float_single_digit_str, convert_cuda_version_to_int_10_base_str, )
 from clearml_agent.helper.process import Argv, Executable, DEVNULL, CommandSequence, PathLike
 from clearml_agent.helper.package.requirements import SimpleVersion
 from clearml_agent.session import Session
@@ -167,7 +169,7 @@ class CondaAPI(PackageManager):
             raise ValueError("Could not restore Conda environment, cannot find {}".format(
                 self.conda_pre_build_env_path))
 
-        output = Argv(
+        command = Argv(
             self.conda,
             "create",
             "--yes",
@@ -175,7 +177,9 @@ class CondaAPI(PackageManager):
             "--prefix",
             self.path,
             "python={}".format(self.python),
-        ).get_output(stderr=DEVNULL)
+        )
+        print('Executing Conda: {}'.format(command.serialize()))
+        output = command.get_output(stderr=DEVNULL)
         match = re.search(
             r"\W*(.*activate) ({})".format(re.escape(str(self.path))), output
         )
@@ -420,7 +424,7 @@ class CondaAPI(PackageManager):
         finally:
             PackageManager._selected_manager = self
 
-        self.requirements_manager.post_install(self.session)
+        self.requirements_manager.post_install(self.session, package_manager=self)
 
     def load_requirements(self, requirements):
         # if we are in read only mode, do not uninstall anything
@@ -457,16 +461,8 @@ class CondaAPI(PackageManager):
             if not cuda_version:
                 cuda_version = 0
             else:
-                cuda_version_full = str(cuda_version)
-                # if we have patch version we parse it here
-                cuda_version_parts = [int(v) for v in cuda_version.split('.')]
-                if len(cuda_version_parts) > 1 or cuda_version_parts[0] < 60:
-                    cuda_version = 10*cuda_version_parts[0]
-                    if len(cuda_version_parts) > 1:
-                        cuda_version += cuda_version_parts[1]
-                else:
-                    cuda_version = cuda_version_parts[0]
-                    cuda_version_full = "{:.1f}".format(float(cuda_version)/10.)
+                cuda_version_full = convert_cuda_version_to_float_single_digit_str(cuda_version)
+                cuda_version = int(convert_cuda_version_to_int_10_base_str(cuda_version))
         except Exception:
             cuda_version = 0
@@ -646,7 +642,7 @@ class CondaAPI(PackageManager):
         finally:
             PackageManager._selected_manager = self
 
-        self.requirements_manager.post_install(self.session)
+        self.requirements_manager.post_install(self.session, package_manager=self)
         return True
 
     def _parse_conda_result_bad_packges(self, result_dict):
@@ -46,11 +46,10 @@ class ExternalRequirements(SimpleSubstitution):
         post_install_req = self.post_install_req
         self.post_install_req = []
         for req in post_install_req:
-            try:
-                freeze_base = PackageManager.out_of_scope_freeze() or ''
-            except:
-                freeze_base = ''
-
+            if self.is_already_installed(req):
+                print("No need to reinstall \'{}\' from VCS, "
+                      "the exact same version is already installed".format(req.name))
+                continue
             req_line = self._add_vcs_credentials(req, session)
 
             # if we have older pip version we have to make sure we replace back the package name with the
@@ -175,5 +174,11 @@ class OnlyExternalRequirements(ExternalRequirements):
         # Do not store the skipped requirements
         # mark skip package
         if super(OnlyExternalRequirements, self).match(req):
+            if self.is_already_installed(req):
+                print("No need to reinstall \'{}\' from VCS, "
+                      "the exact same version is already installed".format(req.name))
+                return Text('')
+
             return self._add_vcs_credentials(req, self._session)
 
         return Text('')
@@ -12,7 +12,7 @@ from ..requirements import RequirementsManager
 
 class VirtualenvPip(SystemPip, PackageManager):
     def __init__(self, session, python, requirements_manager, path, interpreter=None, execution_info=None, **kwargs):
-        # type: (Session, float, RequirementsManager, PathLike, PathLike, ExecutionInfo, Any) -> ()
+        # type: (Session, str, RequirementsManager, PathLike, PathLike, ExecutionInfo, Any) -> ()
         """
         Program interface to virtualenv pip.
         Must be given either path to virtualenv or source command.
@@ -39,7 +39,7 @@ class VirtualenvPip(SystemPip, PackageManager):
         if isinstance(requirements, dict) and requirements.get("pip"):
             requirements["pip"] = self.requirements_manager.replace(requirements["pip"])
         super(VirtualenvPip, self).load_requirements(requirements)
-        self.requirements_manager.post_install(self.session)
+        self.requirements_manager.post_install(self.session, package_manager=self)
 
     def create_flags(self):
         """
@@ -2,6 +2,7 @@ from __future__ import unicode_literals
 
 import re
 import sys
+import platform
 from furl import furl
 import urllib.parse
 from operator import itemgetter
@@ -174,36 +175,42 @@ class PytorchRequirement(SimpleSubstitution):
         self.log = self._session.get_logger(__name__)
         self.package_manager = self.config["agent.package_manager.type"].lower()
         self.os = os_name or self.get_platform()
-        self.cuda = "cuda{}".format(self.cuda_version).lower()
-        self.python_version_string = str(self.config["agent.default_python"])
-        self.python_major_minor_str = '.'.join(self.python_version_string.split('.')[:2])
-        if '.' not in self.python_major_minor_str:
-            raise PytorchResolutionError(
-                "invalid python version {!r} defined in configuration file, key 'agent.default_python': "
-                "must have both major and minor parts of the version (for example: '3.7')".format(
-                    self.python_version_string
-                )
-            )
-        self.python = "python{}".format(self.python_major_minor_str)
-
-        self.exceptions = [
-            PytorchResolutionError(message)
-            for message in (
-                None,
-                'cuda version "{}" is not supported'.format(self.cuda),
-                'python version "{}" is not supported'.format(
-                    self.python_version_string
-                ),
-            )
-        ]
-
-        try:
-            self.validate_python_version()
-        except PytorchResolutionError as e:
-            self.log.warn("will not be able to install pytorch wheels: %s", e.args[0])
-
+        self.cuda = None
+        self.python_version_string = None
+        self.python_major_minor_str = None
+        self.python = None
+        self.exceptions = []
+        self._original_req = []
+
+    def _init_python_ver_cuda_ver(self):
+        if self.cuda is None:
+            self.cuda = "cuda{}".format(self.cuda_version).lower()
+        if self.python_version_string is None:
+            self.python_version_string = str(self.config["agent.default_python"])
+        if self.python_major_minor_str is None:
+            self.python_major_minor_str = '.'.join(self.python_version_string.split('.')[:2])
+            if '.' not in self.python_major_minor_str:
+                raise PytorchResolutionError(
+                    "invalid python version {!r} defined in configuration file, key 'agent.default_python': "
+                    "must have both major and minor parts of the version (for example: '3.7')".format(
+                        self.python_version_string
+                    )
+                )
+        if self.python is None:
+            self.python = "python{}".format(self.python_major_minor_str)
+
+        if not self.exceptions:
+            self.exceptions = [
+                PytorchResolutionError(message)
+                for message in (
+                    None,
+                    'cuda version "{}" is not supported'.format(self.cuda),
+                    'python version "{}" is not supported'.format(
+                        self.python_version_string
+                    ),
+                )
+            ]
 
     @property
     def is_conda(self):
         return self.package_manager == "conda"

@@ -216,6 +223,8 @@ class PytorchRequirement(SimpleSubstitution):
        """
        Make sure python version has both major and minor versions as required for choosing pytorch wheel
        """
+        self._init_python_ver_cuda_ver()
+
        if self.is_pip and not self.python_major_minor_str:
            raise PytorchResolutionError(
                "invalid python version {!r} defined in configuration file, key 'agent.default_python': "

@@ -237,10 +246,15 @@ class PytorchRequirement(SimpleSubstitution):
            return "macos"
        raise RuntimeError("unrecognized OS")

+    @staticmethod
+    def get_arch():
+        return str(platform.machine()).lower()
+
    def _get_link_from_torch_page(self, req, torch_url):
        links_parser = LinksHTMLParser()
        links_parser.feed(requests.get(torch_url, timeout=10).text)
        platform_wheel = "win" if self.get_platform() == "windows" else self.get_platform()
+        arch_wheel = self.get_arch()
        py_ver = self.python_major_minor_str.replace('.', '')
        url = None
        last_v = None

@@ -261,8 +275,11 @@ class PytorchRequirement(SimpleSubstitution):
                continue
            if len(parts) < 3 or not parts[2].endswith(py_ver):
                continue
-            if len(parts) < 5 or platform_wheel not in parts[4]:
+            if len(parts) < 5 or platform_wheel not in parts[4].lower():
                continue
+            if len(parts) < 5 or arch_wheel not in parts[4].lower():
+                continue

            # yes this is for linux python 2.7 support, this is the only python 2.7 we support...
            if py_ver and py_ver[0] == '2' and len(parts) > 3 and not parts[3].endswith('u'):
                continue
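
The two new checks make the platform match case-insensitive and add an explicit architecture filter. Both rely on the naming convention of PyTorch wheel files, split on '-'. A minimal sketch of that convention (the example filename is illustrative, not taken from this repo):

# The wheel-name fields the checks above index into; the example name is illustrative.
wheel = "torch-1.10.1+cu113-cp37-cp37m-linux_x86_64.whl"
parts = wheel.split('-')
# parts[2] == "cp37"               -> python tag, hence parts[2].endswith(py_ver)
# parts[4] == "linux_x86_64.whl"   -> platform + architecture, hence the two
#                                     membership tests against parts[4].lower()
py_ver = "37"
assert parts[2].endswith(py_ver)
assert "linux" in parts[4].lower() and "x86_64" in parts[4].lower()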

@@ -294,6 +311,7 @@ class PytorchRequirement(SimpleSubstitution):

    def get_url_for_platform(self, req):
        # check if package is already installed with system packages
+        self.validate_python_version()
        # noinspection PyBroadException
        try:
            if self.config.get("agent.package_manager.system_site_packages", None):

@@ -16,7 +16,9 @@ from pyhocon import ConfigTree
import six
import logging
from clearml_agent.definitions import PIP_EXTRA_INDICES
-from clearml_agent.helper.base import warning, is_conda, which, join_lines, is_windows_platform
+from clearml_agent.helper.base import (
+    warning, is_conda, which, join_lines, is_windows_platform,
+    convert_cuda_version_to_int_10_base_str, )
from clearml_agent.helper.process import Argv, PathLike
from clearml_agent.helper.gpu.gpustat import get_driver_cuda_version
from clearml_agent.session import Session, normalize_cuda_version

@@ -177,7 +179,7 @@ class MarkerRequirement(object):
        if self.remove_local_file_ref():
            # print warning
            logging.getLogger(__name__).warning(
-                'Local file not found [{}], references removed !'.format(line))
+                'Local file not found [{}], references removed'.format(line))


class SimpleVersion:

@@ -435,6 +437,7 @@ class RequirementSubstitution(object):
        self.config = session.config  # type: ConfigTree
        self.suffix = '.post{config[agent.cuda_version]}.dev{config[agent.cudnn_version]}'.format(config=self.config)
        self.package_manager = self.config['agent.package_manager.type']
+        self._is_already_installed_cb = None

    @abstractmethod
    def match(self, req):  # type: (MarkerRequirement) -> bool

@@ -450,6 +453,20 @@ class RequirementSubstitution(object):
        """
        pass

+    def set_is_already_installed_cb(self, cb):
+        self._is_already_installed_cb = cb
+
+    def is_already_installed(self, req):
+        if not self._is_already_installed_cb:
+            return False
+        # noinspection PyBroadException
+        try:
+            return self._is_already_installed_cb(req)
+        except BaseException as ex:
+            # debug could not resolve something
+            print("Warning: Requirements post install callback exception (check if package installed): {}".format(ex))
+            return False

    def post_scan_add_req(self):  # type: () -> Optional[MarkerRequirement]
        """
        Allows the RequirementSubstitution to add an extra line/requirements after

@@ -474,7 +491,7 @@ class RequirementSubstitution(object):

    @property
    def cuda_version(self):
-        return self.config['agent.cuda_version']
+        return convert_cuda_version_to_int_10_base_str(self.config['agent.cuda_version'])

    @property
    def cudnn_version(self):
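
The cuda_version property now normalizes the configured value through convert_cuda_version_to_int_10_base_str (imported above). Its exact behavior is not shown in this diff; a hypothetical re-implementation for illustration only, assuming it maps values such as '11.2' or 112 to the ten-base string '112' used by the wheel lookup:

# Hypothetical sketch only -- the real helper lives in clearml_agent.helper.base
# and may behave differently.
def convert_cuda_version_to_int_10_base_str(cuda_version):
    try:
        value = float(str(cuda_version or '0').strip() or '0')
        # assume "112" is already ten-base, while "11.2" needs the *10 conversion
        return str(int(value)) if value >= 40 else str(int(round(value * 10)))
    except (ValueError, TypeError):
        return '0'

assert convert_cuda_version_to_int_10_base_str("11.2") == "112"
assert convert_cuda_version_to_int_10_base_str(112) == "112"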

@@ -560,6 +577,7 @@ class RequirementsManager(object):
            cache_dir=pip_cache_dir.as_posix())
        self._base_interpreter = base_interpreter
        self._cwd = None
+        self._installed_parsed_packages = set()

    def register(self, cls):  # type: (Type[RequirementSubstitution]) -> None
        self.handlers.append(cls(self._session))

@@ -617,7 +635,9 @@ class RequirementsManager(object):

        return join_lines(result)

-    def post_install(self, session):
+    def post_install(self, session, package_manager=None):
+        if package_manager:
+            self.update_installed_packages_state(package_manager.freeze())
        for h in self.handlers:
            try:
                h.post_install(session)
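
The new package_manager argument lets post_install snapshot the freshly created environment before the substitution handlers run, which is what feeds the is_already_installed checks above. A rough sketch of the resulting flow, with a hypothetical stand-in for the package manager:

# Rough sketch; DummyPackageManager is a hypothetical stand-in for the pip/conda wrappers.
class DummyPackageManager:
    def freeze(self):
        # same shape the real freeze() output takes and update_installed_packages_state() expects
        return {'pip': ["numpy==1.21.5", "requests==2.25.1"]}

# requirements_manager.post_install(session, package_manager=DummyPackageManager())
# 1. update_installed_packages_state(freeze_output) parses the installed packages and
#    wires _callback_is_already_installed into every registered handler
# 2. each handler's post_install(session) then runs with the callback available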

@@ -639,6 +659,34 @@ class RequirementsManager(object):
    def get_interpreter(self):
        return self._base_interpreter

+    def update_installed_packages_state(self, requirements):
+        """
+        Updates internal Installed Packages objects, so that later we can detect
+        if we already have a pre-installed package
+        :param requirements: the output of a freeze() call, i.e. a dict {'pip': "package==version"}
+        """
+        requirements = requirements if not isinstance(requirements, dict) else requirements.get("pip")
+        self._installed_parsed_packages = self.parse_requirements_section_to_marker_requirements(
+            requirements=requirements, cwd=self._cwd)
+        for h in self.handlers:
+            h.set_is_already_installed_cb(self._callback_is_already_installed)
+
+    def _callback_is_already_installed(self, req):
+        for p in (self._installed_parsed_packages or []):
+            if p.name != req.name:
+                continue
+            # if this is a version control package, only return true if both installed and requested specify a commit ID
+            if req.vcs:
+                return p.vcs and req.revision and req.revision == p.revision
+
+            if not req.specs and not p.specs:
+                return True
+
+            # return true if this is the same version
+            return req.specs and p.specs and req.compare_version(p, op="==")
+
+        return False

    @staticmethod
    def get_cuda_version(config):  # type: (ConfigTree) -> (Text, Text)
        # we assume os.environ already updated the config['agent.cuda_version'] & config['agent.cudnn_version']
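
_callback_is_already_installed applies three matching rules: VCS requirements match only when both sides pin the same revision, spec-less requirements match by name alone, and everything else must compare version-equal. A small self-contained illustration of the same rules (Req is a hypothetical stand-in for MarkerRequirement, and plain spec equality stands in for compare_version):

from collections import namedtuple

# Hypothetical stand-in for MarkerRequirement; only the fields used above.
Req = namedtuple('Req', 'name specs vcs revision')

def is_same(requested, installed):
    if requested.name != installed.name:
        return False
    if requested.vcs:  # rule 1: VCS packages need matching commit IDs on both sides
        return bool(installed.vcs and requested.revision and requested.revision == installed.revision)
    if not requested.specs and not installed.specs:  # rule 2: name-only match
        return True
    # rule 3: both carry specs and the versions compare equal
    return bool(requested.specs and installed.specs and requested.specs == installed.specs)

assert is_same(Req('clearml', [('==', '1.1.4')], None, None),
               Req('clearml', [('==', '1.1.4')], None, None))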

@@ -99,8 +99,10 @@ DAEMON_ARGS = dict({
        'aliases': ['-d'],
    },
    '--stop': {
-        'help': 'Stop the running agent (based on the same set of arguments)',
-        'action': 'store_true',
+        'help': 'Stop the running agent (based on the same set of arguments). '
+                'Optional: provide a list of specific local worker IDs to stop',
+        'nargs': '*',
+        'default': False,
    },
    '--dynamic-gpus': {
        'help': 'Allow to dynamically allocate gpus based on queue properties, '
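
Replacing action='store_true' with nargs='*' and default=False lets --stop carry optional worker IDs while still being distinguishable from an absent flag. The same pattern in isolation:

# Standalone sketch of the argparse pattern used for --stop above.
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument('--stop', nargs='*', default=False)

assert parser.parse_args([]).stop is False                       # flag not given
assert parser.parse_args(['--stop']).stop == []                  # stop, no specific IDs
assert parser.parse_args(['--stop', 'worker:gpu0']).stop == ['worker:gpu0']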

@@ -1 +1 @@
-__version__ = '1.2.0rc2'
+__version__ = '1.2.2'

docker/k8s-glue/glue-build/Dockerfile.alpine (new file, 75 lines)
@@ -0,0 +1,75 @@
ARG TAG=3.7.12-alpine3.15

FROM python:${TAG} as build

RUN apk add --no-cache \
    gcc \
    musl-dev \
    libffi-dev

RUN python3 \
    -m pip \
    install \
    --prefix=/install \
    --no-cache-dir \
    -U \
    clearml-agent \
    cryptography>=2.9

FROM python:${TAG} as target

WORKDIR /app

ARG KUBECTL_VERSION=1.22.4

# Not sure about these ENV vars
# ENV LC_ALL=en_US.UTF-8
# ENV LANG=en_US.UTF-8
# ENV LANGUAGE=en_US.UTF-8
# ENV PYTHONIOENCODING=UTF-8

COPY --from=build /install /usr/local

ADD https://storage.googleapis.com/kubernetes-release/release/v${KUBECTL_VERSION}/bin/linux/amd64/kubectl /usr/bin/

RUN chmod +x /usr/bin/kubectl

RUN apk add --no-cache \
    bash

COPY k8s_glue_example.py .

# AWS CLI
# https://github.com/kyleknap/aws-cli/blob/source-proposal/proposals/source-install.md#alpine-linux
# https://github.com/aws/aws-cli/issues/4685
# https://github.com/aws/aws-cli/pull/6352

# https://github.com/GoogleCloudPlatform/cloud-sdk-docker/blob/master/alpine/Dockerfile

FROM target as gcp

ARG CLOUD_SDK_VERSION=371.0.0
ENV CLOUD_SDK_VERSION=$CLOUD_SDK_VERSION
ENV PATH /google-cloud-sdk/bin:$PATH

WORKDIR /

RUN apk --no-cache add \
    curl \
    python3 \
    py3-crcmod \
    py3-openssl \
    bash \
    libc6-compat \
    openssh-client \
    git \
    gnupg \
    && curl -O https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz && \
    tar xzf google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz && \
    rm google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz && \
    gcloud config set core/disable_usage_reporting true && \
    gcloud config set component_manager/disable_update_check true && \
    gcloud config set metrics/environment github_docker_image && \
    gcloud --version

WORKDIR /app

docker/k8s-glue/glue-build/Dockerfile.bullseye (new file, 82 lines)
@@ -0,0 +1,82 @@
ARG TAG=3.7.12-slim-bullseye

FROM python:${TAG} as target

ARG KUBECTL_VERSION=1.22.4

WORKDIR /app

RUN python3 \
    -m pip \
    install \
    --no-cache-dir \
    -U \
    clearml-agent \
    cryptography>=2.9

# Not sure about these ENV vars
# ENV LC_ALL=en_US.UTF-8
# ENV LANG=en_US.UTF-8
# ENV LANGUAGE=en_US.UTF-8
# ENV PYTHONIOENCODING=UTF-8

ADD https://storage.googleapis.com/kubernetes-release/release/v${KUBECTL_VERSION}/bin/linux/amd64/kubectl /usr/bin/

RUN chmod +x /usr/bin/kubectl

COPY k8s_glue_example.py .

CMD ["python3", "k8s_glue_example.py"]

FROM target as aws

# https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html
# https://docs.aws.amazon.com/eks/latest/userguide/install-aws-iam-authenticator.html

RUN apt-get update -qqy && \
    apt-get install -qqy \
    unzip && \
    rm -rf /var/lib/apt/lists/*

ADD https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip awscliv2.zip
ADD https://amazon-eks.s3.us-west-2.amazonaws.com/1.21.2/2021-07-05/bin/linux/amd64/aws-iam-authenticator /usr/local/bin/aws-iam-authenticator

RUN unzip awscliv2.zip && \
    ./aws/install && \
    rm -r awscliv2.zip aws/ && \
    chmod +x /usr/local/bin/aws-iam-authenticator && \
    aws --version && \
    aws-iam-authenticator version

# https://github.com/GoogleCloudPlatform/cloud-sdk-docker/blob/master/debian_slim/Dockerfile

FROM target as gcp

ARG CLOUD_SDK_VERSION=371.0.0
ENV CLOUD_SDK_VERSION=$CLOUD_SDK_VERSION

ENV PATH "$PATH:/opt/google-cloud-sdk/bin/"

ARG INSTALL_COMPONENTS
RUN mkdir -p /usr/share/man/man1/
RUN apt-get update -qqy && \
    apt-get install -qqy \
    curl \
    gcc \
    python3-dev \
    python3-pip \
    apt-transport-https \
    lsb-release \
    openssh-client \
    git \
    gnupg && \
    rm -rf /var/lib/apt/lists/* && \
    pip3 install -U crcmod && \
    export CLOUD_SDK_REPO="cloud-sdk-$(lsb_release -c -s)" && \
    echo "deb https://packages.cloud.google.com/apt $CLOUD_SDK_REPO main" > /etc/apt/sources.list.d/google-cloud-sdk.list && \
    curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \
    apt-get update && apt-get install -y google-cloud-sdk=${CLOUD_SDK_VERSION}-0 $INSTALL_COMPONENTS && \
    gcloud config set core/disable_usage_reporting true && \
    gcloud config set component_manager/disable_update_check true && \
    gcloud config set metrics/environment github_docker_image && \
    gcloud --version

docker/k8s-glue/glue-build/k8s_glue_example.py (new file, 94 lines)
@@ -0,0 +1,94 @@
"""
|
||||
This example assumes you have preconfigured services with selectors in the form of
|
||||
"ai.allegro.agent.serial=pod-<number>" and a targetPort of 10022.
|
||||
The K8sIntegration component will label each pod accordingly.
|
||||
"""
|
||||
from argparse import ArgumentParser
|
||||
|
||||
from clearml_agent.glue.k8s import K8sIntegration
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = ArgumentParser()
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
|
||||
parser.add_argument(
|
||||
"--queue", type=str, help="Queue to pull tasks from"
|
||||
)
|
||||
group.add_argument(
|
||||
"--ports-mode", action='store_true', default=False,
|
||||
help="Ports-Mode will add a label to the pod which can be used as service, in order to expose ports"
|
||||
"Should not be used with max-pods"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--num-of-services", type=int, default=20,
|
||||
help="Specify the number of k8s services to be used. Use only with ports-mode."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--base-port", type=int,
|
||||
help="Used in conjunction with ports-mode, specifies the base port exposed by the services. "
|
||||
"For pod #X, the port will be <base-port>+X. Note that pod number is calculated based on base-pod-num"
|
||||
"e.g. if base-port=20000 and base-pod-num=3, the port for the first pod will be 20003"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--base-pod-num", type=int, default=1,
|
||||
help="Used in conjunction with ports-mode and base-port, specifies the base pod number to be used by the "
|
||||
"service (default: %(default)s)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--gateway-address", type=str, default=None,
|
||||
help="Used in conjunction with ports-mode, specify the external address of the k8s ingress / ELB"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pod-clearml-conf", type=str,
|
||||
help="Configuration file to be used by the pod itself (if not provided, current configuration is used)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--overrides-yaml", type=str,
|
||||
help="YAML file containing pod overrides to be used when launching a new pod"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--template-yaml", type=str,
|
||||
help="YAML file containing pod template. If provided pod will be scheduled with kubectl apply "
|
||||
"and overrides are ignored, otherwise it will be scheduled with kubectl run"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ssh-server-port", type=int, default=0,
|
||||
help="If non-zero, every pod will also start an SSH server on the selected port (default: zero, not active)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--namespace", type=str,
|
||||
help="Specify the namespace in which pods will be created (default: %(default)s)", default="clearml"
|
||||
)
|
||||
group.add_argument(
|
||||
"--max-pods", type=int,
|
||||
help="Limit the maximum number of pods that this service can run at the same time."
|
||||
"Should not be used with ports-mode"
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
|
||||
user_props_cb = None
|
||||
if args.ports_mode and args.base_port:
|
||||
def k8s_user_props_cb(pod_number=0):
|
||||
user_prop = {"k8s-pod-port": args.base_port + pod_number}
|
||||
if args.gateway_address:
|
||||
user_prop["k8s-gateway-address"] = args.gateway_address
|
||||
return user_prop
|
||||
user_props_cb = k8s_user_props_cb
|
||||
|
||||
k8s = K8sIntegration(
|
||||
ports_mode=args.ports_mode, num_of_services=args.num_of_services, base_pod_num=args.base_pod_num,
|
||||
user_props_cb=user_props_cb, overrides_yaml=args.overrides_yaml, clearml_conf_file=args.pod_clearml_conf,
|
||||
template_yaml=args.template_yaml, extra_bash_init_script=K8sIntegration.get_ssh_server_bash(
|
||||
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
|
||||
namespace=args.namespace, max_pods_limit=args.max_pods or None,
|
||||
)
|
||||
k8s.k8s_daemon(args.queue)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
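
As the module docstring notes, ports-mode assumes services selecting ai.allegro.agent.serial=pod-<number> with a targetPort of 10022 already exist. A hedged sketch of generating such manifests follows; the selector key and targetPort come from the docstring, while the names, service type, and port arithmetic are illustrative assumptions:

import json

def make_service_manifest(pod_number, base_port=20000):
    # Selector and targetPort per the docstring above; everything else is an assumption.
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": "clearml-pod-{}".format(pod_number)},
        "spec": {
            "selector": {"ai.allegro.agent.serial": "pod-{}".format(pod_number)},
            "ports": [{"port": base_port + pod_number, "targetPort": 10022}],
        },
    }

# One service per potential pod, e.g. piped to `kubectl apply -f -`:
print(json.dumps(make_service_manifest(3), indent=2))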

@@ -4,7 +4,7 @@ api {
    web_server: https://demoapp.demo.clear.ml
    files_server: https://demofiles.demo.clear.ml

-    # Credentials are generated in the webapp, https://demoapp.demo.clear.ml/profile
+    # Credentials are generated in the webapp, https://app.clear.ml/settings/workspace-configuration
    # Overridden with os environment: CLEARML_API_ACCESS_KEY / CLEARML_API_SECRET_KEY
    credentials {"access_key": "EGRTCO8JMSIGI6S39GTP43NFWXDQOW", "secret_key": "x!XTov_G-#vspE*Y(h$Anm&DIc5Ou-F)jsl$PdOyj5wG1&E!Z8"}

@@ -15,7 +15,11 @@ api {
agent {
    # Set GIT user/pass credentials (if user/pass are set, GIT protocol will be set to https)
    # leave blank for GIT SSH credentials (set force_git_ssh_protocol=true to force SSH protocol)
-    # Notice: GitHub personal token is equivalent to password, you can put it directly into `git_pass`
+    # **Notice**: GitHub personal token is equivalent to password, you can put it directly into `git_pass`
+    # To learn how to generate git token GitHub/Bitbucket/GitLab:
+    # https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token
+    # https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/
+    # https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html
    git_user=""
    git_pass=""
    # Limit credentials to a single domain, for example: github.com,

@@ -11,7 +11,7 @@ python-dateutil>=2.4.2,<2.9.0
pyjwt>=1.6.4,<2.1.0
PyYAML>=3.12,<5.5.0
requests>=2.20.0,<2.26.0
-six>=1.11.0,<1.16.0
-typing>=3.6.4,<3.8.0
+six>=1.13.0,<1.16.0
+typing>=3.6.4,<3.8.0 ; python_version < '3.5'
urllib3>=1.21.1,<1.27.0
virtualenv>=16,<21