Compare commits

...

9 Commits

Author     SHA1        Message                                                                                        Date
allegroai  c08e2ac0bb  Fix clearml.conf access in non-root containers                                                 2022-05-05 12:23:11 +03:00
allegroai  335ef91d8e  Fix git unsafe directory issue (disable check on cached vcs folder)                            2022-05-05 12:22:40 +03:00
allegroai  6c7a639673  Fix broken pytorch setuptools incompatibility (force setuptools < 59 if torch is below 1.11)   2022-05-05 12:22:13 +03:00
allegroai  5f77cad5ac  Fix error message                                                                              2022-04-27 15:36:39 +03:00
allegroai  0228ae0494  Set environment variables before expanding path                                                2022-04-27 15:14:16 +03:00
allegroai  165677e800  Version bump                                                                                   2022-04-27 14:59:51 +03:00
allegroai  2e5298b737  Add support for use-owner-token in k8s glue                                                    2022-04-27 14:59:27 +03:00
allegroai  c9ffb8a053  Version bump                                                                                   2022-04-20 08:57:16 +03:00
allegroai  2466eed23f  Fix dynamic GPUs with "all" GPUs on the same worker                                            2022-04-20 08:56:22 +03:00
7 changed files with 138 additions and 43 deletions

View File

@@ -139,8 +139,8 @@ from clearml_agent.helper.singleton import Singleton
 from clearml_agent.session import Session
 from .events import Events

-DOCKER_ROOT_CONF_FILE = "/root/clearml.conf"
-DOCKER_DEFAULT_CONF_FILE = "/root/default_clearml.conf"
+DOCKER_ROOT_CONF_FILE = "/tmp/clearml.conf"  # assuming we can always access/mount this file
+DOCKER_DEFAULT_CONF_FILE = "~/default_clearml.conf"

 sys_random = random.SystemRandom()
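
The intent of the new constants: a container running as a non-root UID usually cannot write under /root, while /tmp is expected to be writable for everyone, and the default conf now resolves against whatever home directory the container user actually has. A minimal sketch of that assumption:

    import os

    # DOCKER_ROOT_CONF_FILE after this change; /tmp should be writable for any UID
    print(os.access("/tmp", os.W_OK))                    # True even without root

    # DOCKER_DEFAULT_CONF_FILE now resolves against the *running* user's home
    print(os.path.expanduser("~/default_clearml.conf"))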
@@ -1124,7 +1124,13 @@ class Worker(ServiceCommandSection):
             raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
         available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus']
         if available_gpus:
-            available_gpus = [int(g) for g in available_gpus[-1].split(',')]
+            gpus = []
+            for g in available_gpus[-1].split(','):
+                try:
+                    gpus += [int(g.strip())]
+                except (ValueError, TypeError):
+                    print("INFO: failed parsing GPU int('{}') - skipping".format(g))
+            available_gpus = gpus
         if not isinstance(gpu_queues, dict):
             gpu_queues = dict(gpu_queues)
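
The effect of the new tolerant parsing, as a standalone sketch (parse_available_gpus is a hypothetical wrapper for illustration): malformed entries in the reported available_gpus worker property are skipped instead of aborting the dynamic-GPU match with a ValueError.

    def parse_available_gpus(value):
        # value is the raw worker property string, e.g. "0, 1, x, 3"
        gpus = []
        for g in value.split(','):
            try:
                gpus += [int(g.strip())]
            except (ValueError, TypeError):
                print("INFO: failed parsing GPU int('{}') - skipping".format(g))
        return gpus

    print(parse_available_gpus("0, 1, x, 3"))  # [0, 1, 3], the "x" entry is skipped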
@@ -1285,12 +1291,9 @@ class Worker(ServiceCommandSection):
             raise ValueError("Running in Docker mode, 'docker' command was not found")

         self._worker_tags = kwargs.get('child_report_tags', None)
-        self._impersonate_as_task_owner = kwargs.get('use_owner_token', False)
-        if self._impersonate_as_task_owner:
-            if not self._session.check_min_api_version("2.14"):
-                raise ValueError("Server does not support --use-owner-token option (incompatible API version)")
-            if self._session.feature_set == "basic":
-                raise ValueError("Server does not support --use-owner-token option")
+        self._use_owner_token(kwargs.get('use_owner_token', False))

         self._standalone_mode = kwargs.get('standalone_mode', False)
         self._services_mode = kwargs.get('services_mode', False)
         # must have docker in services_mode
@@ -1999,7 +2002,7 @@ class Worker(ServiceCommandSection):
         if entry_point == "clone_task" or entry_point == "reuse_task":
             change = 'ENTRYPOINT if [ ! -s "{trains_conf}" ] ; then ' \
-                     'cp {default_trains_conf} {trains_conf} ; ' \
+                     'cp {default_trains_conf} {trains_conf} && export CLEARML_CONFIG_FILE={trains_conf}; ' \
                      ' fi ; clearml-agent execute --id {task_id} --standalone-mode {clone}'.format(
                 default_trains_conf=DOCKER_DEFAULT_CONF_FILE,
                 trains_conf=DOCKER_ROOT_CONF_FILE,
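
Rendering the template with the new constants shows the resulting Dockerfile ENTRYPOINT: the default conf is copied into place only when the target file is missing or empty, and CLEARML_CONFIG_FILE is exported so the agent inside the container finds it. A sketch with illustrative task_id/clone values:

    change = 'ENTRYPOINT if [ ! -s "{trains_conf}" ] ; then ' \
             'cp {default_trains_conf} {trains_conf} && export CLEARML_CONFIG_FILE={trains_conf}; ' \
             ' fi ; clearml-agent execute --id {task_id} --standalone-mode {clone}'.format(
                 default_trains_conf="~/default_clearml.conf",  # DOCKER_DEFAULT_CONF_FILE
                 trains_conf="/tmp/clearml.conf",               # DOCKER_ROOT_CONF_FILE
                 task_id="aabbccdd", clone="")                  # illustrative values
    print(change)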
@@ -2377,6 +2380,10 @@
         if sys.getfilesystemencoding() == 'ascii' and not os.environ.get("PYTHONIOENCODING"):
             os.environ["PYTHONIOENCODING"] = "utf-8"

+        # check if we need to update backwards compatible OS environment
+        if not os.environ.get("TRAINS_CONFIG_FILE") and os.environ.get("CLEARML_CONFIG_FILE"):
+            os.environ["TRAINS_CONFIG_FILE"] = os.environ.get("CLEARML_CONFIG_FILE")
+
         print("Starting Task Execution:\n".format(current_task.id))
         exit_code = -1
         try:
@@ -2977,6 +2984,11 @@ class Worker(ServiceCommandSection):
           - a new working directory (replacing the working_dir in the task's script section)
           - a requirements manager instance
        """
+        os.environ["CLEARML_TASK_SCRIPT_ENTRY"] = execution.entry_point
+        os.environ["CLEARML_TASK_WORKING_DIR"] = execution.working_dir
+        os.environ["CLEARML_VENV_PATH"] = str(venv_folder)
+        os.environ["CLEARML_GIT_ROOT"] = git_root
+
        script = os.path.expanduser(os.path.expandvars(script))
        try:
@@ -2994,10 +3006,6 @@ class Worker(ServiceCommandSection):
                 task.to_dict(), separators=(',', ':'), default=str
             )
             os.environ["CLEARML_CUSTOM_BUILD_OUTPUT"] = script_output_file.name
-            os.environ["CLEARML_TASK_SCRIPT_ENTRY"] = execution.entry_point
-            os.environ["CLEARML_TASK_WORKING_DIR"] = execution.working_dir
-            os.environ["CLEARML_VENV_PATH"] = str(venv_folder)
-            os.environ["CLEARML_GIT_ROOT"] = git_root

             try:
                 subprocess.check_call([script])
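
The net effect of these two hunks is a move, matching the "Set environment variables before expanding path" commit: CLEARML_TASK_SCRIPT_ENTRY, CLEARML_TASK_WORKING_DIR, CLEARML_VENV_PATH and CLEARML_GIT_ROOT are now exported before the custom build script path is expanded, so they can be referenced in the path itself and are visible to the script. A hypothetical build script consuming them:

    #!/usr/bin/env python
    # hypothetical custom build script; the variable names come from the hunks above
    import os

    entry = os.environ.get("CLEARML_TASK_SCRIPT_ENTRY", "")
    workdir = os.environ.get("CLEARML_TASK_WORKING_DIR", "")
    venv = os.environ.get("CLEARML_VENV_PATH", "")
    git_root = os.environ.get("CLEARML_GIT_ROOT", "")
    print("building {} in {} (venv: {}, repo: {})".format(entry, workdir, venv, git_root))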
@@ -3009,7 +3017,7 @@ class Worker(ServiceCommandSection):
         output = Path(script_output_file.name).read_text()
         if not output:
-            raise SkippedCustomBuildScript("Build script {} is not found".format(script))
+            raise SkippedCustomBuildScript("Build script {} did not return any output".format(script))

         try:
             output = json.loads(output)
@@ -3686,6 +3694,7 @@ class Worker(ServiceCommandSection):
         base_cmd += (
             (['--name', name] if name else []) +
             ['-v', conf_file+':'+DOCKER_ROOT_CONF_FILE] +
+            ['-e', "CLEARML_CONFIG_FILE={}".format(DOCKER_ROOT_CONF_FILE)] +
             (['-v', host_ssh_cache+':'+mount_ssh] if host_ssh_cache else []) +
             (['-v', host_apt_cache+':'+mount_apt_cache] if host_apt_cache else []) +
             (['-v', host_pip_cache+':'+mount_pip_cache] if host_pip_cache else []) +
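
Together with the /tmp/clearml.conf constant above, the container is now both given the config file and told where to find it. A sketch of the resulting docker command fragment (host-side conf path is illustrative):

    conf_file = "/home/user/.clearml_agent.abc.cfg"  # illustrative host-side temp conf
    DOCKER_ROOT_CONF_FILE = "/tmp/clearml.conf"

    base_cmd = ['docker', 'run']
    base_cmd += ['-v', conf_file + ':' + DOCKER_ROOT_CONF_FILE]
    base_cmd += ['-e', "CLEARML_CONFIG_FILE={}".format(DOCKER_ROOT_CONF_FILE)]
    print(' '.join(base_cmd))
    # docker run -v /home/user/.clearml_agent.abc.cfg:/tmp/clearml.conf -e CLEARML_CONFIG_FILE=/tmp/clearml.conf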
@@ -3776,12 +3785,17 @@ class Worker(ServiceCommandSection):
             # patch venv folder to new location
             script_dir = script_dir.replace(venv_folder, new_venv_folder)

             # New command line execution
-            command = RunasArgv('bash', '-c', 'HOME=\"{}\" PATH=\"{}\" PYTHONPATH=\"{}\" TRAINS_CONFIG_FILE={} {}'.format(
-                home_folder,
-                os.environ.get('PATH', '').replace(venv_folder, new_venv_folder),
-                os.environ.get('PYTHONPATH', '').replace(venv_folder, new_venv_folder),
-                user_trains_conf,
-                command.serialize().replace(venv_folder, new_venv_folder)))
+            command = RunasArgv(
+                'bash', '-c',
+                'HOME=\"{}\" PATH=\"{}\" PYTHONPATH=\"{}\" '
+                'TRAINS_CONFIG_FILE={} CLEARML_CONFIG_FILE={} {}'.format(
+                    home_folder,
+                    os.environ.get('PATH', '').replace(venv_folder, new_venv_folder),
+                    os.environ.get('PYTHONPATH', '').replace(venv_folder, new_venv_folder),
+                    user_trains_conf, user_trains_conf,
+                    command.serialize().replace(venv_folder, new_venv_folder)
+                )
+            )

             command.set_uid(user_uid=user_uid, user_gid=user_uid)
             return command, script_dir
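
Both the legacy and the new variable end up pointing at the same per-user conf. A minimal sketch of the bash command string this builds (all values illustrative):

    user_trains_conf = "/home/clearml/clearml.conf"  # illustrative
    inner_cmd = "python -u task.py"                  # stands in for command.serialize()

    bash_cmd = 'HOME="{}" PATH="{}" PYTHONPATH="{}" ' \
               'TRAINS_CONFIG_FILE={} CLEARML_CONFIG_FILE={} {}'.format(
                   "/home/clearml", "/usr/bin", "/srv/code",
                   user_trains_conf, user_trains_conf, inner_cmd)
    print(bash_cmd)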
@@ -3947,6 +3961,14 @@ class Worker(ServiceCommandSection):
         # type: (str) -> bool
         return re.fullmatch(r"^[a-zA-Z0-9][a-zA-Z0-9_.-]+$", name) is not None

+    def _use_owner_token(self, use_owner_token=False):
+        self._impersonate_as_task_owner = use_owner_token
+        if self._impersonate_as_task_owner:
+            if not self._session.check_min_api_version("2.14"):
+                raise ValueError("Server does not support --use-owner-token option (incompatible API version)")
+            if self._session.feature_set == "basic":
+                raise ValueError("Server does not support --use-owner-token option")
+

 if __name__ == "__main__":
     pass

View File

@@ -18,7 +18,7 @@ from typing import Text, List, Callable, Any, Collection, Optional, Union
 import yaml

 from clearml_agent.commands.events import Events
-from clearml_agent.commands.worker import Worker, get_task_container, set_task_container
+from clearml_agent.commands.worker import Worker, get_task_container, set_task_container, get_next_task
 from clearml_agent.definitions import ENV_DOCKER_IMAGE
 from clearml_agent.errors import APIError
 from clearml_agent.helper.base import safe_remove_file
@@ -362,7 +362,7 @@ class K8sIntegration(Worker):
             print('Failed getting number of used pods: {}'.format(ex))
             return -2

-    def run_one_task(self, queue: Text, task_id: Text, worker_args=None, **_):
+    def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
         print('Pulling task {} launching on kubernetes cluster'.format(task_id))

         task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
@@ -398,11 +398,19 @@
             self.conf_file_content
             or Path(self._session._config_file).read_text()
         ).encode("ascii")

-        create_clearml_conf = "echo '{}' | base64 --decode >> ~/clearml.conf".format(
+        create_clearml_conf = ["echo '{}' | base64 --decode >> ~/clearml.conf".format(
             base64.b64encode(
                 hocon_config_encoded
             ).decode('ascii')
-        )
+        )]
+
+        if task_session:
+            create_clearml_conf.append(
+                "export CLEARML_AUTH_TOKEN=$(echo '{}' | base64 --decode)".format(
+                    base64.b64encode(task_session.token.encode("ascii")).decode('ascii')
+                )
+            )

         if self.ports_mode:
             print("Kubernetes looking for available pod to use")
@@ -594,19 +602,22 @@
                 extra_docker_bash_script=extra_docker_bash_script)
             for line in container_bash_script])

-        create_init_script = \
-            "echo '{}' | base64 --decode >> ~/__start_agent__.sh ; " \
-            "/bin/bash ~/__start_agent__.sh".format(
-                base64.b64encode(
-                    script_encoded.encode('ascii')
-                ).decode('ascii'))
+        extra_bash_commands = list(create_clearml_conf or [])
+        extra_bash_commands.append(
+            "echo '{}' | base64 --decode >> ~/__start_agent__.sh ; "
+            "/bin/bash ~/__start_agent__.sh".format(
+                base64.b64encode(
+                    script_encoded.encode('ascii')
+                ).decode('ascii'))
+        )

         # Notice: we always leave with exit code 0, so pods are never restarted
         container = self._merge_containers(
             container,
             dict(name=name, image=docker_image,
                  command=['/bin/bash'],
-                 args=['-c', '{} ; {} ; exit 0'.format(create_clearml_conf, create_init_script)])
+                 args=['-c', '{} ; exit 0'.format(' ; '.join(extra_bash_commands))])
         )

         if template['spec']['containers']:
@@ -685,7 +696,7 @@
             "--",
             "/bin/sh",
             "-c",
-            "{} ; {}".format(create_clearml_conf, container_bash_script.format(
+            "{} ; {}".format(" ; ".join(create_clearml_conf or []), container_bash_script.format(
                 extra_bash_init_cmd=self.extra_bash_init_script or "",
                 extra_docker_bash_script=docker_bash or "",
                 task_id=task_id
@@ -742,14 +753,16 @@
             # get next task in queue
             try:
-                response = self._session.api_client.queues.get_next_task(queue=queue)
+                response = get_next_task(
+                    self._session, queue=queue, get_task_info=self._impersonate_as_task_owner
+                )
             except Exception as e:
                 print("Warning: Could not access task queue [{}], error: {}".format(queue, e))
                 continue
             else:
                 try:
-                    task_id = response.entry.task
-                except AttributeError:
+                    task_id = response["entry"]["task"]
+                except (KeyError, TypeError, AttributeError):
                     print("No tasks in queue {}".format(queue))
                     continue

                 events_service.send_log_events(
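
get_next_task is imported from clearml_agent.commands.worker (see the import hunk above) and its implementation is not part of this diff; what this call site relies on is only the response shape. A hedged sketch of that contract:

    # assumed response shape, inferred from how it is consumed in this hunk and the next
    response = {
        "entry": {"task": "task-id-123"},                      # present when a task was pulled
        "task_info": {"user": "user-id", "company": "co-id"},  # only when get_task_info=True
    }
    task_id = response["entry"]["task"]
    task_user = response["task_info"]["user"]
    task_company = response["task_info"]["company"]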
@@ -761,8 +774,26 @@
                     level="INFO",
                 )

+                task_session = None
+                if self._impersonate_as_task_owner:
+                    try:
+                        task_user = response["task_info"]["user"]
+                        task_company = response["task_info"]["company"]
+                    except (KeyError, TypeError, AttributeError):
+                        print("Error: cannot retrieve owner user for the task '{}', skipping".format(task_id))
+                        continue
+
+                    task_session = self.get_task_session(task_user, task_company)
+                    if not task_session:
+                        print(
+                            "Error: Could not login as the user '{}' for the task '{}', skipping".format(
+                                task_user, task_id
+                            )
+                        )
+                        continue
+
                 self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
-                self.run_one_task(queue, task_id, worker_params)
+                self.run_one_task(queue, task_id, worker_params, task_session)
                 self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))

                 break
             else:
@@ -773,7 +804,7 @@
             if self._session.config["agent.reload_config"]:
                 self.reload_config()

-    def k8s_daemon(self, queue):
+    def k8s_daemon(self, queue, **kwargs):
         """
         Start the k8s Glue service.
         This service will be pulling tasks from *queue* and scheduling them for execution using kubectl.
@@ -784,8 +815,10 @@
         :param list(str) queue: queue name to pull from
         """
-        return self.daemon(queues=[ObjectID(name=queue)] if queue else None,
-                           log_level=logging.INFO, foreground=True, docker=False)
+        return self.daemon(
+            queues=[ObjectID(name=queue)] if queue else None,
+            log_level=logging.INFO, foreground=True, docker=False, **kwargs,
+        )

     @classmethod
     def get_ssh_server_bash(cls, ssh_port_number):

View File

@@ -7,13 +7,14 @@ from furl import furl
 import urllib.parse
 from operator import itemgetter
 from html.parser import HTMLParser
-from typing import Text
+from typing import Text, Optional

 import attr
 import requests
 import six

-from .requirements import SimpleSubstitution, FatalSpecsResolutionError, SimpleVersion
+from .requirements import SimpleSubstitution, FatalSpecsResolutionError, SimpleVersion, MarkerRequirement
 from ...external.requirements_parser.requirement import Requirement

 OS_TO_WHEEL_NAME = {"linux": "linux_x86_64", "windows": "win_amd64"}
@@ -179,6 +180,7 @@ class PytorchRequirement(SimpleSubstitution):
         self.python_version_string = None
         self.python_major_minor_str = None
         self.python = None
+        self._fix_setuptools = None
         self.exceptions = []
         self._original_req = []
@@ -366,6 +368,10 @@ class PytorchRequirement(SimpleSubstitution):
         else:
             print('Trying PyTorch CUDA version {} support'.format(torch_url_key))

+        # fix broken pytorch setuptools incompatibility
+        if closest_matched_version and SimpleVersion.compare_versions(closest_matched_version, "<", "1.11.0"):
+            self._fix_setuptools = "setuptools < 59"
+
         if not url:
             url = PytorchWheel(
                 torch_version=fix_version(version),
@@ -528,6 +534,16 @@ class PytorchRequirement(SimpleSubstitution):

         return list_of_requirements

+    def post_scan_add_req(self):  # type: () -> Optional[MarkerRequirement]
+        """
+        Allows the RequirementSubstitution to add an extra line/requirements after
+        the initial requirements scan is completed.
+        Called only once per requirements.txt object
+        """
+        if self._fix_setuptools:
+            return MarkerRequirement(Requirement.parse(self._fix_setuptools))
+        return None
+
     MAP = {
         "windows": {
             "cuda100": {

View File

@@ -628,10 +628,23 @@ class RequirementsManager(object):
         result = list(result)

         # add post scan add requirements call back
+        double_req_set = None
         for h in self.handlers:
-            req = h.post_scan_add_req()
-            if req:
-                result.append(req.tostr())
+            reqs = h.post_scan_add_req()
+            if reqs:
+                if double_req_set is None:
+                    def safe_parse_name(line):
+                        try:
+                            return Requirement.parse(line).name
+                        except:  # noqa
+                            return None
+                    double_req_set = set([safe_parse_name(r) for r in result if r])
+                for r in (reqs if isinstance(reqs, (tuple, list)) else [reqs]):
+                    if r and (not r.name or r.name not in double_req_set):
+                        result.append(r.tostr())
+                    elif r:
+                        print("SKIPPING additional auto installed package: \"{}\"".format(r))

         return join_lines(result)
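
The dedup rule in isolation: a package name already present in the resolved requirements wins over a handler-added extra, so a user-pinned setuptools is never overridden by the automatic "setuptools < 59". A minimal sketch with parsing reduced to a whitespace split:

    def add_post_scan_reqs(result, extra_reqs):
        # names already in `result` win; "name op version" parsing is simplified
        existing = {line.split()[0] for line in result if line}
        for r in extra_reqs:
            if r.split()[0] not in existing:
                result.append(r)
            else:
                print('SKIPPING additional auto installed package: "{}"'.format(r))
        return result

    print(add_post_scan_reqs(["torch == 1.10.0", "setuptools == 60.0.0"], ["setuptools < 59"]))
    # "setuptools < 59" is skipped because setuptools is already pinned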

View File

@@ -529,6 +529,13 @@ class Git(VCS):
         "GIT_SSH_COMMAND": "ssh -oBatchMode=yes",
     }

+    def __init__(self, *args, **kwargs):
+        super(Git, self).__init__(*args, **kwargs)
+        try:
+            self.call("config", "--global", "--replace-all", "safe.directory", "*", cwd=self.location)
+        except:  # noqa
+            pass
+
     @staticmethod
     def remote_branch_name(branch):
         return [
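
Per the commit message, this disables git's unsafe-directory check for the agent's cached vcs folders. The call above amounts to the following git invocation, sketched here with subprocess (the agent actually goes through its own self.call wrapper, not shown in this diff):

    import subprocess

    # trust every directory; failures are swallowed exactly like in the hunk above
    try:
        subprocess.check_call(
            ["git", "config", "--global", "--replace-all", "safe.directory", "*"]
        )
    except Exception:  # noqa
        pass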

View File

@@ -1 +1 @@
-__version__ = '1.2.3'
+__version__ = '1.2.4rc1'

View File

@@ -65,6 +65,10 @@ def parse_args():
         help="Limit the maximum number of pods that this service can run at the same time."
              "Should not be used with ports-mode"
     )
+    parser.add_argument(
+        "--use-owner-token", action="store_true", default=False,
+        help="Generate and use task owner token for the execution of each task"
+    )
     return parser.parse_args()
@@ -87,7 +91,7 @@ def main():
             ssh_port_number=args.ssh_server_port) if args.ssh_server_port else None,
         namespace=args.namespace, max_pods_limit=args.max_pods or None,
     )
-    k8s.k8s_daemon(args.queue)
+    k8s.k8s_daemon(args.queue, use_owner_token=args.use_owner_token)

 if __name__ == "__main__":
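
With the flag wired through k8s_daemon's **kwargs into daemon(), impersonation can be switched on from the command line, e.g. python k8s_glue_example.py --queue k8s_queue --use-owner-token (the script file name is an assumption; it is not shown in this diff).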