From f41ed09dc1af24f886d3e57dd5599fe014fe2232 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Tue, 21 Dec 2021 14:20:43 +0200 Subject: [PATCH] Add support for custom docker image resolving --- clearml_agent/backend_api/session/session.py | 6 + clearml_agent/commands/resolver.py | 166 +++++++++++++++++++ clearml_agent/commands/worker.py | 18 +- clearml_agent/helper/package/conda_api.py | 35 ++-- clearml_agent/helper/package/requirements.py | 121 ++++++++++++-- clearml_agent/session.py | 21 ++- docs/clearml.conf | 47 ++++++ 7 files changed, 380 insertions(+), 34 deletions(-) create mode 100644 clearml_agent/commands/resolver.py diff --git a/clearml_agent/backend_api/session/session.py b/clearml_agent/backend_api/session/session.py index 71a9ea1..b36f980 100644 --- a/clearml_agent/backend_api/session/session.py +++ b/clearml_agent/backend_api/session/session.py @@ -240,6 +240,12 @@ class Session(TokenManager): except Exception as ex: print("Failed getting vaults: {}".format(ex)) + def verify_feature_set(self, feature_set): + if isinstance(feature_set, str): + feature_set = [feature_set] + if self.feature_set not in feature_set: + raise ValueError('ClearML-server does not support requested feature set {}'.format(feature_set)) + def _send_request( self, service, diff --git a/clearml_agent/commands/resolver.py b/clearml_agent/commands/resolver.py new file mode 100644 index 0000000..e47aa68 --- /dev/null +++ b/clearml_agent/commands/resolver.py @@ -0,0 +1,166 @@ +import json +import re +import shlex +from clearml_agent.helper.package.requirements import ( + RequirementsManager, MarkerRequirement, + compare_version_rules, ) + + +def resolve_default_container(session, task_id, container_config): + container_lookup = session.config.get('agent.default_docker.match_rules', None) + if not session.check_min_api_version("2.13") or not container_lookup: + return container_config + + # check backend support before sending any more requests (because they will fail and crash the Task) + try: + session.verify_feature_set('advanced') + except ValueError: + return container_config + + result = session.send_request( + service='tasks', + action='get_all', + version='2.14', + json={'id': [task_id], + 'only_fields': ['script.requirements', 'script.binary', + 'script.repository', 'script.branch', + 'project', 'container'], + 'search_hidden': True}, + method='get', + async_enable=False, + ) + try: + task_info = result.json()['data']['tasks'][0] if result.ok else {} + except (ValueError, TypeError): + return container_config + + from clearml_agent.external.requirements_parser.requirement import Requirement + + # store tasks repository + repository = task_info.get('script', {}).get('repository') or '' + branch = task_info.get('script', {}).get('branch') or '' + binary = task_info.get('script', {}).get('binary') or '' + requested_container = task_info.get('container', {}) + + # get project full path + project_full_name = '' + if task_info.get('project', None): + result = session.send_request( + service='projects', + action='get_all', + version='2.13', + json={ + 'id': [task_info.get('project')], + 'only_fields': ['name'], + }, + method='get', + async_enable=False, + ) + try: + if result.ok: + project_full_name = result.json()['data']['projects'][0]['name'] or '' + except (ValueError, TypeError): + pass + + task_packages_lookup = {} + for entry in container_lookup: + match = entry.get('match', None) + if not match: + continue + if match.get('project', None): + # noinspection PyBroadException + try: + if not re.search(match.get('project', None), project_full_name): + continue + except Exception: + print('Failed parsing regular expression \"{}\" in rule: {}'.format( + match.get('project', None), entry)) + continue + + if match.get('script.repository', None): + # noinspection PyBroadException + try: + if not re.search(match.get('script.repository', None), repository): + continue + except Exception: + print('Failed parsing regular expression \"{}\" in rule: {}'.format( + match.get('script.repository', None), entry)) + continue + + if match.get('script.branch', None): + # noinspection PyBroadException + try: + if not re.search(match.get('script.branch', None), branch): + continue + except Exception: + print('Failed parsing regular expression \"{}\" in rule: {}'.format( + match.get('script.branch', None), entry)) + continue + + if match.get('script.binary', None): + # noinspection PyBroadException + try: + if not re.search(match.get('script.binary', None), binary): + continue + except Exception: + print('Failed parsing regular expression \"{}\" in rule: {}'.format( + match.get('script.binary', None), entry)) + continue + + if match.get('container', None): + # noinspection PyBroadException + try: + if not re.search(match.get('container', None), requested_container.get('image', '')): + continue + except Exception: + print('Failed parsing regular expression \"{}\" in rule: {}'.format( + match.get('container', None), entry)) + continue + + matched = True + for req_section in ['script.requirements.pip', 'script.requirements.conda']: + if not match.get(req_section, None): + continue + + match_pip_reqs = [MarkerRequirement(Requirement.parse('{} {}'.format(k, v))) + for k, v in match.get(req_section, None).items()] + + if not task_packages_lookup.get(req_section): + req_section_parts = req_section.split('.') + task_packages_lookup[req_section] = \ + RequirementsManager.parse_requirements_section_to_marker_requirements( + requirements=task_info.get(req_section_parts[0], {}).get( + req_section_parts[1], {}).get(req_section_parts[2], None) + ) + + matched_all_reqs = True + for mr in match_pip_reqs: + matched_req = False + for pr in task_packages_lookup[req_section]: + if mr.req.name != pr.req.name: + continue + if compare_version_rules(mr.specs, pr.specs): + matched_req = True + break + if not matched_req: + matched_all_reqs = False + break + + # if ew have a match, check second section + if matched_all_reqs: + continue + # no match stop + matched = False + break + + if matched: + if not container_config.get('container'): + container_config['container'] = entry.get('image', None) + if not container_config.get('arguments'): + container_config['arguments'] = entry.get('arguments', None) + container_config['arguments'] = shlex.split(str(container_config.get('arguments') or '').strip()) + print('Matching default container with rule:\n{}'.format(json.dumps(entry))) + return container_config + + return container_config + diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index 7023716..741ee75 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -41,6 +41,7 @@ from clearml_agent.backend_api.session.defs import ENV_ENABLE_ENV_CONFIG_SECTION from clearml_agent.backend_config.defs import UptimeConf from clearml_agent.backend_config.utils import apply_environment, apply_files from clearml_agent.commands.base import resolve_names, ServiceCommandSection +from clearml_agent.commands.resolver import resolve_default_container from clearml_agent.definitions import ( ENVIRONMENT_SDK_PARAMS, PROGRAM_NAME, @@ -102,7 +103,8 @@ from clearml_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI from clearml_agent.helper.package.post_req import PostRequirement from clearml_agent.helper.package.priority_req import PriorityPackageRequirement, PackageCollectorRequirement from clearml_agent.helper.package.pytorch import PytorchRequirement -from clearml_agent.helper.package.requirements import RequirementsManager +from clearml_agent.helper.package.requirements import ( + RequirementsManager, ) from clearml_agent.helper.package.venv_update_api import VenvUpdateAPI from clearml_agent.helper.process import ( kill_all_child_processes, @@ -330,6 +332,9 @@ def get_task_container(session, task_id): except (ValueError, TypeError): container = {} + if (not container or not container.get('container')) and session.check_min_api_version("2.13"): + container = resolve_default_container(session=session, task_id=task_id, container_config=container) + return container @@ -629,7 +634,7 @@ class Worker(ServiceCommandSection): :param queue: ID of queue that task was pulled from :param task_id: ID of task to run :param worker_args: Worker command line arguments - :params task_session: The session for running operations on the passed task + :param task_session: The session for running operations on the passed task :param docker: Docker image in which the execution task will run """ # start new process and execute task id @@ -1118,6 +1123,7 @@ class Worker(ServiceCommandSection): return queue_tags, runtime_props def get_runtime_properties(self): + # TODO: refactor to use the Session env State if self._runtime_props_support is not True: # either not supported or never tested if self._runtime_props_support == self._session.api_version: @@ -2843,7 +2849,8 @@ class Worker(ServiceCommandSection): requested_python_version = \ requested_python_version or \ Text(self._session.config.get("agent.python_binary", None)) or \ - Text(self._session.config.get("agent.default_python", None)) + Text(self._session.config.get("agent.default_python", None)) or \ + '{}.{}'.format(sys.version_info.major, sys.version_info.minor) if self.is_conda: executable_version_suffix = \ @@ -2870,13 +2877,14 @@ class Worker(ServiceCommandSection): self.find_python_executable_for_version(requested_python_version) except Exception: def_python_version = Text(self._session.config.get("agent.python_binary", None)) or \ - Text(self._session.config.get("agent.default_python", None)) + Text(self._session.config.get("agent.default_python", None)) or \ + '{}.{}'.format(sys.version_info.major, sys.version_info.minor) print('Warning: could not locate requested Python version {}, reverting to version {}'.format( requested_python_version, def_python_version)) executable_version, executable_version_suffix, executable_name = \ self.find_python_executable_for_version(def_python_version) - self._session.config.put("agent.default_python", executable_version) + self._session.config.put("agent.default_python", executable_version_suffix) self._session.config.put("agent.python_binary", executable_name) venv_dir = Path(venv_dir) if venv_dir else \ diff --git a/clearml_agent/helper/package/conda_api.py b/clearml_agent/helper/package/conda_api.py index 91b9fe9..04e3b0d 100644 --- a/clearml_agent/helper/package/conda_api.py +++ b/clearml_agent/helper/package/conda_api.py @@ -189,14 +189,6 @@ class CondaAPI(PackageManager): if conda_env.is_file() and not is_windows_platform(): self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source) - # install cuda toolkit - # noinspection PyBroadException - try: - cuda_version = float(int(self.session.config['agent.cuda_version'])) / 10.0 - if cuda_version > 0: - self._install('cudatoolkit={:.1f}'.format(cuda_version)) - except Exception: - pass return self def _init_existing_environment(self, conda_pre_build_env_path): @@ -456,7 +448,9 @@ class CondaAPI(PackageManager): requirements['conda'] = requirements['conda'].split('\n') has_torch = False has_matplotlib = False + has_cudatoolkit = False try: + # notice this is an integer version: 112 (means 11.2) cuda_version = int(self.session.config.get('agent.cuda_version', 0)) except: cuda_version = 0 @@ -488,6 +482,19 @@ class CondaAPI(PackageManager): if '.' not in m.specs[0][1]: continue + if m.name.lower() == 'cudatoolkit': + # skip cuda if we are running on CPU + if not cuda_version: + continue + + has_cudatoolkit = True + # cuda version, only major.minor + requested_cuda_version = '.'.join(m.specs[0][1].split('.')[:2]) + # make sure that the cuda_version we support can install the requested cuda (major version) + if int(float(requested_cuda_version)) > int(float(cuda_version)/10.0): + continue + m.specs = [(m.specs[0][0], str(requested_cuda_version)), ] + conda_supported_req_names.append(m.name.lower()) if m.req.name.lower() == 'matplotlib': has_matplotlib = True @@ -504,6 +511,10 @@ class CondaAPI(PackageManager): reqs.append(m) + if not has_cudatoolkit and cuda_version: + m = MarkerRequirement(Requirement("cudatoolkit == {}".format(float(cuda_version) / 10.0))) + reqs.append(m) + # if we have a conda list, the rest should be installed with pip, # this means any experiment that was executed with pip environment, # will be installed using pip @@ -559,8 +570,12 @@ class CondaAPI(PackageManager): # change _ to - in name but not the prefix _ (as this is conda prefix) if r.name and not r.name.startswith('_') and not requirements.get('conda', None): r.name = r.name.replace('_', '-') - # remove .post from version numbers, it fails ~= version, and change == to ~= - if r.specs and r.specs[0]: + + if has_cudatoolkit and r.specs and len(r.specs[0]) > 1 and r.name == 'cudatoolkit': + # select specific cuda version if it came from the requirements + r.specs = [(r.specs[0][0].replace('==', '='), r.specs[0][1].split('.post')[0])] + elif r.specs and r.specs[0] and len(r.specs[0]) > 1: + # remove .post from version numbers it fails with ~= version, and change == to ~= r.specs = [(r.specs[0][0].replace('==', '~='), r.specs[0][1].split('.post')[0])] while reqs: diff --git a/clearml_agent/helper/package/requirements.py b/clearml_agent/helper/package/requirements.py index f8b68e8..933d565 100644 --- a/clearml_agent/helper/package/requirements.py +++ b/clearml_agent/helper/package/requirements.py @@ -208,7 +208,11 @@ class SimpleVersion: if not version_b: return True + if not num_parts: + num_parts = max(len(version_a.split('.')), len(version_b.split('.')), ) + if op == '~=': + num_parts = len(version_b.split('.')) - 1 num_parts = max(num_parts, 2) op = '==' ignore_sub_versions = True @@ -245,6 +249,16 @@ class SimpleVersion: return version_a_key < version_b_key raise ValueError('Unrecognized comparison operator [{}]'.format(op)) + @classmethod + def max_version(cls, version_a, version_b): + return version_a if cls.compare_versions( + version_a=version_a, op='>=', version_b=version_b, num_parts=None) else version_b + + @classmethod + def min_version(cls, version_a, version_b): + return version_a if cls.compare_versions( + version_a=version_a, op='<=', version_b=version_b, num_parts=None) else version_b + @staticmethod def _parse_letter_version( letter, # type: str @@ -313,6 +327,77 @@ class SimpleVersion: return () +def compare_version_rules(specs_a, specs_b): + # specs_a/b are a list of tuples: [('==', '1.2.3'), ] or [('>=', '1.2'), ('<', '1.3')] + # section definition: + class Section(object): + def __init__(self, left=None, left_eq=False, right=None, right_eq=False): + self.left, self.left_eq, self.right, self.right_eq = left, left_eq, right, right_eq + # first create a list of in/out sections for each spec + # >, >= are left rule + # <, <= are right rule + # ~= x.y.z is converted to: >= x.y and < x.y+1 + # ==/=== are converted to: >= and <= + # != x.y.z will split a section into: left < x.y.z and right > x.y.z + def create_section(specs): + section = Section() + for op, v in specs: + a = section + if op == '>': + a.left = v + a.left_eq = False + elif op == '>=': + a.left = v + a.left_eq = True + elif op == '<': + a.right = v + a.right_eq = False + elif op == '<=': + a.right = v + a.right_eq = True + elif op == '==': + a.left = v + a.left_eq = True + a.right = v + a.right_eq = True + elif op == '~=': + new_v = v.split('.') + a_left = '.'.join(new_v[:-1]) + a.left = a_left if not a.left else SimpleVersion.max_version(a_left, a.left) + a.left_eq = True + a_right = '.'.join(new_v[:-2] + [str(int(new_v[-2])+1)]) + a.right = a_right if not a.right else SimpleVersion.min_version(a_right, a.right) + a.right_eq = False if a.right == a_right else a.right_eq + + return section + + section_a = create_section(specs_a) + section_b = create_section(specs_b) + i = Section() + # then we have a list of sections for spec A/B + if section_a.left == section_b.left: + i.left = section_a.left + i.left_eq = section_a.left_eq and section_b.left_eq + else: + i.left = SimpleVersion.max_version(section_a.left, section_b.left) + i.left_eq = section_a.left_eq if i.left == section_a.left else section_b.left_eq + if section_a.right == section_b.right: + i.right = section_a.right + i.right_eq = section_a.right_eq and section_b.right_eq + else: + i.right = SimpleVersion.min_version(section_a.right, section_b.right) + i.right_eq = section_a.right_eq if i.right == section_a.right else section_b.right_eq + + # return true if any section from A intersects a section from B + valid = True + valid &= SimpleVersion.compare_versions( + version_a=i.left, op='<=' if i.left_eq else '<', version_b=i.right, num_parts=None) + valid &= SimpleVersion.compare_versions( + version_a=i.right, op='>=' if i.left_eq else '>', version_b=i.left, num_parts=None) + + return valid + + @six.add_metaclass(ABCMeta) class RequirementSubstitution(object): @@ -468,20 +553,9 @@ class RequirementsManager(object): return None def replace(self, requirements): # type: (Text) -> Text - def safe_parse(req_str): - # noinspection PyBroadException - try: - return list(parse(req_str, cwd=self._cwd)) - except Exception as ex: - return [Requirement(req_str)] + parsed_requirements = self.parse_requirements_section_to_marker_requirements( + requirements=requirements, cwd=self._cwd) - parsed_requirements = tuple( - map( - MarkerRequirement, - [r for line in (requirements.splitlines() if isinstance(requirements, six.text_type) else requirements) - for r in safe_parse(line)] - ) - ) if not parsed_requirements: # return the original requirements just in case return requirements @@ -614,3 +688,24 @@ class RequirementsManager(object): return (normalize_cuda_version(cuda_version or 0), normalize_cuda_version(cudnn_version or 0)) + + @staticmethod + def parse_requirements_section_to_marker_requirements(requirements, cwd=None): + def safe_parse(req_str): + # noinspection PyBroadException + try: + return list(parse(req_str, cwd=cwd)) + except Exception as ex: + return [Requirement(req_str)] + + if not requirements: + return tuple() + + parsed_requirements = tuple( + map( + MarkerRequirement, + [r for line in (requirements.splitlines() if isinstance(requirements, str) else requirements) + for r in safe_parse(line)] + ) + ) + return parsed_requirements diff --git a/clearml_agent/session.py b/clearml_agent/session.py index 3de1f81..03c5746 100644 --- a/clearml_agent/session.py +++ b/clearml_agent/session.py @@ -229,26 +229,35 @@ class Session(_Session): except: pass - def print_configuration(self, remove_secret_keys=("secret", "pass", "token", "account_key")): + def print_configuration( + self, + remove_secret_keys=("secret", "pass", "token", "account_key", "contents"), + skip_value_keys=("environment", ) + ): # remove all the secrets from the print - def recursive_remove_secrets(dictionary, secret_keys=()): + def recursive_remove_secrets(dictionary, secret_keys=(), empty_keys=()): for k in list(dictionary): for s in secret_keys: if s in k: dictionary.pop(k) break + for s in empty_keys: + if s == k: + dictionary[k] = {key: '****' for key in dictionary[k]} \ + if isinstance(dictionary[k], dict) else '****' + break if isinstance(dictionary.get(k, None), dict): - recursive_remove_secrets(dictionary[k], secret_keys=secret_keys) + recursive_remove_secrets(dictionary[k], secret_keys=secret_keys, empty_keys=empty_keys) elif isinstance(dictionary.get(k, None), (list, tuple)): for item in dictionary[k]: if isinstance(item, dict): - recursive_remove_secrets(item, secret_keys=secret_keys) + recursive_remove_secrets(item, secret_keys=secret_keys, empty_keys=empty_keys) config = deepcopy(self.config.to_dict()) # remove the env variable, it's not important config.pop('env', None) - if remove_secret_keys: - recursive_remove_secrets(config, secret_keys=remove_secret_keys) + if remove_secret_keys or skip_value_keys: + recursive_remove_secrets(config, secret_keys=remove_secret_keys, empty_keys=skip_value_keys) # remove logging.loggers.urllib3.level from the print try: config['logging']['loggers']['urllib3'].pop('level', None) diff --git a/docs/clearml.conf b/docs/clearml.conf index 8d70e2b..24667aa 100644 --- a/docs/clearml.conf +++ b/docs/clearml.conf @@ -159,6 +159,53 @@ agent { # optional arguments to pass to docker image # arguments: ["--ipc=host"] + + # lookup table rules for default container + # first matched rule will be picked, according to rule order + # enterprise version only + # match_rules: [ + # { + # image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04" + # arguments: "-e define=value" + # match: { + # script{ + # # Optional: must match all requirements (not partial) + # requirements: { + # # version selection matching PEP-440 + # pip: { + # tensorflow: "~=2.6" + # }, + # } + # # Optional: matching based on regular expression, example: "^exact_match$" + # repository: "/my_repository/" + # branch: "main" + # binary: "python3.6" + # } + # # Optional: matching based on regular expression, example: "^exact_match$" + # project: "project/sub_project" + # } + # }, + # { + # image: "nvidia/cuda:10.1-cudnn7-runtime-ubuntu18.04" + # arguments: "-e define=value" + # match: { + # # must match all requirements (not partial) + # script{ + # requirements: { + # conda: { + # torch: ">=2.6,<2.8" + # } + # } + # # no repository matching required + # repository: "" + # } + # # no container image matching required (allow to replace one requested container with another) + # container: "" + # # no repository matching required + # project: "" + # } + # }, + # ] } # set the OS environments based on the Task's Environment section before launching the Task process.