diff --git a/clearml_agent/backend_api/session/session.py b/clearml_agent/backend_api/session/session.py index dbf8cf4..fae184b 100644 --- a/clearml_agent/backend_api/session/session.py +++ b/clearml_agent/backend_api/session/session.py @@ -112,7 +112,7 @@ class Session(TokenManager): self.__access_key = api_key or ENV_ACCESS_KEY.get( default=(self.config.get("api.credentials.access_key", None) or self.default_key), - value_cb=lambda key, value: logger.info("Using environment access key {}={}".format(key, value)) + value_cb=lambda key, value: print("Using environment access key {}={}".format(key, value)) ) if not self.access_key: raise ValueError( @@ -121,7 +121,7 @@ class Session(TokenManager): self.__secret_key = secret_key or ENV_SECRET_KEY.get( default=(self.config.get("api.credentials.secret_key", None) or self.default_secret), - value_cb=lambda key, value: logger.info("Using environment secret key {}=********".format(key)) + value_cb=lambda key, value: print("Using environment secret key {}=********".format(key)) ) if not self.secret_key: raise ValueError( diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index 17e0949..2be68d6 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -2370,6 +2370,10 @@ class Worker(ServiceCommandSection): if not self.is_conda: package_api.out_of_scope_install_package('Cython') + # add support for -r in requirements + if requirements_manager: + requirements_manager.set_cwd(cwd) + cached_requirements_failed = False if cached_requirements and (cached_requirements.get('pip') is not None or cached_requirements.get('conda') is not None): diff --git a/clearml_agent/external/requirements_parser/parser.py b/clearml_agent/external/requirements_parser/parser.py index 43755dc..8669320 100644 --- a/clearml_agent/external/requirements_parser/parser.py +++ b/clearml_agent/external/requirements_parser/parser.py @@ -4,13 +4,14 @@ import warnings from .requirement import Requirement -def parse(reqstr): +def parse(reqstr, cwd=None): """ Parse a requirements file into a list of Requirements See: pip/req.py:parse_requirements() :param reqstr: a string or file like object containing requirements + :param cwd: Optional current working dir for -r file.txt loading :returns: a *generator* of Requirement objects """ filename = getattr(reqstr, 'name', None) @@ -32,8 +33,8 @@ def parse(reqstr): continue elif line.startswith('-r') or line.startswith('--requirement'): _, new_filename = line.split() - new_file_path = os.path.join(os.path.dirname(filename or '.'), - new_filename) + new_file_path = os.path.join( + os.path.dirname(filename or '.') if filename or not cwd else cwd, new_filename) with open(new_file_path) as f: for requirement in parse(f): yield requirement diff --git a/clearml_agent/external/requirements_parser/requirement.py b/clearml_agent/external/requirements_parser/requirement.py index 7f4af5b..961d5ee 100644 --- a/clearml_agent/external/requirements_parser/requirement.py +++ b/clearml_agent/external/requirements_parser/requirement.py @@ -20,6 +20,15 @@ VCS_REGEX = re.compile( r'(#(?P\S+))?' ) +VCS_EXT_REGEX = re.compile( + r'^(?P{0})(@)'.format(r'|'.join( + [scheme.replace('+', r'\+') for scheme in ['git+git']])) + + r'((?P[^/@]+)@)?' + r'(?P[^#@]+)' + r'(@(?P[^#]+))?' + r'(#(?P\S+))?' +) + # This matches just about everyting LOCAL_REGEX = re.compile( r'^((?Pfile)://)?' @@ -100,7 +109,7 @@ class Requirement(object): req = cls('-e {0}'.format(line)) req.editable = True - vcs_match = VCS_REGEX.match(line) + vcs_match = VCS_REGEX.match(line) or VCS_EXT_REGEX.match(line) local_match = LOCAL_REGEX.match(line) if vcs_match is not None: @@ -147,7 +156,7 @@ class Requirement(object): req = cls(line) - vcs_match = VCS_REGEX.match(line) + vcs_match = VCS_REGEX.match(line) or VCS_EXT_REGEX.match(line) uri_match = URI_REGEX.match(line) local_match = LOCAL_REGEX.match(line) @@ -226,7 +235,7 @@ class Requirement(object): # check if the name is valid & parsed Req.parse(name) # if we are here, name is a valid package name, check if the vcs part is valid - if VCS_REGEX.match(uri): + if VCS_REGEX.match(uri) or VCS_EXT_REGEX.match(uri): req = cls.parse_line(uri) req.name = name return req diff --git a/clearml_agent/glue/k8s.py b/clearml_agent/glue/k8s.py index 4b5375a..24d65d9 100644 --- a/clearml_agent/glue/k8s.py +++ b/clearml_agent/glue/k8s.py @@ -12,12 +12,12 @@ from copy import deepcopy from pathlib import Path from threading import Thread from time import sleep -from typing import Text, List +from typing import Text, List, Callable, Any, Collection, Optional, Union import yaml from clearml_agent.commands.events import Events -from clearml_agent.commands.worker import Worker +from clearml_agent.commands.worker import Worker, get_task_container from clearml_agent.definitions import ENV_DOCKER_IMAGE from clearml_agent.errors import APIError from clearml_agent.helper.base import safe_remove_file @@ -31,16 +31,18 @@ class K8sIntegration(Worker): K8S_PENDING_QUEUE = "k8s_scheduler" K8S_DEFAULT_NAMESPACE = "clearml" + AGENT_LABEL = "CLEARML=agent" + LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}" KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f" KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \ - "--image {docker_image} " \ + "--image {docker_image} {docker_args} " \ "--restart=Never " \ "--namespace={namespace}" KUBECTL_DELETE_CMD = "kubectl delete pods " \ - "--selector=TRAINS=agent " \ + "--selector={selector} " \ "--field-selector=status.phase!=Pending,status.phase!=Running " \ "--namespace={namespace}" @@ -72,12 +74,10 @@ class K8sIntegration(Worker): "[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3", "$LOCAL_PYTHON -m pip install clearml-agent", "{extra_bash_init_cmd}", + "{extra_docker_bash_script}", "$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}" ] - AGENT_LABEL = "TRAINS=agent" - LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}" - _edit_hyperparams_version = "2.9" def __init__( @@ -104,7 +104,7 @@ class K8sIntegration(Worker): :param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler :param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD) example: "task={task_id} image={docker_image} queue_id={queue_id}" - or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data) + or a callable function: kubectl_cmd(task_id, docker_image, docker_args, queue_id, task_data) :param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT) Notice this string will use format() call, if you have curly brackets they should be doubled { -> {{ Format arguments passed: {task_id} and {extra_bash_init_cmd} @@ -275,16 +275,12 @@ class K8sIntegration(Worker): task_id, self.k8s_pending_queue_name, e)) return - if task_data.execution.docker_cmd: - docker_cmd = task_data.execution.docker_cmd - else: - docker_cmd = str(ENV_DOCKER_IMAGE.get() or - self._session.config.get("agent.default_docker.image", "nvidia/cuda")) - - # take the first part, this is the docker image name (not arguments) - docker_parts = docker_cmd.split() - docker_image = docker_parts[0] - docker_args = docker_parts[1:] if len(docker_parts) > 1 else [] + container = get_task_container(self._session, task_id) + if not container.get('image'): + container['image'] = str( + ENV_DOCKER_IMAGE.get() or self._session.config.get("agent.default_docker.image", "nvidia/cuda") + ) + container['arguments'] = self._session.config.get("agent.default_docker.arguments", None) # get the clearml.conf encoded file # noinspection PyProtectedMember @@ -378,17 +374,20 @@ class K8sIntegration(Worker): else: print("Kubernetes scheduling task id={}".format(task_id)) + kubectl_kwargs = dict( + create_clearml_conf=create_clearml_conf, + labels=labels, + docker_image=container['image'], + docker_args=container['arguments'], + docker_bash=container.get('setup_shell_script'), + task_id=task_id, + queue=queue + ) + if self.template_dict: - output, error = self._kubectl_apply( - create_clearml_conf=create_clearml_conf, - labels=labels, docker_image=docker_image, docker_args=docker_args, - task_id=task_id, queue=queue) + output, error = self._kubectl_apply(**kubectl_kwargs) else: - output, error = self._kubectl_run( - create_clearml_conf=create_clearml_conf, - labels=labels, docker_image=docker_cmd, - task_data=task_data, - task_id=task_id, queue=queue) + output, error = self._kubectl_run(task_data=task_data, **kubectl_kwargs) error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8')) output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8')) @@ -422,20 +421,33 @@ class K8sIntegration(Worker): **user_props ) - def _parse_docker_args(self, docker_args): - # type: (list) -> dict - kube_args = [] - while docker_args: - cmd = docker_args.pop(0).strip() - if cmd in ('-e', '--env',): - env = docker_args.pop(0).strip() - key, value = env.split('=', 1) - kube_args.append({'name': key, 'value': value}) + def _get_docker_args(self, docker_args, flags, target=None, convert=None): + # type: (List[str], Collection[str], Optional[str], Callable[[str], Any]) -> Union[dict, List[str]] + """ + Get docker args matching specific flags. + + :argument docker_args: List of docker argument strings (flags and values) + :argument flags: List of flags/names to intercept (e.g. "--env" etc.) + :argument target: Controls return format. If provided, returns a dict with a target field containing a list + of result strings, otherwise returns a list of result strings + :argument convert: Optional conversion function for each result string + """ + args = docker_args[:] if docker_args else [] + results = [] + while args: + cmd = args.pop(0).strip() + if cmd in flags: + env = args.pop(0).strip() + if convert: + env = convert(env) + results.append(env) else: self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd)) - return {'env': kube_args} if kube_args else {} + if target: + return {target: results} if results else {} + return results - def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id): + def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id): template = deepcopy(self.template_dict) template.setdefault('apiVersion', 'v1') template['kind'] = 'Pod' @@ -449,14 +461,26 @@ class K8sIntegration(Worker): labels_dict = dict(pair.split('=', 1) for pair in labels) template['metadata'].setdefault('labels', {}) template['metadata']['labels'].update(labels_dict) - container = self._parse_docker_args(docker_args) + + container = self._get_docker_args( + docker_args, + target="env", + flags={"-e", "--env"}, + convert=lambda env: {'name': env.partition("=")[0], 'value': env.partition("=")[2]}, + ) container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \ else self.container_bash_script + extra_docker_bash_script = '\n'.join(self._session.config.get("agent.extra_docker_shell_script", None) or []) + if docker_bash: + extra_docker_bash_script += '\n' + str(docker_bash) + '\n' + script_encoded = '\n'.join( ['#!/bin/bash', ] + - [line.format(extra_bash_init_cmd=self.extra_bash_init_script or '', task_id=task_id) + [line.format(extra_bash_init_cmd=self.extra_bash_init_script or '', + task_id=task_id, + extra_docker_bash_script=extra_docker_bash_script) for line in container_bash_script]) create_init_script = \ @@ -479,6 +503,10 @@ class K8sIntegration(Worker): else: template['spec']['containers'].append(container) + if self._docker_force_pull: + for c in template['spec']['containers']: + c.setdefault('imagePullPolicy', 'Always') + fp, yaml_file = tempfile.mkstemp(prefix='clearml_k8stmpl_', suffix='.yml') os.close(fp) with open(yaml_file, 'wt') as f: @@ -506,13 +534,18 @@ class K8sIntegration(Worker): return output, error - def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id): + def _kubectl_run( + self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_data, task_id + ): if callable(self.kubectl_cmd): - kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data) + kubectl_cmd = self.kubectl_cmd(task_id, docker_image, docker_args, queue, task_data) else: kubectl_cmd = self.kubectl_cmd.format( task_id=task_id, docker_image=docker_image, + docker_args=" ".join(self._get_docker_args( + docker_args, flags={"-e", "--env"}, convert=lambda env: '--env={}'.format(env)) + ), queue_id=queue, namespace=self.namespace, ) @@ -528,6 +561,9 @@ class K8sIntegration(Worker): if self.pod_requests: kubectl_cmd += ['--requests', ",".join(self.pod_requests)] + if self._docker_force_pull and not any(x.startswith("--image-pull-policy=") for x in kubectl_cmd): + kubectl_cmd += ["--image-pull-policy='always'"] + container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \ else self.container_bash_script container_bash_script = ' ; '.join(container_bash_script) @@ -539,7 +575,10 @@ class K8sIntegration(Worker): "/bin/sh", "-c", "{} ; {}".format(create_clearml_conf, container_bash_script.format( - extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id)), + extra_bash_init_cmd=self.extra_bash_init_script or "", + extra_docker_bash_script=docker_bash or "", + task_id=task_id + )), ] process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, error = process.communicate() @@ -572,7 +611,7 @@ class K8sIntegration(Worker): # iterate over queues (priority style, queues[0] is highest) for queue in queues: # delete old completed / failed pods - get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace)) + get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self.AGENT_LABEL)) # get next task in queue try: diff --git a/clearml_agent/helper/package/requirements.py b/clearml_agent/helper/package/requirements.py index e511e22..4086100 100644 --- a/clearml_agent/helper/package/requirements.py +++ b/clearml_agent/helper/package/requirements.py @@ -448,10 +448,14 @@ class RequirementsManager(object): self.translator = RequirementsTranslator(session, interpreter=base_interpreter, cache_dir=pip_cache_dir.as_posix()) self._base_interpreter = base_interpreter + self._cwd = None def register(self, cls): # type: (Type[RequirementSubstitution]) -> None self.handlers.append(cls(self._session)) + def set_cwd(self, cwd): + self._cwd = str(cwd) if cwd else None + def _replace_one(self, req): # type: (MarkerRequirement) -> Optional[Text] match = re.search(r';\s*(.*)', Text(req)) if match: @@ -466,7 +470,7 @@ class RequirementsManager(object): def replace(self, requirements): # type: (Text) -> Text def safe_parse(req_str): try: - return next(parse(req_str)) + return next(parse(req_str, cwd=self._cwd)) except Exception as ex: return Requirement(req_str)