Compare commits


19 Commits

Author SHA1 Message Date
allegroai
a5a797ec5e Version bump to v1.3.0 2022-06-16 23:24:28 +03:00
allegroai
ff6cee4a44 Fix requirements --extra-index-url line with trailing comment
Fix --extra-index-url is added for different command line switches
2022-06-16 23:22:29 +03:00
allegroai
9acbad28f7 Fix repository URL contains credentials even when agent.force_git_ssh_protocol is true 2022-06-16 23:20:53 +03:00
allegroai
560e689ccd Fix always make pygobject an optional package (i.e. if installation fails continue the Task package environment setup) 2022-06-16 23:18:55 +03:00
allegroai
f66e42ddb1 Fix optional priority packages to always compare lower-case package names 2022-06-16 23:18:31 +03:00
allegroai
d9856d5de5 Add Python 3.10 support 2022-06-16 23:16:06 +03:00
Niels ten Boom
24177cc5a9 Support private repos from requirements.txt file (#107)
* support private repos
* fix double indices
2022-06-15 10:26:24 +03:00
allegroai
178af0dee8 Bump PyJWT version due to "Key confusion through non-blocklisted public key formats" vulnerability 2022-05-25 16:41:26 +03:00
allegroai
51eb0a713c Version bump 2022-05-12 23:31:54 +03:00
allegroai
249aa006cb Make sure that if we have "setuptools" in the original required packages, we preserve the line in the pip freeze list 2022-05-12 23:31:32 +03:00
allegroai
c08e2ac0bb Fix clearml.conf access in non-root containers 2022-05-05 12:23:11 +03:00
allegroai
335ef91d8e Fix git unsafe directory issue (disable check on cached vcs folder) 2022-05-05 12:22:40 +03:00
allegroai
6c7a639673 Fix broken pytorch setuptools incompatibility (force setuptools < 59 if torch is below 1.11) 2022-05-05 12:22:13 +03:00
allegroai
5f77cad5ac Fix error message 2022-04-27 15:36:39 +03:00
allegroai
0228ae0494 Set environment variables before expanding path 2022-04-27 15:14:16 +03:00
allegroai
165677e800 Version bump 2022-04-27 14:59:51 +03:00
allegroai
2e5298b737 Add support for use-owner-token in k8s glue 2022-04-27 14:59:27 +03:00
allegroai
c9ffb8a053 Version bump 2022-04-20 08:57:16 +03:00
allegroai
2466eed23f Fix dynamic GPUs with "all" GPUs on the same worker 2022-04-20 08:56:22 +03:00
13 changed files with 195 additions and 51 deletions

View File

@@ -83,7 +83,7 @@
# set the optional priority packages to be installed before the rest of the required packages,
# In case a package installation fails, the package will be ignored,
# and the virtual environment process will continue
-# priority_optional_packages: ["pygobject", ]
+priority_optional_packages: ["pygobject", ]
# set the post packages to be installed after all the rest of the required packages
# post_packages: ["horovod", ]

View File

@@ -139,8 +139,8 @@ from clearml_agent.helper.singleton import Singleton
from clearml_agent.session import Session
from .events import Events
DOCKER_ROOT_CONF_FILE = "/root/clearml.conf"
DOCKER_DEFAULT_CONF_FILE = "/root/default_clearml.conf"
DOCKER_ROOT_CONF_FILE = "/tmp/clearml.conf" # assuming we can always access/mount this file
DOCKER_DEFAULT_CONF_FILE = "~/default_clearml.conf"
sys_random = random.SystemRandom()
@@ -1124,7 +1124,13 @@ class Worker(ServiceCommandSection):
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus']
if available_gpus:
-available_gpus = [int(g) for g in available_gpus[-1].split(',')]
+gpus = []
+for g in available_gpus[-1].split(','):
+    try:
+        gpus += [int(g.strip())]
+    except (ValueError, TypeError):
+        print("INFO: failed parsing GPU int('{}') - skipping".format(g))
+available_gpus = gpus
if not isinstance(gpu_queues, dict):
gpu_queues = dict(gpu_queues)
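Editor's note: the hunk above swaps a bare int() list comprehension for a tolerant loop, so one malformed entry in the reported available_gpus property no longer aborts dynamic-GPU matching. A minimal standalone sketch of the same behavior (the function name is hypothetical, not part of the agent's API):

def parse_available_gpus(prop_value):
    # prop_value is the comma-separated GPU list reported for the worker, e.g. "0, 1, 2"
    gpus = []
    for g in prop_value.split(','):
        try:
            gpus.append(int(g.strip()))
        except (ValueError, TypeError):
            print("INFO: failed parsing GPU int('{}') - skipping".format(g))
    return gpus

assert parse_available_gpus("0, 1, 2") == [0, 1, 2]
assert parse_available_gpus("0,n/a,2") == [0, 2]  # the bad entry is skipped, not fatal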
@@ -1285,12 +1291,9 @@ class Worker(ServiceCommandSection):
raise ValueError("Running in Docker mode, 'docker' command was not found")
self._worker_tags = kwargs.get('child_report_tags', None)
-self._impersonate_as_task_owner = kwargs.get('use_owner_token', False)
-if self._impersonate_as_task_owner:
-    if not self._session.check_min_api_version("2.14"):
-        raise ValueError("Server does not support --use-owner-token option (incompatible API version)")
-    if self._session.feature_set == "basic":
-        raise ValueError("Server does not support --use-owner-token option")
+self._use_owner_token(kwargs.get('use_owner_token', False))
self._standalone_mode = kwargs.get('standalone_mode', False)
self._services_mode = kwargs.get('services_mode', False)
# must have docker in services_mode
@@ -1999,7 +2002,7 @@ class Worker(ServiceCommandSection):
if entry_point == "clone_task" or entry_point == "reuse_task":
change = 'ENTRYPOINT if [ ! -s "{trains_conf}" ] ; then ' \
-'cp {default_trains_conf} {trains_conf} ; ' \
+'cp {default_trains_conf} {trains_conf} && export CLEARML_CONFIG_FILE={trains_conf}; ' \
' fi ; clearml-agent execute --id {task_id} --standalone-mode {clone}'.format(
default_trains_conf=DOCKER_DEFAULT_CONF_FILE,
trains_conf=DOCKER_ROOT_CONF_FILE,
@@ -2377,6 +2380,10 @@ class Worker(ServiceCommandSection):
if sys.getfilesystemencoding() == 'ascii' and not os.environ.get("PYTHONIOENCODING"):
os.environ["PYTHONIOENCODING"] = "utf-8"
+# check if we need to update backwards compatible OS environment
+if not os.environ.get("TRAINS_CONFIG_FILE") and os.environ.get("CLEARML_CONFIG_FILE"):
+    os.environ["TRAINS_CONFIG_FILE"] = os.environ.get("CLEARML_CONFIG_FILE")
print("Starting Task Execution:\n".format(current_task.id))
exit_code = -1
try:
@@ -2977,6 +2984,11 @@ class Worker(ServiceCommandSection):
- a new working directory (replacing the working_dir in the task's script section)
- a requirements manager instance
"""
os.environ["CLEARML_TASK_SCRIPT_ENTRY"] = execution.entry_point
os.environ["CLEARML_TASK_WORKING_DIR"] = execution.working_dir
os.environ["CLEARML_VENV_PATH"] = str(venv_folder)
os.environ["CLEARML_GIT_ROOT"] = git_root
script = os.path.expanduser(os.path.expandvars(script))
try:
@@ -2994,10 +3006,6 @@ class Worker(ServiceCommandSection):
task.to_dict(), separators=(',', ':'), default=str
)
os.environ["CLEARML_CUSTOM_BUILD_OUTPUT"] = script_output_file.name
os.environ["CLEARML_TASK_SCRIPT_ENTRY"] = execution.entry_point
os.environ["CLEARML_TASK_WORKING_DIR"] = execution.working_dir
os.environ["CLEARML_VENV_PATH"] = str(venv_folder)
os.environ["CLEARML_GIT_ROOT"] = git_root
try:
subprocess.check_call([script])
@@ -3009,7 +3017,7 @@ class Worker(ServiceCommandSection):
output = Path(script_output_file.name).read_text()
if not output:
raise SkippedCustomBuildScript("Build script {} is not found".format(script))
raise SkippedCustomBuildScript("Build script {} did not return any output".format(script))
try:
output = json.loads(output)
@@ -3686,6 +3694,7 @@ class Worker(ServiceCommandSection):
base_cmd += (
(['--name', name] if name else []) +
['-v', conf_file+':'+DOCKER_ROOT_CONF_FILE] +
+['-e', "CLEARML_CONFIG_FILE={}".format(DOCKER_ROOT_CONF_FILE)] +
(['-v', host_ssh_cache+':'+mount_ssh] if host_ssh_cache else []) +
(['-v', host_apt_cache+':'+mount_apt_cache] if host_apt_cache else []) +
(['-v', host_pip_cache+':'+mount_pip_cache] if host_pip_cache else []) +
@@ -3776,12 +3785,17 @@ class Worker(ServiceCommandSection):
# patch venv folder to new location
script_dir = script_dir.replace(venv_folder, new_venv_folder)
# New command line execution
-command = RunasArgv('bash', '-c', 'HOME=\"{}\" PATH=\"{}\" PYTHONPATH=\"{}\" TRAINS_CONFIG_FILE={} {}'.format(
-    home_folder,
-    os.environ.get('PATH', '').replace(venv_folder, new_venv_folder),
-    os.environ.get('PYTHONPATH', '').replace(venv_folder, new_venv_folder),
-    user_trains_conf,
-    command.serialize().replace(venv_folder, new_venv_folder)))
+command = RunasArgv(
+    'bash', '-c',
+    'HOME=\"{}\" PATH=\"{}\" PYTHONPATH=\"{}\" '
+    'TRAINS_CONFIG_FILE={} CLEARML_CONFIG_FILE={} {}'.format(
+        home_folder,
+        os.environ.get('PATH', '').replace(venv_folder, new_venv_folder),
+        os.environ.get('PYTHONPATH', '').replace(venv_folder, new_venv_folder),
+        user_trains_conf, user_trains_conf,
+        command.serialize().replace(venv_folder, new_venv_folder)
+    )
+)
command.set_uid(user_uid=user_uid, user_gid=user_uid)
return command, script_dir
@@ -3947,6 +3961,14 @@ class Worker(ServiceCommandSection):
# type: (str) -> bool
return re.fullmatch(r"^[a-zA-Z0-9][a-zA-Z0-9_.-]+$", name) is not None
+def _use_owner_token(self, use_owner_token=False):
+    self._impersonate_as_task_owner = use_owner_token
+    if self._impersonate_as_task_owner:
+        if not self._session.check_min_api_version("2.14"):
+            raise ValueError("Server does not support --use-owner-token option (incompatible API version)")
+        if self._session.feature_set == "basic":
+            raise ValueError("Server does not support --use-owner-token option")
if __name__ == "__main__":
pass

View File

@@ -1,6 +1,9 @@
import os
+import re
import warnings
+from clearml_agent.definitions import PIP_EXTRA_INDICES
from .requirement import Requirement
@@ -42,9 +45,14 @@ def parse(reqstr, cwd=None):
yield requirement
elif line.startswith('-f') or line.startswith('--find-links') or \
line.startswith('-i') or line.startswith('--index-url') or \
-line.startswith('--extra-index-url') or \
line.startswith('--no-index'):
warnings.warn('Private repos not supported. Skipping.')
+elif line.startswith('--extra-index-url'):
+    extra_index = line[len('--extra-index-url'):].strip()
+    extra_index = re.sub(r"\s+#.*$", "", extra_index)  # strip comments
+    if extra_index and extra_index not in PIP_EXTRA_INDICES:
+        PIP_EXTRA_INDICES.append(extra_index)
+        print(f"appended {extra_index} to list of extra pip indices")
+    continue
elif line.startswith('-Z') or line.startswith('--always-unzip'):
warnings.warn('Unused option --always-unzip. Skipping.')
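Editor's note: with the two requirements-parser commits combined ("Support private repos from requirements.txt file" and the trailing-comment fix), --extra-index-url lines are now collected instead of skipped. A minimal sketch of the extraction step in isolation (the URL is illustrative; PIP_EXTRA_INDICES here stands in for the module-level list imported above):

import re

PIP_EXTRA_INDICES = []

def collect_extra_index(line):
    extra_index = line[len('--extra-index-url'):].strip()
    extra_index = re.sub(r"\s+#.*$", "", extra_index)  # drop a trailing comment
    if extra_index and extra_index not in PIP_EXTRA_INDICES:
        PIP_EXTRA_INDICES.append(extra_index)

collect_extra_index('--extra-index-url https://pypi.example.com/simple  # private repo')
assert PIP_EXTRA_INDICES == ['https://pypi.example.com/simple']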

View File

@@ -18,7 +18,7 @@ from typing import Text, List, Callable, Any, Collection, Optional, Union
import yaml
from clearml_agent.commands.events import Events
-from clearml_agent.commands.worker import Worker, get_task_container, set_task_container
+from clearml_agent.commands.worker import Worker, get_task_container, set_task_container, get_next_task
from clearml_agent.definitions import ENV_DOCKER_IMAGE
from clearml_agent.errors import APIError
from clearml_agent.helper.base import safe_remove_file
@@ -362,7 +362,7 @@ class K8sIntegration(Worker):
print('Failed getting number of used pods: {}'.format(ex))
return -2
-def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_):
+def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
@@ -398,11 +398,19 @@ class K8sIntegration(Worker):
self.conf_file_content
or Path(self._session._config_file).read_text()
).encode("ascii")
create_clearml_conf = "echo '{}' | base64 --decode >> ~/clearml.conf".format(
create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
base64.b64encode(
hocon_config_encoded
).decode('ascii')
-)
+)]
+if task_session:
+    create_clearml_conf.append(
+        "export CLEARML_AUTH_TOKEN=$(echo '{}' | base64 --decode)".format(
+            base64.b64encode(task_session.token.encode("ascii")).decode('ascii')
+        )
+    )
if self.ports_mode:
print("Kubernetes looking for available pod to use")
@@ -594,19 +602,22 @@ class K8sIntegration(Worker):
extra_docker_bash_script=extra_docker_bash_script)
for line in container_bash_script])
-create_init_script = \
-    "echo '{}' | base64 --decode >> ~/__start_agent__.sh ; " \
+extra_bash_commands = list(create_clearml_conf or [])
+extra_bash_commands.append(
+    "echo '{}' | base64 --decode >> ~/__start_agent__.sh ; "
"/bin/bash ~/__start_agent__.sh".format(
base64.b64encode(
script_encoded.encode('ascii')
).decode('ascii'))
+)
# Notice: we always leave with exit code 0, so pods are never restarted
container = self._merge_containers(
container,
dict(name=name, image=docker_image,
command=['/bin/bash'],
-args=['-c', '{} ; {} ; exit 0'.format(create_clearml_conf, create_init_script)])
+args=['-c', '{} ; exit 0'.format(' ; '.join(extra_bash_commands))])
)
if template['spec']['containers']:
@@ -685,7 +696,7 @@ class K8sIntegration(Worker):
"--",
"/bin/sh",
"-c",
"{} ; {}".format(create_clearml_conf, container_bash_script.format(
"{} ; {}".format(" ; ".join(create_clearml_conf or []), container_bash_script.format(
extra_bash_init_cmd=self.extra_bash_init_script or "",
extra_docker_bash_script=docker_bash or "",
task_id=task_id
@@ -742,14 +753,16 @@ class K8sIntegration(Worker):
# get next task in queue
try:
-response = self._session.api_client.queues.get_next_task(queue=queue)
+response = get_next_task(
+    self._session, queue=queue, get_task_info=self._impersonate_as_task_owner
+)
except Exception as e:
print("Warning: Could not access task queue [{}], error: {}".format(queue, e))
continue
else:
try:
-    task_id = response.entry.task
-except AttributeError:
+    task_id = response["entry"]["task"]
+except (KeyError, TypeError, AttributeError):
print("No tasks in queue {}".format(queue))
continue
events_service.send_log_events(
@@ -761,8 +774,26 @@ class K8sIntegration(Worker):
level="INFO",
)
+task_session = None
+if self._impersonate_as_task_owner:
+    try:
+        task_user = response["task_info"]["user"]
+        task_company = response["task_info"]["company"]
+    except (KeyError, TypeError, AttributeError):
+        print("Error: cannot retrieve owner user for the task '{}', skipping".format(task_id))
+        continue
+    task_session = self.get_task_session(task_user, task_company)
+    if not task_session:
+        print(
+            "Error: Could not login as the user '{}' for the task '{}', skipping".format(
+                task_user, task_id
+            )
+        )
+        continue
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
-self.run_one_task(queue, task_id, worker_params)
+self.run_one_task(queue, task_id, worker_params, task_session)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
break
else:
@@ -773,7 +804,7 @@ class K8sIntegration(Worker):
if self._session.config["agent.reload_config"]:
self.reload_config()
-def k8s_daemon(self, queue):
+def k8s_daemon(self, queue, **kwargs):
"""
Start the k8s Glue service.
This service will be pulling tasks from *queue* and scheduling them for execution using kubectl.
@@ -784,8 +815,10 @@ class K8sIntegration(Worker):
:param list(str) queue: queue name to pull from
"""
-return self.daemon(queues=[ObjectID(name=queue)] if queue else None,
-                   log_level=logging.INFO, foreground=True, docker=False)
+return self.daemon(
+    queues=[ObjectID(name=queue)] if queue else None,
+    log_level=logging.INFO, foreground=True, docker=False, **kwargs,
+)
@classmethod
def get_ssh_server_bash(cls, ssh_port_number):

View File

@@ -95,7 +95,8 @@ class ExternalRequirements(SimpleSubstitution):
vcs._set_ssh_url()
new_req_line = 'git+{}{}{}'.format(
'' if scheme and '://' in vcs.url else scheme,
-    vcs.url_with_auth, fragment
+    vcs.url if session.config.get('agent.force_git_ssh_protocol', None) else vcs.url_with_auth,
+    fragment
)
if new_req_line != req_line:
furl_line = furl(new_req_line)

View File

@@ -1,3 +1,4 @@
+import re
from typing import Text
from .base import PackageManager
@@ -11,13 +12,14 @@ class PriorityPackageRequirement(SimpleSubstitution):
def __init__(self, *args, **kwargs):
super(PriorityPackageRequirement, self).__init__(*args, **kwargs)
+self._replaced_packages = {}
# check if we need to replace the packages:
priority_packages = self.config.get('agent.package_manager.priority_packages', None)
if priority_packages:
-self.__class__.name = priority_packages
+self.__class__.name = [p.lower() for p in priority_packages]
priority_optional_packages = self.config.get('agent.package_manager.priority_optional_packages', None)
if priority_optional_packages:
-self.__class__.optional_package_names = priority_optional_packages
+self.__class__.optional_package_names = [p.lower() for p in priority_optional_packages]
def match(self, req):
# match both Cython & cython
@@ -28,7 +30,9 @@ class PriorityPackageRequirement(SimpleSubstitution):
Replace a requirement
:raises: ValueError if version is pre-release
"""
-if req.name in self.optional_package_names:
+self._replaced_packages[req.name] = req.line
+if req.name.lower() in self.optional_package_names:
# noinspection PyBroadException
try:
if PackageManager.out_of_scope_install_package(str(req)):
@@ -39,6 +43,41 @@ class PriorityPackageRequirement(SimpleSubstitution):
PackageManager.out_of_scope_install_package(str(req))
return Text(req)
+def replace_back(self, list_of_requirements):
+    """
+    :param list_of_requirements: {'pip': ['a==1.0', ]}
+    :return: {'pip': ['a==1.0', ]}
+    """
+    # if we replaced setuptools, it means someone requested it, and since freeze will not contain it,
+    # we need to add it manually
+    if not self._replaced_packages or "setuptools" not in self._replaced_packages:
+        return list_of_requirements
+    try:
+        for k, lines in list_of_requirements.items():
+            # k is either pip/conda
+            if k not in ('pip', 'conda'):
+                continue
+            for i, line in enumerate(lines):
+                if not line or line.lstrip().startswith('#'):
+                    continue
+                parts = [p for p in re.split(r'\s|=|\.|<|>|~|!|@|#', line) if p]
+                if not parts:
+                    continue
+                # if we found setuptools, do nothing
+                if parts[0] == "setuptools":
+                    return list_of_requirements
+        # if we are here it means we have not found setuptools
+        # we should add it:
+        if "pip" in list_of_requirements:
+            list_of_requirements["pip"] = [self._replaced_packages["setuptools"]] + list_of_requirements["pip"]
+    except Exception as ex:  # noqa
+        return list_of_requirements
+    return list_of_requirements
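Editor's note: in short, replace_back() re-injects the originally requested setuptools line when pip freeze omits it. A self-contained sketch of the same logic, stripped of the class and config plumbing (names hypothetical):

import re

def preserve_setuptools(replaced_packages, freeze):
    # freeze mirrors the {'pip': ['pkg==ver', ...]} structure shown in the docstring above
    if "setuptools" not in replaced_packages:
        return freeze
    for line in freeze.get("pip", []):
        parts = [p for p in re.split(r'\s|=|\.|<|>|~|!|@|#', line) if p]
        if parts and parts[0] == "setuptools":
            return freeze  # already present, nothing to restore
    freeze["pip"] = [replaced_packages["setuptools"]] + freeze["pip"]
    return freeze

out = preserve_setuptools({"setuptools": "setuptools==58.0.4"}, {"pip": ["attrs==21.4.0"]})
assert out["pip"][0] == "setuptools==58.0.4"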
class PackageCollectorRequirement(SimpleSubstitution):
"""

View File

@@ -7,13 +7,14 @@ from furl import furl
import urllib.parse
from operator import itemgetter
from html.parser import HTMLParser
-from typing import Text
+from typing import Text, Optional, Dict
import attr
import requests
import six
-from .requirements import SimpleSubstitution, FatalSpecsResolutionError, SimpleVersion
+from .requirements import SimpleSubstitution, FatalSpecsResolutionError, SimpleVersion, MarkerRequirement
from ...external.requirements_parser.requirement import Requirement
OS_TO_WHEEL_NAME = {"linux": "linux_x86_64", "windows": "win_amd64"}
@@ -179,6 +180,7 @@ class PytorchRequirement(SimpleSubstitution):
self.python_version_string = None
self.python_major_minor_str = None
self.python = None
+self._fix_setuptools = None
self.exceptions = []
self._original_req = []
@@ -366,6 +368,10 @@ class PytorchRequirement(SimpleSubstitution):
else:
print('Trying PyTorch CUDA version {} support'.format(torch_url_key))
+# fix broken pytorch setuptools incompatibility
+if closest_matched_version and SimpleVersion.compare_versions(closest_matched_version, "<", "1.11.0"):
+    self._fix_setuptools = "setuptools < 59"
if not url:
url = PytorchWheel(
torch_version=fix_version(version),
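Editor's note: the guard above only pins setuptools for torch builds older than 1.11. A sketch of the version test using the third-party packaging library as a stand-in for the agent's internal SimpleVersion helper (an equivalent substitute, not the code above):

from packaging.version import Version  # pip install packaging

def setuptools_pin_for(torch_version):
    # torch < 1.11 is incompatible with setuptools >= 59, so return the extra pin
    if torch_version and Version(torch_version) < Version("1.11.0"):
        return "setuptools < 59"
    return None

assert setuptools_pin_for("1.10.2") == "setuptools < 59"
assert setuptools_pin_for("1.11.0") is None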
@@ -506,7 +512,7 @@ class PytorchRequirement(SimpleSubstitution):
for i, line in enumerate(lines):
if not line or line.lstrip().startswith('#'):
continue
-parts = [p for p in re.split('\s|=|\.|<|>|~|!|@|#', line) if p]
+parts = [p for p in re.split(r'\s|=|\.|<|>|~|!|@|#', line) if p]
if not parts:
continue
for req, new_req in self._original_req:
@@ -528,6 +534,16 @@ class PytorchRequirement(SimpleSubstitution):
return list_of_requirements
+def post_scan_add_req(self):  # type: () -> Optional[MarkerRequirement]
+    """
+    Allows the RequirementSubstitution to add an extra line/requirements after
+    the initial requirements scan is completed.
+    Called only once per requirements.txt object
+    """
+    if self._fix_setuptools:
+        return MarkerRequirement(Requirement.parse(self._fix_setuptools))
+    return None
MAP = {
"windows": {
"cuda100": {

View File

@@ -628,10 +628,23 @@ class RequirementsManager(object):
result = list(result)
# add post scan add requirements call back
+double_req_set = None
for h in self.handlers:
-req = h.post_scan_add_req()
-if req:
-    result.append(req.tostr())
+reqs = h.post_scan_add_req()
+if reqs:
+    if double_req_set is None:
+        def safe_parse_name(line):
+            try:
+                return Requirement.parse(line).name
+            except:  # noqa
+                return None
+        double_req_set = set([safe_parse_name(r) for r in result if r])
+    for r in (reqs if isinstance(reqs, (tuple, list)) else [reqs]):
+        if r and (not r.name or r.name not in double_req_set):
+            result.append(r.tostr())
+        elif r:
+            print("SKIPPING additional auto installed package: \"{}\"".format(r))
return join_lines(result)
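Editor's note: the dedup set built above ensures an auto-added line never shadows a requirement the user already pinned. A condensed sketch of that skip test (the regex-based name parser is a crude stand-in for the Requirement.parse call used in this file):

import re

def safe_parse_name(line):
    m = re.match(r'^\s*([A-Za-z0-9_.-]+)', line)
    return m.group(1).lower() if m else None

result = ["torch==1.10.2", "attrs==21.4.0"]
existing = {safe_parse_name(r) for r in result if r}
for candidate in ["setuptools < 59", "torch==1.9.0"]:
    if safe_parse_name(candidate) in existing:
        print('SKIPPING additional auto installed package: "{}"'.format(candidate))
    else:
        result.append(candidate)
# only the setuptools pin is appended; the duplicate torch line is skipped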

View File

@@ -529,6 +529,13 @@ class Git(VCS):
"GIT_SSH_COMMAND": "ssh -oBatchMode=yes",
}
+def __init__(self, *args, **kwargs):
+    super(Git, self).__init__(*args, **kwargs)
+    try:
+        self.call("config", "--global", "--replace-all", "safe.directory", "*", cwd=self.location)
+    except:  # noqa
+        pass
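Editor's note: the call above amounts to running git config --global --replace-all safe.directory "*" once per process, telling Git to treat any directory as safe and thereby skipping the ownership check that broke access to the cached vcs folder.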
@staticmethod
def remote_branch_name(branch):
return [

View File

@@ -1 +1 @@
-__version__ = '1.2.3'
+__version__ = '1.3.0'

View File

@@ -65,6 +65,10 @@ def parse_args():
help="Limit the maximum number of pods that this service can run at the same time."
"Should not be used with ports-mode"
)
+parser.add_argument(
+    "--use-owner-token", action="store_true", default=False,
+    help="Generate and use task owner token for the execution of each task"
+)
return parser.parse_args()
@@ -87,7 +91,7 @@ def main():
ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
namespace=args.namespace, max_pods_limit=args.max_pods or None,
)
-k8s.k8s_daemon(args.queue)
+k8s.k8s_daemon(args.queue, use_owner_token=args.use_owner_token)
if __name__ == "__main__":
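Editor's note: with the flag wired through, the glue daemon can be started as, for example, python k8s_glue_example.py --queue <queue-name> --use-owner-token (the script and queue names are illustrative; the file name is not shown in this view).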

View File

@@ -8,7 +8,7 @@ psutil>=3.4.2,<5.9.0
pyhocon>=0.3.38,<0.4.0
pyparsing>=2.0.3,<2.5.0
python-dateutil>=2.4.2,<2.9.0
-pyjwt>=1.6.4,<2.1.0
+pyjwt>=2.4.0,<2.5.0
PyYAML>=3.12,<5.5.0
requests>=2.20.0,<2.26.0
six>=1.13.0,<1.16.0
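Editor's note: the new lower bound matches the PyJWT release that addressed the advisory cited in the commit message ("Key confusion through non-blocklisted public key formats", tracked as CVE-2022-29217 and fixed in PyJWT 2.4.0).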

View File

@@ -61,6 +61,7 @@ setup(
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
+'Programming Language :: Python :: 3.10',
'License :: OSI Approved :: Apache Software License',
],