Merge branch 'allegroai:master' into master

This commit is contained in:
pollfly 2021-06-29 07:37:10 +03:00 committed by GitHub
commit 05ec45352c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 110 additions and 53 deletions

View File

@ -112,7 +112,7 @@ class Session(TokenManager):
self.__access_key = api_key or ENV_ACCESS_KEY.get(
default=(self.config.get("api.credentials.access_key", None) or self.default_key),
value_cb=lambda key, value: logger.info("Using environment access key {}={}".format(key, value))
value_cb=lambda key, value: print("Using environment access key {}={}".format(key, value))
)
if not self.access_key:
raise ValueError(
@ -121,7 +121,7 @@ class Session(TokenManager):
self.__secret_key = secret_key or ENV_SECRET_KEY.get(
default=(self.config.get("api.credentials.secret_key", None) or self.default_secret),
value_cb=lambda key, value: logger.info("Using environment secret key {}=********".format(key))
value_cb=lambda key, value: print("Using environment secret key {}=********".format(key))
)
if not self.secret_key:
raise ValueError(

View File

@ -2370,6 +2370,10 @@ class Worker(ServiceCommandSection):
if not self.is_conda:
package_api.out_of_scope_install_package('Cython')
# add support for -r <file.txt> in requirements
if requirements_manager:
requirements_manager.set_cwd(cwd)
cached_requirements_failed = False
if cached_requirements and (cached_requirements.get('pip') is not None or
cached_requirements.get('conda') is not None):

View File

@ -4,13 +4,14 @@ import warnings
from .requirement import Requirement
def parse(reqstr):
def parse(reqstr, cwd=None):
"""
Parse a requirements file into a list of Requirements
See: pip/req.py:parse_requirements()
:param reqstr: a string or file like object containing requirements
:param cwd: Optional current working dir for -r file.txt loading
:returns: a *generator* of Requirement objects
"""
filename = getattr(reqstr, 'name', None)
@ -32,8 +33,8 @@ def parse(reqstr):
continue
elif line.startswith('-r') or line.startswith('--requirement'):
_, new_filename = line.split()
new_file_path = os.path.join(os.path.dirname(filename or '.'),
new_filename)
new_file_path = os.path.join(
os.path.dirname(filename or '.') if filename or not cwd else cwd, new_filename)
with open(new_file_path) as f:
for requirement in parse(f):
yield requirement

View File

@ -20,6 +20,15 @@ VCS_REGEX = re.compile(
r'(#(?P<fragment>\S+))?'
)
VCS_EXT_REGEX = re.compile(
r'^(?P<scheme>{0})(@)'.format(r'|'.join(
[scheme.replace('+', r'\+') for scheme in ['git+git']])) +
r'((?P<login>[^/@]+)@)?'
r'(?P<path>[^#@]+)'
r'(@(?P<revision>[^#]+))?'
r'(#(?P<fragment>\S+))?'
)
# This matches just about everyting
LOCAL_REGEX = re.compile(
r'^((?P<scheme>file)://)?'
@ -100,7 +109,7 @@ class Requirement(object):
req = cls('-e {0}'.format(line))
req.editable = True
vcs_match = VCS_REGEX.match(line)
vcs_match = VCS_REGEX.match(line) or VCS_EXT_REGEX.match(line)
local_match = LOCAL_REGEX.match(line)
if vcs_match is not None:
@ -147,7 +156,7 @@ class Requirement(object):
req = cls(line)
vcs_match = VCS_REGEX.match(line)
vcs_match = VCS_REGEX.match(line) or VCS_EXT_REGEX.match(line)
uri_match = URI_REGEX.match(line)
local_match = LOCAL_REGEX.match(line)
@ -226,7 +235,7 @@ class Requirement(object):
# check if the name is valid & parsed
Req.parse(name)
# if we are here, name is a valid package name, check if the vcs part is valid
if VCS_REGEX.match(uri):
if VCS_REGEX.match(uri) or VCS_EXT_REGEX.match(uri):
req = cls.parse_line(uri)
req.name = name
return req

View File

@ -12,12 +12,12 @@ from copy import deepcopy
from pathlib import Path
from threading import Thread
from time import sleep
from typing import Text, List
from typing import Text, List, Callable, Any, Collection, Optional, Union
import yaml
from clearml_agent.commands.events import Events
from clearml_agent.commands.worker import Worker
from clearml_agent.commands.worker import Worker, get_task_container
from clearml_agent.definitions import ENV_DOCKER_IMAGE
from clearml_agent.errors import APIError
from clearml_agent.helper.base import safe_remove_file
@ -31,16 +31,18 @@ class K8sIntegration(Worker):
K8S_PENDING_QUEUE = "k8s_scheduler"
K8S_DEFAULT_NAMESPACE = "clearml"
AGENT_LABEL = "CLEARML=agent"
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
KUBECTL_APPLY_CMD = "kubectl apply --namespace={namespace} -f"
KUBECTL_RUN_CMD = "kubectl run clearml-id-{task_id} " \
"--image {docker_image} " \
"--image {docker_image} {docker_args} " \
"--restart=Never " \
"--namespace={namespace}"
KUBECTL_DELETE_CMD = "kubectl delete pods " \
"--selector=TRAINS=agent " \
"--selector={selector} " \
"--field-selector=status.phase!=Pending,status.phase!=Running " \
"--namespace={namespace}"
@ -72,12 +74,10 @@ class K8sIntegration(Worker):
"[ ! -z $LOCAL_PYTHON ] || export LOCAL_PYTHON=python3",
"$LOCAL_PYTHON -m pip install clearml-agent",
"{extra_bash_init_cmd}",
"{extra_docker_bash_script}",
"$LOCAL_PYTHON -m clearml_agent execute --full-monitoring --require-queue --id {task_id}"
]
AGENT_LABEL = "TRAINS=agent"
LIMIT_POD_LABEL = "ai.allegro.agent.serial=pod-{pod_number}"
_edit_hyperparams_version = "2.9"
def __init__(
@ -104,7 +104,7 @@ class K8sIntegration(Worker):
:param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
:param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
example: "task={task_id} image={docker_image} queue_id={queue_id}"
or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data)
or a callable function: kubectl_cmd(task_id, docker_image, docker_args, queue_id, task_data)
:param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
Notice this string will use format() call, if you have curly brackets they should be doubled { -> {{
Format arguments passed: {task_id} and {extra_bash_init_cmd}
@ -275,16 +275,12 @@ class K8sIntegration(Worker):
task_id, self.k8s_pending_queue_name, e))
return
if task_data.execution.docker_cmd:
docker_cmd = task_data.execution.docker_cmd
else:
docker_cmd = str(ENV_DOCKER_IMAGE.get() or
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
# take the first part, this is the docker image name (not arguments)
docker_parts = docker_cmd.split()
docker_image = docker_parts[0]
docker_args = docker_parts[1:] if len(docker_parts) > 1 else []
container = get_task_container(self._session, task_id)
if not container.get('image'):
container['image'] = str(
ENV_DOCKER_IMAGE.get() or self._session.config.get("agent.default_docker.image", "nvidia/cuda")
)
container['arguments'] = self._session.config.get("agent.default_docker.arguments", None)
# get the clearml.conf encoded file
# noinspection PyProtectedMember
@ -378,17 +374,20 @@ class K8sIntegration(Worker):
else:
print("Kubernetes scheduling task id={}".format(task_id))
kubectl_kwargs = dict(
create_clearml_conf=create_clearml_conf,
labels=labels,
docker_image=container['image'],
docker_args=container['arguments'],
docker_bash=container.get('setup_shell_script'),
task_id=task_id,
queue=queue
)
if self.template_dict:
output, error = self._kubectl_apply(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_image, docker_args=docker_args,
task_id=task_id, queue=queue)
output, error = self._kubectl_apply(**kubectl_kwargs)
else:
output, error = self._kubectl_run(
create_clearml_conf=create_clearml_conf,
labels=labels, docker_image=docker_cmd,
task_data=task_data,
task_id=task_id, queue=queue)
output, error = self._kubectl_run(task_data=task_data, **kubectl_kwargs)
error = '' if not error else (error if isinstance(error, str) else error.decode('utf-8'))
output = '' if not output else (output if isinstance(output, str) else output.decode('utf-8'))
@ -422,20 +421,33 @@ class K8sIntegration(Worker):
**user_props
)
def _parse_docker_args(self, docker_args):
# type: (list) -> dict
kube_args = []
while docker_args:
cmd = docker_args.pop(0).strip()
if cmd in ('-e', '--env',):
env = docker_args.pop(0).strip()
key, value = env.split('=', 1)
kube_args.append({'name': key, 'value': value})
def _get_docker_args(self, docker_args, flags, target=None, convert=None):
# type: (List[str], Collection[str], Optional[str], Callable[[str], Any]) -> Union[dict, List[str]]
"""
Get docker args matching specific flags.
:argument docker_args: List of docker argument strings (flags and values)
:argument flags: List of flags/names to intercept (e.g. "--env" etc.)
:argument target: Controls return format. If provided, returns a dict with a target field containing a list
of result strings, otherwise returns a list of result strings
:argument convert: Optional conversion function for each result string
"""
args = docker_args[:] if docker_args else []
results = []
while args:
cmd = args.pop(0).strip()
if cmd in flags:
env = args.pop(0).strip()
if convert:
env = convert(env)
results.append(env)
else:
self.log.warning('skipping docker argument {} (only -e --env supported)'.format(cmd))
return {'env': kube_args} if kube_args else {}
if target:
return {target: results} if results else {}
return results
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, labels, queue, task_id):
def _kubectl_apply(self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_id):
template = deepcopy(self.template_dict)
template.setdefault('apiVersion', 'v1')
template['kind'] = 'Pod'
@ -449,14 +461,26 @@ class K8sIntegration(Worker):
labels_dict = dict(pair.split('=', 1) for pair in labels)
template['metadata'].setdefault('labels', {})
template['metadata']['labels'].update(labels_dict)
container = self._parse_docker_args(docker_args)
container = self._get_docker_args(
docker_args,
target="env",
flags={"-e", "--env"},
convert=lambda env: {'name': env.partition("=")[0], 'value': env.partition("=")[2]},
)
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
else self.container_bash_script
extra_docker_bash_script = '\n'.join(self._session.config.get("agent.extra_docker_shell_script", None) or [])
if docker_bash:
extra_docker_bash_script += '\n' + str(docker_bash) + '\n'
script_encoded = '\n'.join(
['#!/bin/bash', ] +
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '', task_id=task_id)
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '',
task_id=task_id,
extra_docker_bash_script=extra_docker_bash_script)
for line in container_bash_script])
create_init_script = \
@ -479,6 +503,10 @@ class K8sIntegration(Worker):
else:
template['spec']['containers'].append(container)
if self._docker_force_pull:
for c in template['spec']['containers']:
c.setdefault('imagePullPolicy', 'Always')
fp, yaml_file = tempfile.mkstemp(prefix='clearml_k8stmpl_', suffix='.yml')
os.close(fp)
with open(yaml_file, 'wt') as f:
@ -506,13 +534,18 @@ class K8sIntegration(Worker):
return output, error
def _kubectl_run(self, create_clearml_conf, docker_image, labels, queue, task_data, task_id):
def _kubectl_run(
self, create_clearml_conf, docker_image, docker_args, docker_bash, labels, queue, task_data, task_id
):
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, docker_args, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(
task_id=task_id,
docker_image=docker_image,
docker_args=" ".join(self._get_docker_args(
docker_args, flags={"-e", "--env"}, convert=lambda env: '--env={}'.format(env))
),
queue_id=queue,
namespace=self.namespace,
)
@ -528,6 +561,9 @@ class K8sIntegration(Worker):
if self.pod_requests:
kubectl_cmd += ['--requests', ",".join(self.pod_requests)]
if self._docker_force_pull and not any(x.startswith("--image-pull-policy=") for x in kubectl_cmd):
kubectl_cmd += ["--image-pull-policy='always'"]
container_bash_script = [self.container_bash_script] if isinstance(self.container_bash_script, str) \
else self.container_bash_script
container_bash_script = ' ; '.join(container_bash_script)
@ -539,7 +575,10 @@ class K8sIntegration(Worker):
"/bin/sh",
"-c",
"{} ; {}".format(create_clearml_conf, container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script, task_id=task_id)),
extra_bash_init_cmd=self.extra_bash_init_script or "",
extra_docker_bash_script=docker_bash or "",
task_id=task_id
)),
]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
@ -572,7 +611,7 @@ class K8sIntegration(Worker):
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed / failed pods
get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace))
get_bash_output(self.KUBECTL_DELETE_CMD.format(namespace=self.namespace, selector=self.AGENT_LABEL))
# get next task in queue
try:

View File

@ -448,10 +448,14 @@ class RequirementsManager(object):
self.translator = RequirementsTranslator(session, interpreter=base_interpreter,
cache_dir=pip_cache_dir.as_posix())
self._base_interpreter = base_interpreter
self._cwd = None
def register(self, cls): # type: (Type[RequirementSubstitution]) -> None
self.handlers.append(cls(self._session))
def set_cwd(self, cwd):
self._cwd = str(cwd) if cwd else None
def _replace_one(self, req): # type: (MarkerRequirement) -> Optional[Text]
match = re.search(r';\s*(.*)', Text(req))
if match:
@ -466,7 +470,7 @@ class RequirementsManager(object):
def replace(self, requirements): # type: (Text) -> Text
def safe_parse(req_str):
try:
return next(parse(req_str))
return next(parse(req_str, cwd=self._cwd))
except Exception as ex:
return Requirement(req_str)