From 8adbf5ae0003d4c0b879dd9f232f818146d772da Mon Sep 17 00:00:00 2001 From: pollfly <75068813+pollfly@users.noreply.github.com> Date: Thu, 27 Jun 2024 17:42:41 +0300 Subject: [PATCH 01/17] Edit docstring (#1290) --- clearml/backend_api/services/v2_23/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clearml/backend_api/services/v2_23/tasks.py b/clearml/backend_api/services/v2_23/tasks.py index ee87b000..34a2943a 100644 --- a/clearml/backend_api/services/v2_23/tasks.py +++ b/clearml/backend_api/services/v2_23/tasks.py @@ -8070,7 +8070,7 @@ class GetAllRequest(Request): :param parent: Parent ID :type parent: str :param status_changed: List of status changed constraint strings (utcformat, - with an optional prefix modifier (\>,\>=, \<, \<=) + epoch) with an optional prefix modifier (\>,\>=, \<, \<=) :type status_changed: Sequence[str] :param search_text: Free text search query :type search_text: str @@ -8219,7 +8219,7 @@ class GetAllRequest(Request): "status_changed": { "description": ( "List of status changed constraint strings, or a single string (utcformat, epoch) with an optional prefix modifier " - "(\>, \>=, \<, \<=)" + "(\>,\>=, \<, \<=)" ), "items": {"pattern": "^(>=|>|<=|<)?.*$", "type": "string"}, "type": ["string", "array", "null"], From eb09b4646c8974074f4173c40c3446bb65ffd768 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 30 Jun 2024 09:17:44 +0300 Subject: [PATCH 02/17] Fix support passing folder to get_script_info() to get the git info --- clearml/backend_interface/task/repo/scriptinfo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clearml/backend_interface/task/repo/scriptinfo.py b/clearml/backend_interface/task/repo/scriptinfo.py index 20b47e78..30f2a850 100644 --- a/clearml/backend_interface/task/repo/scriptinfo.py +++ b/clearml/backend_interface/task/repo/scriptinfo.py @@ -1052,7 +1052,7 @@ class ScriptInfo(object): raise ScriptInfoError("Script file {} could not be found".format(filepaths)) - scripts_dir = [f.parent for f in scripts_path] + scripts_dir = [f if f.is_dir() else f.parent for f in scripts_path] def _log(msg, *args, **kwargs): if not log: From 81b4c49f8b27d21805d9a7b1275f07a12886eafd Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 4 Jul 2024 15:24:40 +0300 Subject: [PATCH 03/17] Add clearml-task and CreateAndPopulate support for bash scripts, ipynb and python modules. requires clearml-agent 1.9+ --- clearml/backend_interface/task/populate.py | 191 ++++++++++++++++----- clearml/cli/task/__main__.py | 8 +- 2 files changed, 155 insertions(+), 44 deletions(-) diff --git a/clearml/backend_interface/task/populate.py b/clearml/backend_interface/task/populate.py index 12dbb6fc..35af6d77 100644 --- a/clearml/backend_interface/task/populate.py +++ b/clearml/backend_interface/task/populate.py @@ -36,6 +36,7 @@ class CreateAndPopulate(object): commit=None, # type: Optional[str] script=None, # type: Optional[str] working_directory=None, # type: Optional[str] + module=None, # type: Optional[str] packages=None, # type: Optional[Union[bool, Sequence[str]]] requirements_file=None, # type: Optional[Union[str, Path]] docker=None, # type: Optional[str] @@ -67,6 +68,9 @@ class CreateAndPopulate(object): remote git repository the script should be a relative path inside the repository, for example: './source/train.py' . 
When used with local repository path it supports a direct path to a file inside the local repository itself, for example: '~/project/source/train.py' + :param module: If specified instead of executing `script`, a module named `module` is executed. + Implies script is empty. Module can contain multiple argument for execution, + for example: module="my.module arg1 arg2" :param working_directory: Working directory to launch the script from. Default: repository root folder. Relative to repo root or local folder. :param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"] @@ -92,10 +96,14 @@ class CreateAndPopulate(object): repo = None else: folder = None + + if script and module: + raise ValueError("Entry point script or module need to be specified not both") + if raise_on_missing_entries and not base_task_id: - if not script: + if not script and not module: raise ValueError("Entry point script not provided") - if not repo and not folder and not Path(script).is_file(): + if not repo and not folder and (script and not Path(script).is_file()): raise ValueError("Script file \'{}\' could not be found".format(script)) if raise_on_missing_entries and commit and branch: raise ValueError( @@ -111,6 +119,7 @@ class CreateAndPopulate(object): self.branch = branch self.repo = repo self.script = script + self.module = module self.cwd = working_directory assert not packages or isinstance(packages, (tuple, list, bool)) self.packages = list(packages) if packages is not None and not isinstance(packages, bool) \ @@ -138,21 +147,47 @@ class CreateAndPopulate(object): """ local_entry_file = None repo_info = None + stand_alone_script_outside_repo = False + # populate from local repository / script if self.folder or (self.script and Path(self.script).is_file() and not self.repo): self.folder = os.path.expandvars(os.path.expanduser(self.folder)) if self.folder else None self.script = os.path.expandvars(os.path.expanduser(self.script)) if self.script else None self.cwd = os.path.expandvars(os.path.expanduser(self.cwd)) if self.cwd else None - if Path(self.script).is_file(): - entry_point = self.script - else: - entry_point = (Path(self.folder) / self.script).as_posix() - entry_point = os.path.abspath(entry_point) - if not os.path.isfile(entry_point): - raise ValueError("Script entrypoint file \'{}\' could not be found".format(entry_point)) - local_entry_file = entry_point + if self.module: + entry_point = "-m {}".format(self.module) + # we must have a folder if we are here + local_entry_file = self.folder.rstrip("/") + "/." 
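+                # there is no standalone script file when executing a module,
+                # so ScriptInfo (called below) is pointed at the repository folder itself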
+ else: + if Path(self.script).is_file(): + entry_point = self.script + else: + entry_point = (Path(self.folder) / self.script).as_posix() + + entry_point = os.path.abspath(entry_point) + + try: + if entry_point and Path(entry_point).is_file() and self.folder and Path(self.folder).is_dir(): + # make sure we raise exception if this is outside the local repo folder + entry_point = (Path(entry_point) / (Path(entry_point).relative_to(self.folder))).as_posix() + except ValueError: + entry_point = self.folder + stand_alone_script_outside_repo = True + + if not os.path.isfile(entry_point) and not stand_alone_script_outside_repo: + if (not Path(self.script).is_absolute() and not Path(self.cwd).is_absolute() and + (Path(self.folder) / self.cwd / self.script).is_file()): + entry_point = (Path(self.folder) / self.cwd / self.script).as_posix() + elif (Path(self.cwd).is_absolute() and not Path(self.script).is_absolute() and + (Path(self.cwd) / self.script).is_file()): + entry_point = (Path(self.cwd) / self.script).as_posix() + else: + raise ValueError("Script entrypoint file \'{}\' could not be found".format(entry_point)) + + local_entry_file = entry_point + repo_info, requirements = ScriptInfo.get( - filepaths=[entry_point], + filepaths=[local_entry_file], log=getLogger(), create_requirements=self.packages is True, uncommitted_from_remote=True, @@ -162,6 +197,28 @@ class CreateAndPopulate(object): force_single_script=self.force_single_script_file, ) + if stand_alone_script_outside_repo: + # if we have a standalone script and a local repo we skip[ the local diff and store it + local_entry_file = Path(self.script).as_posix() + a_create_requirements = self.packages is True + a_repo_info, a_requirements = ScriptInfo.get( + filepaths=[Path(self.script).as_posix()], + log=getLogger(), + create_requirements=a_create_requirements, + uncommitted_from_remote=True, + detect_jupyter_notebook=False, + add_missing_installed_packages=True, + detailed_req_report=False, + force_single_script=True, + ) + if repo_info.script['diff']: + print("Warning: local git repo diff is ignored, " + "storing only the standalone script form {}".format(self.script)) + repo_info.script['diff'] = a_repo_info.script['diff'] or '' + repo_info.script['entry_point'] = a_repo_info.script['entry_point'] + if a_create_requirements: + repo_info['requirements'] = a_repo_info.script.get('requirements') or {} + # check if we have no repository and no requirements raise error if self.raise_on_missing_entries and (not self.requirements_file and not self.packages) \ and not self.repo and ( @@ -195,7 +252,7 @@ class CreateAndPopulate(object): # if there is nothing to populate, return if not any([ - self.folder, self.commit, self.branch, self.repo, self.script, self.cwd, + self.folder, self.commit, self.branch, self.repo, self.script, self.module, self.cwd, self.packages, self.requirements_file, self.base_task_id] + (list(self.docker.values())) ): return task @@ -209,31 +266,63 @@ class CreateAndPopulate(object): task_state['script']['diff'] = repo_info.script['diff'] or '' task_state['script']['working_dir'] = repo_info.script['working_dir'] task_state['script']['entry_point'] = repo_info.script['entry_point'] - task_state['script']['binary'] = repo_info.script['binary'] + task_state['script']['binary'] = '/bin/bash' if ( + (repo_info.script['entry_point'] or '').lower().strip().endswith('.sh') and + not (repo_info.script['entry_point'] or '').lower().strip().startswith('-m ')) \ + else repo_info.script['binary'] task_state['script']['requirements'] = 
repo_info.script.get('requirements') or {} if self.cwd: - self.cwd = self.cwd - # cwd should be relative to the repo_root, but we need the full path - # (repo_root + cwd) in order to resolve the entry point - cwd = (Path(repo_info.script['repo_root']) / self.cwd).as_posix() + cwd = self.cwd + if not Path(cwd).is_absolute(): + # cwd should be relative to the repo_root, but we need the full path + # (repo_root + cwd) in order to resolve the entry point + cwd = os.path.normpath((Path(repo_info.script['repo_root']) / self.cwd).as_posix()) + if not Path(cwd).is_dir(): + # we need to leave it as is, we have no idea, and this is a repo + cwd = self.cwd - if not Path(cwd).is_dir(): + elif not Path(cwd).is_dir(): + # we were passed an absolute dir and it does not exist raise ValueError("Working directory \'{}\' could not be found".format(cwd)) - entry_point = \ - Path(repo_info.script['repo_root']) / repo_info.script['working_dir'] / repo_info.script[ - 'entry_point'] - # resolve entry_point relative to the current working directory - entry_point = entry_point.relative_to(cwd).as_posix() + + if self.module: + entry_point = "-m {}".format(self.module) + elif stand_alone_script_outside_repo: + # this should be relative and the temp file we generated + entry_point = repo_info.script['entry_point'] + else: + entry_point = os.path.normpath( + Path(repo_info.script['repo_root']) / + repo_info.script['working_dir'] / repo_info.script['entry_point'] + ) + # resolve entry_point relative to the current working directory + if Path(cwd).is_absolute(): + entry_point = Path(entry_point).relative_to(cwd).as_posix() + else: + entry_point = repo_info.script['entry_point'] + # restore cwd - make it relative to the repo_root again - cwd = Path(cwd).relative_to(repo_info.script['repo_root']).as_posix() + if Path(cwd).is_absolute(): + # now cwd is relative again + cwd = Path(cwd).relative_to(repo_info.script['repo_root']).as_posix() + + # make sure we always have / (never \\) + if platform == "win32": + entry_point = entry_point.replace('\\', '/') if entry_point else "" + cwd = cwd.replace('\\', '/') if cwd else "" + task_state['script']['entry_point'] = entry_point or "" task_state['script']['working_dir'] = cwd or "." elif self.repo: - # normalize backslashes and remove first one - entry_point = '/'.join([p for p in self.script.split('/') if p and p != '.']) cwd = '/'.join([p for p in (self.cwd or '.').split('/') if p and p != '.']) - if cwd and entry_point.startswith(cwd + '/'): - entry_point = entry_point[len(cwd) + 1:] + # normalize backslashes and remove first one + if self.module: + entry_point = "-m {}".format(self.module) + else: + entry_point = '/'.join([p for p in self.script.split('/') if p and p != '.']) + if cwd and entry_point.startswith(cwd + '/'): + entry_point = entry_point[len(cwd) + 1:] + task_state['script']['repository'] = self.repo task_state['script']['version_num'] = self.commit or None task_state['script']['branch'] = self.branch or None @@ -241,7 +330,9 @@ class CreateAndPopulate(object): task_state['script']['working_dir'] = cwd or '.' 
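            # entry_point at this point is relative to the repository (cwd prefix already stripped when applicable)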
task_state['script']['entry_point'] = entry_point or "" - if self.force_single_script_file and Path(self.script).is_file(): + if self.script and Path(self.script).is_file() and ( + self.force_single_script_file or Path(self.script).is_absolute()): + self.force_single_script_file = True create_requirements = self.packages is True repo_info, requirements = ScriptInfo.get( filepaths=[Path(self.script).as_posix()], @@ -251,15 +342,20 @@ class CreateAndPopulate(object): detect_jupyter_notebook=False, add_missing_installed_packages=True, detailed_req_report=False, - force_single_script=self.force_single_script_file, + force_single_script=True, ) + task_state['script']['binary'] = '/bin/bash' if ( + (repo_info.script['entry_point'] or '').lower().strip().endswith('.sh') and + not (repo_info.script['entry_point'] or '').lower().strip().startswith('-m ')) \ + else repo_info.script['binary'] task_state['script']['diff'] = repo_info.script['diff'] or '' task_state['script']['entry_point'] = repo_info.script['entry_point'] if create_requirements: task_state['script']['requirements'] = repo_info.script.get('requirements') or {} else: # standalone task - task_state['script']['entry_point'] = self.script or "" + task_state['script']['entry_point'] = self.script if self.script else \ + ("-m {}".format(self.module) if self.module else "") task_state['script']['working_dir'] = '.' # update requirements reqs = [] @@ -300,7 +396,8 @@ class CreateAndPopulate(object): idx_a = 0 lines = None # find the right entry for the patch if we have a local file (basically after __future__ - if local_entry_file: + if (local_entry_file and not stand_alone_script_outside_repo and not self.module and + str(local_entry_file).lower().endswith(".py")): with open(local_entry_file, 'rt') as f: lines = f.readlines() future_found = self._locate_future_import(lines) @@ -308,7 +405,8 @@ class CreateAndPopulate(object): idx_a = future_found + 1 task_init_patch = '' - if self.repo or task_state.get('script', {}).get('repository'): + if ((self.repo or task_state.get('script', {}).get('repository')) and + not self.force_single_script_file and not stand_alone_script_outside_repo): # if we do not have requirements, add clearml to the requirements.txt if not reqs: task_init_patch += \ @@ -319,26 +417,33 @@ class CreateAndPopulate(object): "+clearml\n" # Add Task.init call - task_init_patch += \ - "diff --git a{script_entry} b{script_entry}\n" \ - "--- a{script_entry}\n" \ - "+++ b{script_entry}\n" \ - "@@ -{idx_a},0 +{idx_b},3 @@\n" \ - "+from clearml import Task\n" \ - "+(__name__ != \"__main__\") or Task.init()\n" \ - "+\n".format( - script_entry=script_entry, idx_a=idx_a, idx_b=idx_a + 1) + if not self.module and script_entry and str(script_entry).lower().endswith(".py"): + task_init_patch += \ + "diff --git a{script_entry} b{script_entry}\n" \ + "--- a{script_entry}\n" \ + "+++ b{script_entry}\n" \ + "@@ -{idx_a},0 +{idx_b},3 @@\n" \ + "+from clearml import Task\n" \ + "+(__name__ != \"__main__\") or Task.init()\n" \ + "+\n".format( + script_entry=script_entry, idx_a=idx_a, idx_b=idx_a + 1) + elif self.module: + # if we are here, do nothing + pass elif local_entry_file and lines: # if we are here it means we do not have a git diff, but a single script file init_lines = ["from clearml import Task\n", "(__name__ != \"__main__\") or Task.init()\n\n"] task_state['script']['diff'] = ''.join(lines[:idx_a] + init_lines + lines[idx_a:]) # no need to add anything, we patched it. 
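            # the Task.init() lines are already embedded in the stored script diff above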
task_init_patch = "" - else: + elif str(script_entry or "").lower().endswith(".py"): # Add Task.init call + # if we are here it means we do not have a git diff, but a single script file task_init_patch += \ "from clearml import Task\n" \ "(__name__ != \"__main__\") or Task.init()\n\n" + task_state['script']['diff'] = task_init_patch + task_state['script'].get('diff', '') + task_init_patch = "" # make sure we add the diff at the end of the current diff task_state['script']['diff'] = task_state['script'].get('diff', '') diff --git a/clearml/cli/task/__main__.py b/clearml/cli/task/__main__.py index cb5007e6..05a98b42 100644 --- a/clearml/cli/task/__main__.py +++ b/clearml/cli/task/__main__.py @@ -37,10 +37,15 @@ def setup_parser(parser): 'and will be replicated on the remote machine') parser.add_argument('--script', type=str, default=None, help='Specify the entry point script for the remote execution. ' + 'Currently support .py .ipynb and .sh scripts (python, jupyter notebook, bash) ' 'When used in tandem with --repo the script should be a relative path inside ' - 'the repository, for example: --script source/train.py .' + 'the repository, for example: --script source/train.py ' 'When used with --folder it supports a direct path to a file inside the local ' 'repository itself, for example: --script ~/project/source/train.py') + parser.add_argument('--module', type=str, default=None, + help='Instead of a script entry point, specify a python module to be remotely executed. ' + 'Notice: It cannot be used with --script at the same time. ' + 'for example: --module "torch.distributed.launch train_script.py"') parser.add_argument('--cwd', type=str, default=None, help='Working directory to launch the script from. Default: repository root folder. ' 'Relative to repo root or local folder') @@ -125,6 +130,7 @@ def cli(): branch=args.branch, commit=args.commit, script=args.script, + module=args.module, working_directory=args.cwd, packages=args.packages, requirements_file=args.requirements, From 3fa540928346747ec305f588f896724bd807f44e Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 4 Jul 2024 15:26:17 +0300 Subject: [PATCH 04/17] Fix launch_multi_node enforce the parent of sub-tasks to be the master node 0 task --- clearml/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clearml/task.py b/clearml/task.py index b29f17d0..932a8006 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -2045,7 +2045,7 @@ class Task(_Task): set_launch_multi_node_runtime_props(self, master_conf) current_conf = master_conf for node_rank in range(1, master_conf.get("total_num_nodes", total_num_nodes)): - node = self.clone(source_task=self) + node = self.clone(source_task=self, parent=self.id) node_conf = copy.deepcopy(master_conf) node_conf["node_rank"] = node_rank set_launch_multi_node_runtime_props(node, node_conf) From 7dc601598b71d80c6c8e4b4b3df08bd668684c26 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 4 Jul 2024 15:26:45 +0300 Subject: [PATCH 05/17] Add support for HTTP file upload progress reporting --- clearml/storage/callbacks.py | 2 +- clearml/storage/helper.py | 36 ++++++++++++++++++++---------------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/clearml/storage/callbacks.py b/clearml/storage/callbacks.py index d1b22a68..bb8d5bf4 100644 --- a/clearml/storage/callbacks.py +++ b/clearml/storage/callbacks.py @@ -66,7 +66,7 @@ class ProgressReport(object): unit="MB", unit_scale=False, ncols=80, - bar_format="{bar} {percentage:3.0f}% | {n_fmt}/{total_fmt} MB " + 
bar_format="{bar} {percentage:3.0f}% | {n:.2f}/{total_fmt} MB " "[{elapsed}<{remaining}, {rate_fmt}{postfix}]: {desc}", ) except Exception: diff --git a/clearml/storage/helper.py b/clearml/storage/helper.py index dd63d275..a2928e03 100644 --- a/clearml/storage/helper.py +++ b/clearml/storage/helper.py @@ -33,7 +33,7 @@ from six import binary_type, StringIO from six.moves.queue import Queue, Empty from six.moves.urllib.parse import urlparse -from clearml.utilities.requests_toolbelt import MultipartEncoder +from clearml.utilities.requests_toolbelt import MultipartEncoderMonitor, MultipartEncoder from .callbacks import UploadProgressReport, DownloadProgressReport from .util import quote_url from ..backend_api.session import Session @@ -180,6 +180,14 @@ class _HttpDriver(_Driver): return self._containers[container_name] def upload_object_via_stream(self, iterator, container, object_name, extra=None, callback=None, **kwargs): + def monitor_callback(monitor): + new_chunk = monitor.bytes_read - monitor.previous_read + monitor.previous_read = monitor.bytes_read + try: + callback(new_chunk) + except Exception as ex: + self.get_logger().debug('Exception raised when running callback function: {}'.format(ex)) + # when sending data in post, there is no connection timeout, just an entire upload timeout timeout = int(self.timeout_total) url = container.name @@ -188,15 +196,7 @@ class _HttpDriver(_Driver): host, _, path = object_name.partition('/') url += host + '/' - m = MultipartEncoder(fields={ - path: (path, iterator, get_file_mimetype(object_name)) - }) - - headers = { - 'Content-Type': m.content_type, - } - headers.update(container.get_headers(url) or {}) - + stream_size = None if hasattr(iterator, 'tell') and hasattr(iterator, 'seek'): pos = iterator.tell() iterator.seek(0, 2) @@ -204,6 +204,16 @@ class _HttpDriver(_Driver): iterator.seek(pos, 0) timeout = max(timeout, (stream_size / 1024) / float(self.min_kbps_speed)) + m = MultipartEncoder(fields={path: (path, iterator, get_file_mimetype(object_name))}) + if callback and stream_size: + m = MultipartEncoderMonitor(m, callback=monitor_callback) + m.previous_read = 0 + + headers = { + 'Content-Type': m.content_type, + } + headers.update(container.get_headers(url) or {}) + res = container.session.post( url, data=m, timeout=timeout, headers=headers ) @@ -211,12 +221,6 @@ class _HttpDriver(_Driver): raise ValueError('Failed uploading object %s (%d): %s' % (object_name, res.status_code, res.text)) # call back is useless because we are not calling it while uploading... 
- - # if callback and stream_size: - # try: - # callback(stream_size) - # except Exception as ex: - # log.debug('Exception raised when running callback function: %s' % ex) return res def list_container_objects(self, *args, **kwargs): From 9594e5ddddeaa3bc825f43910384802420ca969e Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 4 Jul 2024 15:27:10 +0300 Subject: [PATCH 06/17] Add CLEARML_MULTI_NODE_SINGLE_TASK (values -1, 0, 1, 2) for easier multi-node singe Task workloads --- clearml/config/defs.py | 3 ++ clearml/utilities/resource_monitor.py | 40 +++++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/clearml/config/defs.py b/clearml/config/defs.py index 0cd78b23..7e632fb1 100644 --- a/clearml/config/defs.py +++ b/clearml/config/defs.py @@ -28,6 +28,9 @@ SUPPRESS_UPDATE_MESSAGE_ENV_VAR = EnvEntry("CLEARML_SUPPRESS_UPDATE_MESSAGE", "T MAX_SERIES_PER_METRIC = EnvEntry("CLEARML_MAX_SERIES_PER_METRIC", default=100, type=int) +# values are 0/None (task per node), 1/2 (multi-node reporting, colored console), -1 (only report rank 0 node) +ENV_MULTI_NODE_SINGLE_TASK = EnvEntry("CLEARML_MULTI_NODE_SINGLE_TASK", type=int, default=None) + JUPYTER_PASSWORD = EnvEntry("CLEARML_JUPYTER_PASSWORD") # Repository detection diff --git a/clearml/utilities/resource_monitor.py b/clearml/utilities/resource_monitor.py index 41465d6a..e31227cc 100644 --- a/clearml/utilities/resource_monitor.py +++ b/clearml/utilities/resource_monitor.py @@ -3,6 +3,7 @@ import os import platform import sys import warnings +from math import ceil, log10 from time import time import psutil @@ -12,7 +13,7 @@ from typing import Text from .process.mp import BackgroundMonitor from ..backend_api import Session from ..binding.frameworks.tensorflow_bind import IsTensorboardInit -from ..config import config +from ..config import config, ENV_MULTI_NODE_SINGLE_TASK try: from .gpu import gpustat @@ -103,6 +104,31 @@ class ResourceMonitor(BackgroundMonitor): if self._is_thread_mode_and_not_main_process(): return + multi_node_single_task_reporting = False + report_node_as_series = False + rank = 0 + world_size_digits = 0 + # check if we are in multi-node reporting to the same Task + if ENV_MULTI_NODE_SINGLE_TASK.get(): + # if resource monitoring is disabled, do nothing + if ENV_MULTI_NODE_SINGLE_TASK.get() < 0: + return + # we are reporting machines stats on a different machine over the same Task + multi_node_single_task_reporting = True + if ENV_MULTI_NODE_SINGLE_TASK.get() == 1: + # report per machine graph (unique title) + report_node_as_series = False + elif ENV_MULTI_NODE_SINGLE_TASK.get() == 2: + # report per machine series (i.e. 
merge title+series resource and have "node X" as different series) + report_node_as_series = True + + # noinspection PyBroadException + try: + rank = int(os.environ.get("RANK") or 0) + world_size_digits = ceil(log10(int(os.environ.get("WORLD_SIZE") or 0))) + except Exception: + pass + seconds_since_started = 0 reported = 0 last_iteration = 0 @@ -196,9 +222,19 @@ class ResourceMonitor(BackgroundMonitor): # noinspection PyBroadException try: title = self._title_gpu if k.startswith('gpu_') else self._title_machine + series = k # 3 points after the dot + if multi_node_single_task_reporting: + if report_node_as_series: + title = "{}:{}".format(":".join(title.split(":")[:-1]), series) + series = "rank {:0{world_size_digits}d}".format( + rank, world_size_digits=world_size_digits) + else: + title = "{}:rank{:0{world_size_digits}d}".format( + title, rank, world_size_digits=world_size_digits) + value = round(v * 1000) / 1000. - self._task.get_logger().report_scalar(title=title, series=k, iteration=iteration, value=value) + self._task.get_logger().report_scalar(title=title, series=series, iteration=iteration, value=value) except Exception: pass # clear readouts if this is update is not averaged From aa227a0cdbbad8805e947f845160274fc11446c5 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 4 Jul 2024 15:27:52 +0300 Subject: [PATCH 07/17] Fix tensorboard numpy 2.0 incompatibility breaking binding --- clearml/binding/frameworks/tensorflow_bind.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clearml/binding/frameworks/tensorflow_bind.py b/clearml/binding/frameworks/tensorflow_bind.py index 385d1d71..1657fadd 100644 --- a/clearml/binding/frameworks/tensorflow_bind.py +++ b/clearml/binding/frameworks/tensorflow_bind.py @@ -738,13 +738,14 @@ class EventTrainsWriter(object): LoggerRoot.get_base_logger(TensorflowBinding).debug( 'No tag for \'value\' existing keys %s' % ', '.join(vdict.keys())) continue + # noinspection PyBroadException try: from tensorboard.plugins.hparams.metadata import SESSION_START_INFO_TAG if tag == SESSION_START_INFO_TAG: self._add_hparams(vdict) continue - except ImportError: + except Exception: pass metric, values = get_data(vdict, supported_metrics) if metric == 'simpleValue': From e27d277e40ae3a4f80516808322b403df5f48083 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 4 Jul 2024 15:29:37 +0300 Subject: [PATCH 08/17] Fix Task.launch_multi_node() not supported when used via pytorch lightning --- clearml/task.py | 67 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 10 deletions(-) diff --git a/clearml/task.py b/clearml/task.py index 932a8006..4a92ce57 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -180,6 +180,8 @@ class Task(_Task): __detect_repo_async = deferred_config('development.vcs_repo_detect_async', False) __default_output_uri = DEV_DEFAULT_OUTPUT_URI.get() or deferred_config('development.default_output_uri', None) + __hidden_tag = "hidden" + _launch_multi_node_section = "launch_multi_node" _launch_multi_node_instance_tag = "multi_node_instance" @@ -1921,8 +1923,16 @@ class Task(_Task): """ return self._get_logger(auto_connect_streams=self._log_to_backend) - def launch_multi_node(self, total_num_nodes, port=29500, queue=None, wait=False, addr=None): - # type: (int, Optional[int], Optional[str], bool, Optional[str]) -> dict + def launch_multi_node( + self, + total_num_nodes, # type: int + port=29500, # type: Optional[int] + queue=None, # type: Optional[str] + wait=False, # type: bool + addr=None, # type: 
Optional[str] + devices=None, # type: Optional[Union[int, Sequence[int]]] + hide_children=False # bool + ): """ Enqueue multiple clones of the current task to a queue, allowing the task to be ran by multiple workers in parallel. Each task running this way is called a node. @@ -1996,6 +2006,9 @@ class Task(_Task): parameter will be set to the one defined in ``MASTER_ADDR``. If neither environment variables exist, the value passed to the parameter will be used. If this value is None (default), the private IP of the machine the master node is running on will be used. + :param devices: The devices to use. This can be a positive number indicating the number of devices to use, + a sequence of indices or the value ``-1`` to indicate all available devices should be used. + :param hide_children: If True, the children tasks will be hidden. Otherwise, they will be visible in the UI :return: A dictionary containing relevant information regarding the multi node run. This dictionary has the following entries: @@ -2006,9 +2019,12 @@ class Task(_Task): - `node_rank` - the rank of the current node (master has rank 0) - `wait` - if True, the master node will wait for the other nodes to start """ + def set_launch_multi_node_runtime_props(task, conf): # noinspection PyProtectedMember - task._set_runtime_properties({"{}/{}".format(self._launch_multi_node_section, k): v for k, v in conf.items()}) + task._set_runtime_properties( + {"{}/{}".format(self._launch_multi_node_section, k): v for k, v in conf.items()} + ) if total_num_nodes < 1: raise UsageError("total_num_nodes needs to be at least 1") @@ -2024,6 +2040,7 @@ class Task(_Task): ), "node_rank": 0, "wait": wait, + "devices": devices } editable_conf = {"total_num_nodes": total_num_nodes, "queue": queue} editable_conf = self.connect(editable_conf, name=self._launch_multi_node_section) @@ -2033,23 +2050,27 @@ class Task(_Task): runtime_properties = self._get_runtime_properties() remote_node_rank = runtime_properties.get("{}/node_rank".format(self._launch_multi_node_section)) + current_conf = master_conf if remote_node_rank: # self is a child node, build the conf from the runtime proprerties current_conf = { entry: runtime_properties.get("{}/{}".format(self._launch_multi_node_section, entry)) for entry in master_conf.keys() } - else: + elif os.environ.get("CLEARML_MULTI_NODE_MASTER") is None: nodes_to_wait = [] # self is the master node, enqueue the other nodes set_launch_multi_node_runtime_props(self, master_conf) - current_conf = master_conf for node_rank in range(1, master_conf.get("total_num_nodes", total_num_nodes)): node = self.clone(source_task=self, parent=self.id) node_conf = copy.deepcopy(master_conf) node_conf["node_rank"] = node_rank set_launch_multi_node_runtime_props(node, node_conf) - node.set_system_tags(node.get_system_tags() + [self._launch_multi_node_instance_tag]) + node.set_system_tags( + node.get_system_tags() + + [self._launch_multi_node_instance_tag] + + ([self.__hidden_tag] if hide_children else []) + ) if master_conf.get("queue"): Task.enqueue(node, queue_name=master_conf["queue"]) else: @@ -2064,16 +2085,42 @@ class Task(_Task): Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.closed, Task.TaskStatusEnum.failed, - Task.TaskStatusEnum.in_progress + Task.TaskStatusEnum.in_progress, ), - check_interval_sec=10 + check_interval_sec=10, ) self.log.info("Node with task ID {} and rank {} detected".format(node_to_wait.id, rank)) + os.environ["CLEARML_MULTI_NODE_MASTER"] = "1" + + num_devices = 1 + if devices is not None: + try: + num_devices 
= int(devices) + except TypeError: + try: + num_devices = len(devices) + except Exception as ex: + raise ValueError("Failed parsing number of devices: {}".format(ex)) + except ValueError as ex: + raise ValueError("Failed parsing number of devices: {}".format(ex)) + if num_devices < 0: + try: + import torch + + num_devices = torch.cuda.device_count() + except ImportError: + raise ImportError( + "Could not import `torch` while finding the number of devices. " + "Please install it or set `devices` to a value different than -1" + ) os.environ["MASTER_ADDR"] = current_conf.get("master_addr", "") os.environ["MASTER_PORT"] = str(current_conf.get("master_port", "")) - os.environ["WORLD_SIZE"] = str(current_conf.get("total_num_nodes", "")) - os.environ["RANK"] = str(current_conf.get("node_rank", "")) + os.environ["RANK"] = str( + current_conf.get("node_rank", 0) * num_devices + int(os.environ.get("LOCAL_RANK", "0")) + ) + os.environ["NODE_RANK"] = str(current_conf.get("node_rank", "")) + os.environ["WORLD_SIZE"] = str(current_conf.get("total_num_nodes", total_num_nodes) * num_devices) return current_conf From 253aee3b0ee34d0a4660d2e2678d8e63520313da Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 7 Jul 2024 13:38:12 +0300 Subject: [PATCH 09/17] Fix CLEARML_MULTI_NODE_SINGLE_TASK resource monitoring --- clearml/utilities/resource_monitor.py | 75 ++++++++++++++++++--------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/clearml/utilities/resource_monitor.py b/clearml/utilities/resource_monitor.py index e31227cc..35b54e61 100644 --- a/clearml/utilities/resource_monitor.py +++ b/clearml/utilities/resource_monitor.py @@ -109,25 +109,29 @@ class ResourceMonitor(BackgroundMonitor): rank = 0 world_size_digits = 0 # check if we are in multi-node reporting to the same Task - if ENV_MULTI_NODE_SINGLE_TASK.get(): - # if resource monitoring is disabled, do nothing - if ENV_MULTI_NODE_SINGLE_TASK.get() < 0: - return - # we are reporting machines stats on a different machine over the same Task - multi_node_single_task_reporting = True - if ENV_MULTI_NODE_SINGLE_TASK.get() == 1: - # report per machine graph (unique title) - report_node_as_series = False - elif ENV_MULTI_NODE_SINGLE_TASK.get() == 2: - # report per machine series (i.e. merge title+series resource and have "node X" as different series) - report_node_as_series = True + # noinspection PyBroadException + try: + if ENV_MULTI_NODE_SINGLE_TASK.get(): + # if resource monitoring is disabled, do nothing + if ENV_MULTI_NODE_SINGLE_TASK.get() < 0: + return + # we are reporting machines stats on a different machine over the same Task + multi_node_single_task_reporting = True + if ENV_MULTI_NODE_SINGLE_TASK.get() == 1: + # report per machine graph (unique title) + report_node_as_series = False + elif ENV_MULTI_NODE_SINGLE_TASK.get() == 2: + # report per machine series (i.e. 
merge title+series resource and have "node X" as different series) + report_node_as_series = True - # noinspection PyBroadException - try: - rank = int(os.environ.get("RANK") or 0) - world_size_digits = ceil(log10(int(os.environ.get("WORLD_SIZE") or 0))) - except Exception: - pass + # noinspection PyBroadException + try: + rank = int(os.environ.get("RANK", os.environ.get('SLURM_PROCID')) or 0) + world_size_digits = ceil(log10(int(os.environ.get("WORLD_SIZE") or 0))) + except Exception: + pass + except Exception: + pass seconds_since_started = 0 reported = 0 @@ -342,14 +346,35 @@ class ResourceMonitor(BackgroundMonitor): def get_logger_reported_titles(cls, task): # noinspection PyProtectedMember titles = list(task.get_logger()._get_used_title_series().keys()) + + # noinspection PyBroadException try: - titles.remove(cls._title_machine) - except ValueError: - pass - try: - titles.remove(cls._title_gpu) - except ValueError: - pass + multi_node = ENV_MULTI_NODE_SINGLE_TASK.get() is not None + except Exception: + multi_node = False + + if multi_node: + title_machine = ":".join(cls._title_machine.split(":")[:-1]) + title_gpu = ":".join(cls._title_gpu.split(":")[:-1]) + if not title_machine: + title_machine = cls._title_machine + if not title_gpu: + title_gpu = cls._title_gpu + + try: + titles = [t for t in titles if not t.startswith(title_machine) and not t.startswith(title_gpu)] + except ValueError: + pass + else: + try: + titles.remove(cls._title_machine) + except ValueError: + pass + try: + titles.remove(cls._title_gpu) + except ValueError: + pass + return titles def _get_process_used_memory(self): From 0bb0f2986785373ae322e41b5e1694bfcf8f2298 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Tue, 9 Jul 2024 15:35:19 +0300 Subject: [PATCH 10/17] Fix jupyter notebook packages and uncommitted changes are sometimes not fetched --- clearml/backend_interface/task/repo/scriptinfo.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/clearml/backend_interface/task/repo/scriptinfo.py b/clearml/backend_interface/task/repo/scriptinfo.py index 30f2a850..1cd29009 100644 --- a/clearml/backend_interface/task/repo/scriptinfo.py +++ b/clearml/backend_interface/task/repo/scriptinfo.py @@ -563,8 +563,15 @@ class _JupyterObserver(object): reqs = ReqsModules() for name in fmodules: if name in installed_pkgs: - pkg_name, version = installed_pkgs[name] - reqs.add(pkg_name, version, fmodules[name]) + # handle namespace packages, which are returned as flat dicts of format + # {mapping_pkg_name: (pkg_name, version), ...} + if isinstance(installed_pkgs[name], dict): + for subpackage_name, subpackage in installed_pkgs[name].items(): + pkg_name, version = subpackage + reqs.add(pkg_name, version, fmodules.get(subpackage_name, fmodules[name])) + else: + pkg_name, version = installed_pkgs[name] + reqs.add(pkg_name, version, fmodules[name]) requirements_txt, conda_requirements = ScriptRequirements.create_requirements_txt(reqs) # remove ipython direct access from the script code From 74b2b38673771ea81d760580860466b47a7daf12 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 11 Jul 2024 12:55:20 +0300 Subject: [PATCH 11/17] Fix CLEARML_MULTI_NODE_SINGLE_TASK rank 0 always creates resource monitoring scalers ":monitor:machine" and ":monitor:gpu" so that external monitoring is not broken --- clearml/utilities/resource_monitor.py | 31 ++++++++++++++++++++------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/clearml/utilities/resource_monitor.py b/clearml/utilities/resource_monitor.py 
index 35b54e61..38e4a803 100644 --- a/clearml/utilities/resource_monitor.py +++ b/clearml/utilities/resource_monitor.py @@ -28,6 +28,7 @@ class ResourceMonitor(BackgroundMonitor): _wait_for_first_iteration_to_start_sec_default = 180.0 _max_wait_for_first_iteration_to_start_sec_default = 1800.0 _resource_monitor_instances = [] + _multi_node_single_task = None def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30., first_report_sec=None, wait_for_first_iteration_to_start_sec=None, @@ -35,6 +36,7 @@ class ResourceMonitor(BackgroundMonitor): super(ResourceMonitor, self).__init__(task=task, wait_period=sample_frequency_per_sec) # noinspection PyProtectedMember ResourceMonitor._resource_monitor_instances.append(self) + ResourceMonitor._multi_node_single_task = ENV_MULTI_NODE_SINGLE_TASK.get() self._task = task self._sample_frequency = sample_frequency_per_sec self._report_frequency = report_frequency_sec @@ -111,16 +113,16 @@ class ResourceMonitor(BackgroundMonitor): # check if we are in multi-node reporting to the same Task # noinspection PyBroadException try: - if ENV_MULTI_NODE_SINGLE_TASK.get(): + if self._multi_node_single_task: # if resource monitoring is disabled, do nothing - if ENV_MULTI_NODE_SINGLE_TASK.get() < 0: + if self._multi_node_single_task < 0: return # we are reporting machines stats on a different machine over the same Task multi_node_single_task_reporting = True - if ENV_MULTI_NODE_SINGLE_TASK.get() == 1: + if self._multi_node_single_task == 1: # report per machine graph (unique title) report_node_as_series = False - elif ENV_MULTI_NODE_SINGLE_TASK.get() == 2: + elif self._multi_node_single_task == 2: # report per machine series (i.e. merge title+series resource and have "node X" as different series) report_node_as_series = True @@ -225,20 +227,33 @@ class ResourceMonitor(BackgroundMonitor): for k, v in average_readouts.items(): # noinspection PyBroadException try: + # 3 digits after the dot + value = round(v * 1000) / 1000. title = self._title_gpu if k.startswith('gpu_') else self._title_machine series = k - # 3 points after the dot if multi_node_single_task_reporting: if report_node_as_series: + # for rank 0 we keep the same original report so that external services + # can always check the default cpu/gpu utilization + if rank == 0: + self._task.get_logger().report_scalar( + title=title, series=series, + iteration=iteration, value=value) + + # now let's create an additional report title = "{}:{}".format(":".join(title.split(":")[:-1]), series) series = "rank {:0{world_size_digits}d}".format( rank, world_size_digits=world_size_digits) - else: + elif rank > 0: title = "{}:rank{:0{world_size_digits}d}".format( title, rank, world_size_digits=world_size_digits) + else: + # for rank 0 we keep the same original report so that external services + # can always check the default cpu/gpu utilization + pass - value = round(v * 1000) / 1000. 
self._task.get_logger().report_scalar(title=title, series=series, iteration=iteration, value=value) + except Exception: pass # clear readouts if this is update is not averaged @@ -349,7 +364,7 @@ class ResourceMonitor(BackgroundMonitor): # noinspection PyBroadException try: - multi_node = ENV_MULTI_NODE_SINGLE_TASK.get() is not None + multi_node = cls._multi_node_single_task is not None except Exception: multi_node = False From d75564d514f2506c8a14291628cd7ca9980f72f2 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 13 Jul 2024 22:13:55 +0300 Subject: [PATCH 12/17] Fix "can't create new thread at interpreter shutdown" errors (known issue with Python 3.12.0 and other versions) --- clearml/utilities/process/mp.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py index 6a1d95e6..4b9b831c 100644 --- a/clearml/utilities/process/mp.py +++ b/clearml/utilities/process/mp.py @@ -527,6 +527,13 @@ class BackgroundMonitor(object): if isinstance(self._thread, Thread): if self._thread_pid == os.getpid(): return + + # make sure we start the metrics thread pools before starting the daemon thread + # workaround for: https://github.com/python/cpython/issues/113964 + from ...backend_interface.metrics.interface import Metrics + # noinspection PyProtectedMember + Metrics._initialize_upload_pools() + self._thread_pid = os.getpid() self._thread = Thread(target=self._daemon) self._thread.daemon = True From cbb90a8bd5f491913e925fb28be18afd9495a376 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Mon, 15 Jul 2024 13:39:00 +0300 Subject: [PATCH 13/17] Black formatting --- clearml/cli/task/__main__.py | 222 ++++++++++++++++++++++------------- 1 file changed, 143 insertions(+), 79 deletions(-) diff --git a/clearml/cli/task/__main__.py b/clearml/cli/task/__main__.py index 05a98b42..10553d82 100644 --- a/clearml/cli/task/__main__.py +++ b/clearml/cli/task/__main__.py @@ -12,75 +12,139 @@ clearml.backend_api.session.Session.add_client("clearml-task", __version__) def setup_parser(parser): - parser.add_argument('--version', action='store_true', default=None, - help='Display the clearml-task utility version') - parser.add_argument('--project', type=str, default=None, - help='Required: set the project name for the task. ' - 'If --base-task-id is used, this arguments is optional.') - parser.add_argument('--name', type=str, default=None, - help='Required: select a name for the remote task') - parser.add_argument('--tags', default=None, nargs='*', - help='Optional: add tags to the newly created Task. ' - 'Example: --tags "base" "job"') - parser.add_argument('--repo', type=str, default=None, - help='remote URL for the repository to use. ' - 'Example: --repo https://github.com/allegroai/clearml.git') - parser.add_argument('--branch', type=str, default=None, - help='Select specific repository branch/tag (implies the latest commit from the branch)') - parser.add_argument('--commit', type=str, default=None, - help='Select specific commit id to use (default: latest commit, ' - 'or when used with local repository matching the local commit id)') - parser.add_argument('--folder', type=str, default=None, - help='Remotely execute the code in the local folder. ' - 'Notice! It assumes a git repository already exists. 
' - 'Current state of the repo (commit id and uncommitted changes) is logged ' - 'and will be replicated on the remote machine') - parser.add_argument('--script', type=str, default=None, - help='Specify the entry point script for the remote execution. ' - 'Currently support .py .ipynb and .sh scripts (python, jupyter notebook, bash) ' - 'When used in tandem with --repo the script should be a relative path inside ' - 'the repository, for example: --script source/train.py ' - 'When used with --folder it supports a direct path to a file inside the local ' - 'repository itself, for example: --script ~/project/source/train.py') - parser.add_argument('--module', type=str, default=None, - help='Instead of a script entry point, specify a python module to be remotely executed. ' - 'Notice: It cannot be used with --script at the same time. ' - 'for example: --module "torch.distributed.launch train_script.py"') - parser.add_argument('--cwd', type=str, default=None, - help='Working directory to launch the script from. Default: repository root folder. ' - 'Relative to repo root or local folder') - parser.add_argument('--args', default=None, nargs='*', - help='Arguments to pass to the remote execution, list of = strings.' - 'Currently only argparse arguments are supported. ' - 'Example: --args lr=0.003 batch_size=64') - parser.add_argument('--queue', type=str, default=None, - help='Select the queue to launch the task. ' - 'If not provided a Task will be created but it will not be launched.') - parser.add_argument('--requirements', type=str, default=None, - help='Specify requirements.txt file to install when setting the session. ' - 'If not provided, the requirements.txt from the repository will be used.') - parser.add_argument('--packages', default=None, nargs='*', - help='Manually specify a list of required packages. ' - 'Example: --packages "tqdm>=2.1" "scikit-learn"') - parser.add_argument('--docker', type=str, default=None, - help='Select the docker image to use in the remote session') - parser.add_argument('--docker_args', type=str, default=None, - help='Add docker arguments, pass a single string') - parser.add_argument('--docker_bash_setup_script', type=str, default=None, - help="Add bash script to be executed inside the docker before setting up " - "the Task's environment") - parser.add_argument('--output-uri', type=str, default=None, required=False, - help='Optional: set the Task `output_uri` (automatically upload model destination)') - parser.add_argument('--task-type', type=str, default=None, - help='Set the Task type, optional values: ' - 'training, testing, inference, data_processing, application, monitor, ' - 'controller, optimizer, service, qc, custom') - parser.add_argument('--skip-task-init', action='store_true', default=None, - help='If set, Task.init() call is not added to the entry point, and is assumed ' - 'to be called in within the script. Default: add Task.init() call entry point script') - parser.add_argument('--base-task-id', type=str, default=None, - help='Use a pre-existing task in the system, instead of a local repo/script. ' - 'Essentially clones an existing task and overrides arguments/requirements.') + parser.add_argument("--version", action="store_true", default=None, help="Display the clearml-task utility version") + parser.add_argument( + "--project", + type=str, + default=None, + help="Required: set the project name for the task. 
" "If --base-task-id is used, this arguments is optional.", + ) + parser.add_argument("--name", type=str, default=None, help="Required: select a name for the remote task") + parser.add_argument( + "--tags", + default=None, + nargs="*", + help="Optional: add tags to the newly created Task. " 'Example: --tags "base" "job"', + ) + parser.add_argument( + "--repo", + type=str, + default=None, + help="remote URL for the repository to use. " "Example: --repo https://github.com/allegroai/clearml.git", + ) + parser.add_argument( + "--branch", + type=str, + default=None, + help="Select specific repository branch/tag (implies the latest commit from the branch)", + ) + parser.add_argument( + "--commit", + type=str, + default=None, + help="Select specific commit id to use (default: latest commit, " + "or when used with local repository matching the local commit id)", + ) + parser.add_argument( + "--folder", + type=str, + default=None, + help="Remotely execute the code in the local folder. " + "Notice! It assumes a git repository already exists. " + "Current state of the repo (commit id and uncommitted changes) is logged " + "and will be replicated on the remote machine", + ) + parser.add_argument( + "--script", + type=str, + default=None, + help="Specify the entry point script for the remote execution. " + "Currently support .py .ipynb and .sh scripts (python, jupyter notebook, bash) " + "When used in tandem with --repo the script should be a relative path inside " + "the repository, for example: --script source/train.py " + "When used with --folder it supports a direct path to a file inside the local " + "repository itself, for example: --script ~/project/source/train.py", + ) + parser.add_argument( + "--module", + type=str, + default=None, + help="Instead of a script entry point, specify a python module to be remotely executed. " + "Notice: It cannot be used with --script at the same time. " + 'for example: --module "torch.distributed.launch train_script.py"', + ) + parser.add_argument( + "--cwd", + type=str, + default=None, + help="Working directory to launch the script from. Default: repository root folder. " + "Relative to repo root or local folder", + ) + parser.add_argument( + "--args", + default=None, + nargs="*", + help="Arguments to pass to the remote execution, list of = strings." + "Currently only argparse arguments are supported. " + "Example: --args lr=0.003 batch_size=64", + ) + parser.add_argument( + "--queue", + type=str, + default=None, + help="Select the queue to launch the task. " + "If not provided a Task will be created but it will not be launched.", + ) + parser.add_argument( + "--requirements", + type=str, + default=None, + help="Specify requirements.txt file to install when setting the session. " + "If not provided, the requirements.txt from the repository will be used.", + ) + parser.add_argument( + "--packages", + default=None, + nargs="*", + help="Manually specify a list of required packages. 
" 'Example: --packages "tqdm>=2.1" "scikit-learn"', + ) + parser.add_argument("--docker", type=str, default=None, help="Select the docker image to use in the remote session") + parser.add_argument("--docker_args", type=str, default=None, help="Add docker arguments, pass a single string") + parser.add_argument( + "--docker_bash_setup_script", + type=str, + default=None, + help="Add bash script to be executed inside the docker before setting up " "the Task's environment", + ) + parser.add_argument( + "--output-uri", + type=str, + default=None, + required=False, + help="Optional: set the Task `output_uri` (automatically upload model destination)", + ) + parser.add_argument( + "--task-type", + type=str, + default=None, + help="Set the Task type, optional values: " + "training, testing, inference, data_processing, application, monitor, " + "controller, optimizer, service, qc, custom", + ) + parser.add_argument( + "--skip-task-init", + action="store_true", + default=None, + help="If set, Task.init() call is not added to the entry point, and is assumed " + "to be called in within the script. Default: add Task.init() call entry point script", + ) + parser.add_argument( + "--base-task-id", + type=str, + default=None, + help="Use a pre-existing task in the system, instead of a local repo/script. " + "Essentially clones an existing task and overrides arguments/requirements.", + ) parser.add_argument( "--import-offline-session", type=str, @@ -90,7 +154,7 @@ def setup_parser(parser): def cli(): - title = 'ClearML launch - launch any codebase on remote machine running clearml-agent' + title = "ClearML launch - launch any codebase on remote machine running clearml-agent" print(title) parser = ArgumentParser(description=title) setup_parser(parser) @@ -103,7 +167,7 @@ def cli(): exit(0) if args.version: - print('Version {}'.format(__version__)) + print("Version {}".format(__version__)) exit(0) if not args.name and not args.import_offline_session: @@ -145,7 +209,7 @@ def cli(): ) # verify args before creating the Task create_populate.update_task_args(args.args) - print('Creating new task') + print("Creating new task") create_populate.create_task() # update Task args create_populate.update_task_args(args.args) @@ -156,25 +220,25 @@ def cli(): # noinspection PyProtectedMember create_populate.task._set_runtime_properties({"_CLEARML_TASK": True}) - print('New task created id={}'.format(create_populate.get_id())) + print("New task created id={}".format(create_populate.get_id())) if not args.queue: - print('Warning: No queue was provided, leaving task in draft-mode.') + print("Warning: No queue was provided, leaving task in draft-mode.") exit(0) Task.enqueue(create_populate.task, queue_name=args.queue) - print('Task id={} sent for execution on queue {}'.format(create_populate.get_id(), args.queue)) - print('Execution log at: {}'.format(create_populate.task.get_output_log_web_page())) + print("Task id={} sent for execution on queue {}".format(create_populate.get_id(), args.queue)) + print("Execution log at: {}".format(create_populate.task.get_output_log_web_page())) def main(): try: cli() except KeyboardInterrupt: - print('\nUser aborted') + print("\nUser aborted") except Exception as ex: - print('\nError: {}'.format(ex)) + print("\nError: {}".format(ex)) exit(1) -if __name__ == '__main__': +if __name__ == "__main__": main() From f48c2ffc1c04384988b92ee4040e6493dedccadf Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Wed, 17 Jul 2024 10:50:49 +0300 Subject: [PATCH 14/17] Black formatting --- 
clearml/cli/hpo/__main__.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/clearml/cli/hpo/__main__.py b/clearml/cli/hpo/__main__.py index 97d0506a..7cdc6709 100644 --- a/clearml/cli/hpo/__main__.py +++ b/clearml/cli/hpo/__main__.py @@ -1,4 +1,4 @@ -import json +eyimport json import sys from argparse import ArgumentParser, RawTextHelpFormatter @@ -143,8 +143,10 @@ def setup_parser(parser): help="The maximum compute time in minutes. When time limit is exceeded, all jobs aborted", ) parser.add_argument( - "--pool-period-min", type=float, default=0.2, - help="The time between two consecutive pools (minutes) default 0.2 min" + "--pool-period-min", + type=float, + default=0.2, + help="The time between two consecutive pools (minutes) default 0.2 min", ) parser.add_argument( "--total-max-jobs", @@ -186,14 +188,19 @@ def setup_parser(parser): help="The maximum number of concurrent Tasks (experiments) running at the same time.", ) parser.add_argument( - '--args', default=None, nargs='*', - help='Arguments to pass to the remote execution, list of = strings.' - 'Currently only argparse/click/hydra/fire arguments are supported. ' - 'Example: --args lr=0.003 batch_size=64') + "--args", + default=None, + nargs="*", + help="Arguments to pass to the remote execution, list of = strings." + "Currently only argparse/click/hydra/fire arguments are supported. " + "Example: --args lr=0.003 batch_size=64", + ) parser.add_argument( - "--local", action='store_true', default=False, + "--local", + action="store_true", + default=False, help="If set, run the experiments locally, Notice no new python environment will be created, " - "--script must point to a local file entrypoint and all arguments must be passed with --args", + "--script must point to a local file entrypoint and all arguments must be passed with --args", ) @@ -261,7 +268,7 @@ def build_opt_kwargs(args): "total_max_jobs", "min_iteration_per_job", "max_iteration_per_job", - "max_number_of_concurrent_tasks" + "max_number_of_concurrent_tasks", ] for arg_name in optional_arg_names: arg_val = getattr(args, arg_name) @@ -293,7 +300,7 @@ def cli(): if not task_id: create_populate = CreateAndPopulate(script=args.script) create_populate.update_task_args(args.args) - print('Creating new task') + print("Creating new task") create_populate.create_task() # update Task args create_populate.update_task_args(args.args) From a59b40217e8b9b5e2bdeee2b3905e3f93b6a93ac Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Fri, 19 Jul 2024 13:12:46 +0300 Subject: [PATCH 15/17] Black formatting --- clearml/cli/data/__main__.py | 492 +++++++++++++++++++++-------------- clearml/cli/hpo/__main__.py | 2 +- 2 files changed, 298 insertions(+), 196 deletions(-) diff --git a/clearml/cli/data/__main__.py b/clearml/cli/data/__main__.py index 2d6bf82f..f0875c05 100644 --- a/clearml/cli/data/__main__.py +++ b/clearml/cli/data/__main__.py @@ -15,36 +15,35 @@ clearml.backend_api.session.Session.add_client("clearml-data", __version__) def check_null_id(args): - if not getattr(args, 'id', None): + if not getattr(args, "id", None): raise ValueError("Dataset ID not specified, add --id ") -def print_args(args, exclude=('command', 'func', 'verbose')): +def print_args(args, exclude=("command", "func", "verbose")): # type: (object, Sequence[str]) -> () - if not getattr(args, 'verbose', None): + if not getattr(args, "verbose", None): return for arg in args.__dict__: if arg in exclude or args.__dict__.get(arg) is None: continue - 
print('{}={}'.format(arg, args.__dict__[arg])) + print("{}={}".format(arg, args.__dict__[arg])) def restore_state(args): - session_state_file = os.path.expanduser('~/.clearml_data.json') + session_state_file = os.path.expanduser("~/.clearml_data.json") # noinspection PyBroadException try: - with open(session_state_file, 'rt') as f: + with open(session_state_file, "rt") as f: state = json.load(f) except Exception: state = {} - args.id = getattr(args, 'id', None) or state.get('id') + args.id = getattr(args, "id", None) or state.get("id") - state = {str(k): str(v) if v is not None else None - for k, v in args.__dict__.items() if not str(k).startswith('_')} + state = {str(k): str(v) if v is not None else None for k, v in args.__dict__.items() if not str(k).startswith("_")} # noinspection PyBroadException try: - with open(session_state_file, 'wt') as f: + with open(session_state_file, "wt") as f: json.dump(state, f, sort_keys=True) except Exception: pass @@ -53,10 +52,10 @@ def restore_state(args): def clear_state(state=None): - session_state_file = os.path.expanduser('~/.clearml_data.json') + session_state_file = os.path.expanduser("~/.clearml_data.json") # noinspection PyBroadException try: - with open(session_state_file, 'wt') as f: + with open(session_state_file, "wt") as f: json.dump(state or dict(), f, sort_keys=True) except Exception: pass @@ -64,21 +63,25 @@ def clear_state(state=None): def cli(): # type: () -> int - title = 'clearml-data - Dataset Management & Versioning CLI' + title = "clearml-data - Dataset Management & Versioning CLI" print(title) parser = ArgumentParser( # noqa description=title, - prog='clearml-data', + prog="clearml-data", formatter_class=partial(HelpFormatter, indent_increment=0, max_help_position=10), ) - subparsers = parser.add_subparsers(help='Dataset actions', dest='command') + subparsers = parser.add_subparsers(help="Dataset actions", dest="command") - create = subparsers.add_parser('create', help='Create a new dataset') - create.add_argument('--parents', type=str, nargs='*', - help='[Optional] Specify dataset parents IDs (i.e. merge all parents). ' - 'Example: a17b4fID1 f0ee5ID2 a17b4f09eID3') - create.add_argument('--project', type=str, required=False, default=None, help='Dataset project name') - create.add_argument('--name', type=str, required=True, default=None, help='Dataset name') + create = subparsers.add_parser("create", help="Create a new dataset") + create.add_argument( + "--parents", + type=str, + nargs="*", + help="[Optional] Specify dataset parents IDs (i.e. merge all parents). " + "Example: a17b4fID1 f0ee5ID2 a17b4f09eID3", + ) + create.add_argument("--project", type=str, required=False, default=None, help="Dataset project name") + create.add_argument("--name", type=str, required=True, default=None, help="Dataset name") create.add_argument("--version", type=str, required=False, default=None, help="Dataset version") create.add_argument( "--output-uri", @@ -95,14 +98,22 @@ def cli(): "Examples: 's3://bucket/data', 'gs://bucket/data', 'azure://bucket/data', " "'/mnt/shared/folder/data'", ) - create.add_argument('--tags', type=str, nargs='*', help='Dataset user Tags') + create.add_argument("--tags", type=str, nargs="*", help="Dataset user Tags") create.set_defaults(func=ds_create) - add = subparsers.add_parser('add', help='Add files or links to the dataset') - add.add_argument('--id', type=str, required=False, - help='Previously created dataset id. 
Default: previously created/accessed dataset') - add.add_argument('--dataset-folder', type=str, default=None, - help='Dataset base folder to add the files to (default: Dataset root)') + add = subparsers.add_parser("add", help="Add files or links to the dataset") + add.add_argument( + "--id", + type=str, + required=False, + help="Previously created dataset id. Default: previously created/accessed dataset", + ) + add.add_argument( + "--dataset-folder", + type=str, + default=None, + help="Dataset base folder to add the files to (default: Dataset root)", + ) add.add_argument("--files", type=str, nargs="*", help="Files / folders to add.") add.add_argument( "--wildcard", @@ -119,9 +130,8 @@ def cli(): "Example: s3://bucket/data azure://bucket/folder" ), ) - add.add_argument('--non-recursive', action='store_true', default=False, - help='Disable recursive scan of files') - add.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting') + add.add_argument("--non-recursive", action="store_true", default=False, help="Disable recursive scan of files") + add.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting") add.add_argument( "--max-workers", type=int, @@ -145,21 +155,34 @@ def cli(): ) set_description.set_defaults(func=ds_set_description) - sync = subparsers.add_parser('sync', help='Sync a local folder with the dataset') - sync.add_argument('--id', type=str, required=False, - help='Previously created dataset id. Default: previously created/accessed dataset') - sync.add_argument('--dataset-folder', type=str, default=None, - help='Dataset base folder to add the files to (default: Dataset root)') - sync.add_argument('--folder', type=str, required=True, - help='Local folder to sync (support for wildcard selection). ' - 'Example: ~/data/*.jpg') - sync.add_argument('--parents', type=str, nargs='*', - help='[Optional] Specify dataset parents IDs (i.e. merge all parents). ' - 'Example: a17b4fID1 f0ee5ID2 a17b4f09eID3') - sync.add_argument('--project', type=str, required=False, default=None, - help='[Optional] Dataset project name') - sync.add_argument('--name', type=str, required=False, default=None, - help='[Optional] Dataset project name') + sync = subparsers.add_parser("sync", help="Sync a local folder with the dataset") + sync.add_argument( + "--id", + type=str, + required=False, + help="Previously created dataset id. Default: previously created/accessed dataset", + ) + sync.add_argument( + "--dataset-folder", + type=str, + default=None, + help="Dataset base folder to add the files to (default: Dataset root)", + ) + sync.add_argument( + "--folder", + type=str, + required=True, + help="Local folder to sync (support for wildcard selection). " "Example: ~/data/*.jpg", + ) + sync.add_argument( + "--parents", + type=str, + nargs="*", + help="[Optional] Specify dataset parents IDs (i.e. merge all parents). " + "Example: a17b4fID1 f0ee5ID2 a17b4f09eID3", + ) + sync.add_argument("--project", type=str, required=False, default=None, help="[Optional] Dataset project name") + sync.add_argument("--name", type=str, required=False, default=None, help="[Optional] Dataset project name") sync.add_argument("--version", type=str, required=False, default=None, help="[Optional] Dataset version") sync.add_argument( "--output-uri", @@ -168,43 +191,71 @@ def cli(): default=None, help="[Optional] Output URI for artifacts/debug samples. 
Useable when creating the dataset (deprecated, use '--storage' instead)", ) - sync.add_argument('--tags', type=str, nargs='*', - help='[Optional] Dataset user Tags') - sync.add_argument('--storage', type=str, default=None, - help='Remote storage to use for the dataset files (default: files_server). ' - 'Examples: \'s3://bucket/data\', \'gs://bucket/data\', \'azure://bucket/data\', ' - '\'/mnt/shared/folder/data\'') - sync.add_argument('--skip-close', action='store_true', default=False, - help='Do not auto close dataset after syncing folders') - sync.add_argument('--chunk-size', default=512, type=int, - help='Set dataset artifact chunk size in MB. Default 512mb, (pass -1 for a single chunk). ' - 'Example: 512, dataset will be split and uploaded in 512mb chunks.') - sync.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting') + sync.add_argument("--tags", type=str, nargs="*", help="[Optional] Dataset user Tags") + sync.add_argument( + "--storage", + type=str, + default=None, + help="Remote storage to use for the dataset files (default: files_server). " + "Examples: 's3://bucket/data', 'gs://bucket/data', 'azure://bucket/data', " + "'/mnt/shared/folder/data'", + ) + sync.add_argument( + "--skip-close", action="store_true", default=False, help="Do not auto close dataset after syncing folders" + ) + sync.add_argument( + "--chunk-size", + default=512, + type=int, + help="Set dataset artifact chunk size in MB. Default 512mb, (pass -1 for a single chunk). " + "Example: 512, dataset will be split and uploaded in 512mb chunks.", + ) + sync.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting") sync.set_defaults(func=ds_sync) - remove = subparsers.add_parser('remove', help='Remove files/links from the dataset') - remove.add_argument('--id', type=str, required=False, - help='Previously created dataset id. Default: previously created/accessed dataset') - remove.add_argument('--files', type=str, required=False, nargs='*', - help='Files / folders to remove (support for wildcard selection). ' - 'Notice: File path is the dataset path not the local path. ' - 'Example: data/*.jpg data/jsons/') - remove.add_argument('--non-recursive', action='store_true', default=False, - help='Disable recursive scan of files') - remove.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting') + remove = subparsers.add_parser("remove", help="Remove files/links from the dataset") + remove.add_argument( + "--id", + type=str, + required=False, + help="Previously created dataset id. Default: previously created/accessed dataset", + ) + remove.add_argument( + "--files", + type=str, + required=False, + nargs="*", + help="Files / folders to remove (support for wildcard selection). " + "Notice: File path is the dataset path not the local path. " + "Example: data/*.jpg data/jsons/", + ) + remove.add_argument("--non-recursive", action="store_true", default=False, help="Disable recursive scan of files") + remove.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting") remove.set_defaults(func=ds_remove) - upload = subparsers.add_parser('upload', help='Upload the local dataset changes to the server') - upload.add_argument('--id', type=str, required=False, - help='Previously created dataset id. Default: previously created/accessed dataset') - upload.add_argument('--storage', type=str, default=None, - help='Remote storage to use for the dataset files (default: files_server). 
' - 'Examples: \'s3://bucket/data\', \'gs://bucket/data\', \'azure://bucket/data\', ' - '\'/mnt/shared/folder/data\'') - upload.add_argument('--chunk-size', default=512, type=int, - help='Set dataset artifact chunk size in MB. Default 512, (pass -1 for a single chunk). ' - 'Example: 512, dataset will be split and uploaded in 512mb chunks.') - upload.add_argument('--verbose', default=False, action='store_true', help='Verbose reporting') + upload = subparsers.add_parser("upload", help="Upload the local dataset changes to the server") + upload.add_argument( + "--id", + type=str, + required=False, + help="Previously created dataset id. Default: previously created/accessed dataset", + ) + upload.add_argument( + "--storage", + type=str, + default=None, + help="Remote storage to use for the dataset files (default: files_server). " + "Examples: 's3://bucket/data', 'gs://bucket/data', 'azure://bucket/data', " + "'/mnt/shared/folder/data'", + ) + upload.add_argument( + "--chunk-size", + default=512, + type=int, + help="Set dataset artifact chunk size in MB. Default 512, (pass -1 for a single chunk). " + "Example: 512, dataset will be split and uploaded in 512mb chunks.", + ) + upload.add_argument("--verbose", default=False, action="store_true", help="Verbose reporting") upload.add_argument( "--max-workers", type=int, @@ -213,19 +264,32 @@ def cli(): ) upload.set_defaults(func=ds_upload) - finalize = subparsers.add_parser('close', help='Finalize and close the dataset (implies auto upload)') - finalize.add_argument('--id', type=str, required=False, - help='Previously created dataset id. Default: previously created/accessed dataset') - finalize.add_argument('--storage', type=str, default=None, - help='Remote storage to use for the dataset files (default: files_server). ' - 'Examples: \'s3://bucket/data\', \'gs://bucket/data\', \'azure://bucket/data\', ' - '\'/mnt/shared/folder/data\'') - finalize.add_argument('--disable-upload', action='store_true', default=False, - help='Disable automatic upload when closing the dataset') - finalize.add_argument('--chunk-size', default=512, type=int, - help='Set dataset artifact chunk size in MB. Default 512, (pass -1 for a single chunk). ' - 'Example: 512, dataset will be split and uploaded in 512mb chunks.') - finalize.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting') + finalize = subparsers.add_parser("close", help="Finalize and close the dataset (implies auto upload)") + finalize.add_argument( + "--id", + type=str, + required=False, + help="Previously created dataset id. Default: previously created/accessed dataset", + ) + finalize.add_argument( + "--storage", + type=str, + default=None, + help="Remote storage to use for the dataset files (default: files_server). " + "Examples: 's3://bucket/data', 'gs://bucket/data', 'azure://bucket/data', " + "'/mnt/shared/folder/data'", + ) + finalize.add_argument( + "--disable-upload", action="store_true", default=False, help="Disable automatic upload when closing the dataset" + ) + finalize.add_argument( + "--chunk-size", + default=512, + type=int, + help="Set dataset artifact chunk size in MB. Default 512, (pass -1 for a single chunk). 
" + "Example: 512, dataset will be split and uploaded in 512mb chunks.", + ) + finalize.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting") finalize.add_argument( "--max-workers", type=int, @@ -234,8 +298,8 @@ def cli(): ) finalize.set_defaults(func=ds_close) - publish = subparsers.add_parser('publish', help='Publish dataset task') - publish.add_argument('--id', type=str, required=True, help='The dataset task id to be published.') + publish = subparsers.add_parser("publish", help="Publish dataset task") + publish.add_argument("--id", type=str, required=True, help="The dataset task id to be published.") publish.set_defaults(func=ds_publish) delete = subparsers.add_parser("delete", help="Delete a dataset") @@ -269,27 +333,27 @@ def cli(): move = subparsers.add_parser("move", help="Move a dataset to another project") move.add_argument("--new-project", type=str, required=True, help="The new project of the dataset(s)") - move.add_argument( - "--project", type=str, required=True, help="The project the dataset(s) to be moved belong(s) to" - ) + move.add_argument("--project", type=str, required=True, help="The project the dataset(s) to be moved belong(s) to") move.add_argument("--name", type=str, required=True, help="The name of the dataset(s) to be moved") move.set_defaults(func=ds_move) - compare = subparsers.add_parser('compare', help='Compare two datasets (target vs source)') - compare.add_argument('--source', type=str, required=True, help='Source dataset id (used as baseline)') - compare.add_argument('--target', type=str, required=True, - help='Target dataset id (compare against the source baseline dataset)') - compare.add_argument('--verbose', default=False, action='store_true', - help='Verbose report all file changes (instead of summary)') + compare = subparsers.add_parser("compare", help="Compare two datasets (target vs source)") + compare.add_argument("--source", type=str, required=True, help="Source dataset id (used as baseline)") + compare.add_argument( + "--target", type=str, required=True, help="Target dataset id (compare against the source baseline dataset)" + ) + compare.add_argument( + "--verbose", default=False, action="store_true", help="Verbose report all file changes (instead of summary)" + ) compare.set_defaults(func=ds_compare) - squash = subparsers.add_parser('squash', - help='Squash multiple datasets into a single dataset version (merge down)') - squash.add_argument('--name', type=str, required=True, help='Create squashed dataset name') - squash.add_argument('--ids', type=str, required=True, nargs='*', help='Source dataset IDs to squash (merge down)') - squash.add_argument('--storage', type=str, default=None, help='See `upload storage`') - squash.add_argument('--verbose', default=False, action='store_true', - help='Verbose report all file changes (instead of summary)') + squash = subparsers.add_parser("squash", help="Squash multiple datasets into a single dataset version (merge down)") + squash.add_argument("--name", type=str, required=True, help="Create squashed dataset name") + squash.add_argument("--ids", type=str, required=True, nargs="*", help="Source dataset IDs to squash (merge down)") + squash.add_argument("--storage", type=str, default=None, help="See `upload storage`") + squash.add_argument( + "--verbose", default=False, action="store_true", help="Verbose report all file changes (instead of summary)" + ) squash.set_defaults(func=ds_squash) search = subparsers.add_parser("search", help="Search datasets in the system (sorted by 
creation time)") @@ -308,47 +372,82 @@ def cli(): ) search.set_defaults(func=ds_search) - verify = subparsers.add_parser('verify', help='Verify local dataset content') - verify.add_argument('--id', type=str, required=False, - help='Specify dataset id. Default: previously created/accessed dataset') - verify.add_argument('--folder', type=str, - help='Specify dataset local copy (if not provided the local cache folder will be verified)') - verify.add_argument('--filesize', action='store_true', default=False, - help='If True, only verify file size and skip hash checks (default: false)') - verify.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting') + verify = subparsers.add_parser("verify", help="Verify local dataset content") + verify.add_argument( + "--id", type=str, required=False, help="Specify dataset id. Default: previously created/accessed dataset" + ) + verify.add_argument( + "--folder", + type=str, + help="Specify dataset local copy (if not provided the local cache folder will be verified)", + ) + verify.add_argument( + "--filesize", + action="store_true", + default=False, + help="If True, only verify file size and skip hash checks (default: false)", + ) + verify.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting") verify.set_defaults(func=ds_verify) - ls = subparsers.add_parser('list', help='List dataset content') - ls.add_argument('--id', type=str, required=False, - help='Specify dataset id (or use project/name instead). Default: previously accessed dataset.') - ls.add_argument('--project', type=str, help='Specify dataset project name') - ls.add_argument('--name', type=str, help='Specify dataset name') + ls = subparsers.add_parser("list", help="List dataset content") + ls.add_argument( + "--id", + type=str, + required=False, + help="Specify dataset id (or use project/name instead). Default: previously accessed dataset.", + ) + ls.add_argument("--project", type=str, help="Specify dataset project name") + ls.add_argument("--name", type=str, help="Specify dataset name") ls.add_argument("--version", type=str, help="Specify dataset version", default=None) - ls.add_argument('--filter', type=str, nargs='*', - help='Filter files based on folder / wildcard, multiple filters are supported. ' - 'Example: folder/date_*.json folder/sub-folder') - ls.add_argument('--modified', action='store_true', default=False, - help='Only list file changes (add/remove/modify) introduced in this version') + ls.add_argument( + "--filter", + type=str, + nargs="*", + help="Filter files based on folder / wildcard, multiple filters are supported. " + "Example: folder/date_*.json folder/sub-folder", + ) + ls.add_argument( + "--modified", + action="store_true", + default=False, + help="Only list file changes (add/remove/modify) introduced in this version", + ) ls.set_defaults(func=ds_list) - get = subparsers.add_parser('get', help='Get a local copy of a dataset (default: read only cached folder)') - get.add_argument('--id', type=str, required=False, - help='Previously created dataset id. Default: previously created/accessed dataset') - get.add_argument('--copy', type=str, default=None, - help='Get a writable copy of the dataset to a specific output folder') - get.add_argument('--link', type=str, default=None, - help='Create a soft link (not supported on Windows) to a ' - 'read-only cached folder containing the dataset') - get.add_argument('--part', type=int, default=None, - help='Retrieve a partial copy of the dataset. 
' - 'Part number (0 to `num-parts`-1) of total parts --num-parts.') - get.add_argument('--num-parts', type=int, default=None, - help='Total number of parts to divide the dataset to. ' - 'Notice minimum retrieved part is a single chunk in a dataset (or its parents).' - 'Example: Dataset gen4, with 3 parents, each with a single chunk, ' - 'can be divided into 4 parts') - get.add_argument('--overwrite', action='store_true', default=False, help='If True, overwrite the target folder') - get.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting') + get = subparsers.add_parser("get", help="Get a local copy of a dataset (default: read only cached folder)") + get.add_argument( + "--id", + type=str, + required=False, + help="Previously created dataset id. Default: previously created/accessed dataset", + ) + get.add_argument( + "--copy", type=str, default=None, help="Get a writable copy of the dataset to a specific output folder" + ) + get.add_argument( + "--link", + type=str, + default=None, + help="Create a soft link (not supported on Windows) to a " "read-only cached folder containing the dataset", + ) + get.add_argument( + "--part", + type=int, + default=None, + help="Retrieve a partial copy of the dataset. " "Part number (0 to `num-parts`-1) of total parts --num-parts.", + ) + get.add_argument( + "--num-parts", + type=int, + default=None, + help="Total number of parts to divide the dataset to. " + "Notice minimum retrieved part is a single chunk in a dataset (or its parents)." + "Example: Dataset gen4, with 3 parents, each with a single chunk, " + "can be divided into 4 parts", + ) + get.add_argument("--overwrite", action="store_true", default=False, help="If True, overwrite the target folder") + get.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting") get.add_argument( "--max-workers", type=int, @@ -387,11 +486,7 @@ def ds_delete(args): def ds_rename(args): - print( - "Renaming dataset with project={}, name={} to {}".format( - args.project, args.name, args.new_name - ) - ) + print("Renaming dataset with project={}, name={} to {}".format(args.project, args.name, args.new_name)) print_args(args) Dataset.rename( args.new_name, @@ -404,11 +499,7 @@ def ds_rename(args): def ds_move(args): - print( - "Moving dataset with project={}, name={} to {}".format( - args.project, args.name, args.new_project - ) - ) + print("Moving dataset with project={}, name={} to {}".format(args.project, args.name, args.new_project)) print_args(args) Dataset.move_to_project( args.new_project, @@ -421,16 +512,17 @@ def ds_move(args): def ds_verify(args): - print('Verify dataset id {}'.format(args.id)) + print("Verify dataset id {}".format(args.id)) check_null_id(args) print_args(args) ds = Dataset.get(dataset_id=args.id) files_error = ds.verify_dataset_hash( - local_copy_path=args.folder or None, skip_hash=args.filesize, verbose=args.verbose) + local_copy_path=args.folder or None, skip_hash=args.filesize, verbose=args.verbose + ) if files_error: - print('Dataset verification completed, {} errors found!'.format(len(files_error))) + print("Dataset verification completed, {} errors found!".format(len(files_error))) else: - print('Dataset verification completed successfully, no errors found.') + print("Dataset verification completed successfully, no errors found.") def ds_get(args): @@ -477,7 +569,7 @@ def ds_get(args): def ds_list(args): - print('List dataset content: {}'.format(args.id or (args.project, args.name))) + print("List dataset content: 
{}".format(args.id or (args.project, args.name))) print_args(args) ds = Dataset.get( dataset_id=args.id or None, @@ -500,7 +592,7 @@ def ds_list(args): file_name_max_len = max(file_name_max_len, len(e.relative_path)) size_max_len = max(size_max_len, len(str(e.size))) hash_max_len = max(hash_max_len, len(str(e.hash))) - print('Listing dataset content') + print("Listing dataset content") formatting = "{:" + str(file_name_max_len) + "} | {:" + str(size_max_len) + ",} | {:" + str(hash_max_len) + "}" print(formatting.replace(",", "").format("file name", "size", "hash")) print("-" * len(formatting.replace(",", "").format("-", "-", "-"))) @@ -514,20 +606,20 @@ def ds_list(args): e = file_entries[f] print(formatting.format(e.relative_path, e.size, str(e.hash))) total_size += e.size - print('Total {} files, {} bytes'.format(num_files, total_size)) + print("Total {} files, {} bytes".format(num_files, total_size)) return 0 def ds_squash(args): - print('Squashing datasets ids={} into target dataset named \'{}\''.format(args.ids, args.name)) + print("Squashing datasets ids={} into target dataset named '{}'".format(args.ids, args.name)) print_args(args) ds = Dataset.squash(dataset_name=args.name, dataset_ids=args.ids, output_url=args.storage or None) - print('Squashing completed, new dataset created id={}'.format(ds.id)) + print("Squashing completed, new dataset created id={}".format(ds.id)) return 0 def ds_search(args): - print('Search datasets') + print("Search datasets") print_args(args) datasets = Dataset.list_datasets( dataset_project=args.project or None, @@ -562,34 +654,42 @@ def ds_search(args): for d in datasets: print( formatting.format( - d["project"], d["name"], d["version"], str(d["tags"] or [])[1:-1], str(d["created"]).split(".")[0], d["id"] + d["project"], + d["name"], + d["version"], + str(d["tags"] or [])[1:-1], + str(d["created"]).split(".")[0], + d["id"], ) ) return 0 def ds_compare(args): - print('Comparing target dataset id {} with source dataset id {}'.format(args.target, args.source)) + print("Comparing target dataset id {} with source dataset id {}".format(args.target, args.source)) print_args(args) ds = Dataset.get(dataset_id=args.target) removed_files = ds.list_removed_files(dataset_id=args.source) modified_files = ds.list_modified_files(dataset_id=args.source) added_files = ds.list_added_files(dataset_id=args.source) if args.verbose: - print('Removed files:') - print('\n'.join(removed_files)) - print('\nModified files:') - print('\n'.join(modified_files)) - print('\nAdded files:') - print('\n'.join(added_files)) - print('') - print('Comparison summary: {} files removed, {} files modified, {} files added'.format( - len(removed_files), len(modified_files), len(added_files))) + print("Removed files:") + print("\n".join(removed_files)) + print("\nModified files:") + print("\n".join(modified_files)) + print("\nAdded files:") + print("\n".join(added_files)) + print("") + print( + "Comparison summary: {} files removed, {} files modified, {} files added".format( + len(removed_files), len(modified_files), len(added_files) + ) + ) return 0 def ds_close(args): - print('Finalizing dataset id {}'.format(args.id)) + print("Finalizing dataset id {}".format(args.id)) check_null_id(args) print_args(args) ds = Dataset.get(dataset_id=args.id) @@ -607,13 +707,13 @@ def ds_close(args): ) ds.finalize() - print('Dataset closed and finalized') + print("Dataset closed and finalized") clear_state() return 0 def ds_publish(args): - print('Publishing dataset id {}'.format(args.id)) + print("Publishing 
dataset id {}".format(args.id)) check_null_id(args) print_args(args) ds = Dataset.get(dataset_id=args.id) @@ -621,13 +721,13 @@ def ds_publish(args): raise ValueError("Cannot publish dataset. Please finalize it first, run `clearml-data close`") ds.publish() - print('Dataset published') + print("Dataset published") clear_state() # just to verify the state is clear return 0 def ds_upload(args): - print('uploading local files to dataset id {}'.format(args.id)) + print("uploading local files to dataset id {}".format(args.id)) check_null_id(args) print_args(args) ds = Dataset.get(dataset_id=args.id) @@ -637,7 +737,7 @@ def ds_upload(args): chunk_size=args.chunk_size or -1, max_workers=args.max_workers, ) - print('Dataset upload completed') + print("Dataset upload completed") return 0 @@ -647,7 +747,7 @@ def ds_remove(args): print_args(args) ds = Dataset.get(dataset_id=args.id) num_files = 0 - for file in (args.files or []): + for file in args.files or []: num_files += ds.remove_files(dataset_path=file, recursive=not args.non_recursive, verbose=args.verbose) message = "{} file{} removed".format(num_files, "s" if num_files != 1 else "") print(message) @@ -660,7 +760,7 @@ def ds_sync(args): args.id = ds_create(args) dataset_created = True - print('Syncing dataset id {} to local folder {}'.format(args.id, args.folder)) + print("Syncing dataset id {} to local folder {}".format(args.id, args.folder)) check_null_id(args) print_args(args) ds = Dataset.get(dataset_id=args.id) @@ -672,7 +772,7 @@ def ds_sync(args): if not args.skip_close: if dataset_created and not removed and not added and not modified: - print('Zero modifications on local copy, reverting dataset creation.') + print("Zero modifications on local copy, reverting dataset creation.") Dataset.delete(ds.id, force=True) return 0 @@ -680,13 +780,15 @@ def ds_sync(args): if ds.is_dirty(): # upload the files print("Pending uploads, starting dataset upload to {}".format(args.storage or ds.get_default_storage())) - ds.upload(show_progress=True, - verbose=args.verbose, - output_url=args.storage or None, - chunk_size=args.chunk_size or -1, ) + ds.upload( + show_progress=True, + verbose=args.verbose, + output_url=args.storage or None, + chunk_size=args.chunk_size or -1, + ) ds.finalize() - print('Dataset closed and finalized') + print("Dataset closed and finalized") clear_state() return 0 @@ -705,7 +807,7 @@ def ds_add(args): verbose=args.verbose, dataset_path=args.dataset_folder or None, wildcard=args.wildcard, - max_workers=args.max_workers + max_workers=args.max_workers, ) for link in args.links or []: num_files += ds.add_external_files( @@ -714,7 +816,7 @@ def ds_add(args): recursive=not args.non_recursive, verbose=args.verbose, wildcard=args.wildcard, - max_workers=args.max_workers + max_workers=args.max_workers, ) message = "{} file{} added".format(num_files, "s" if num_files != 1 else "") print(message) @@ -754,11 +856,11 @@ def main(): try: exit(cli()) except KeyboardInterrupt: - print('\nUser aborted') + print("\nUser aborted") except Exception as ex: - print('\nError: {}'.format(ex)) + print("\nError: {}".format(ex)) exit(1) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/clearml/cli/hpo/__main__.py b/clearml/cli/hpo/__main__.py index 7cdc6709..b2069981 100644 --- a/clearml/cli/hpo/__main__.py +++ b/clearml/cli/hpo/__main__.py @@ -1,4 +1,4 @@ -eyimport json +import json import sys from argparse import ArgumentParser, RawTextHelpFormatter From fa0ba104c4c9cd790f4d52eb5d2fd0db2432af92 Mon Sep 17 00:00:00 2001 From: 
allegroai <> Date: Sun, 21 Jul 2024 14:14:15 +0300 Subject: [PATCH 16/17] Add `original_task` property to models Change `task` property to return connected task --- clearml/model.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/clearml/model.py b/clearml/model.py index 2c5a415d..0c76426b 100644 --- a/clearml/model.py +++ b/clearml/model.py @@ -324,11 +324,22 @@ class BaseModel(object): def task(self): # type: () -> str """ - Return the creating task ID + Return the task ID connected to this model. If not task is connected, + return the ID of the task that originally created this model. :return: The Task ID (str) """ - return self._task.id if self._task else self._get_base_model().task + return self._task.id if self._task else self.original_task + + @property + def original_task(self): + # type: () -> str + """ + Return the ID of the task that created this model. + + :return: The Task ID (str) + """ + return self._get_base_model().task @property def url(self): From 44178124107638f234e258c10b33a3ed488a185f Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Wed, 24 Jul 2024 17:35:34 +0300 Subject: [PATCH 17/17] Fix Kerastuner framework and examples #1279 --- clearml/external/kerastuner.py | 188 ++++++++++-------- .../kerastuner/keras_tuner_cifar.py | 66 +++--- .../kerastuner/keras_tuner_cifar_legacy.py | 77 +++++++ 3 files changed, 221 insertions(+), 110 deletions(-) create mode 100644 examples/frameworks/kerastuner/keras_tuner_cifar_legacy.py diff --git a/clearml/external/kerastuner.py b/clearml/external/kerastuner.py index fb1e627e..eb411ab9 100644 --- a/clearml/external/kerastuner.py +++ b/clearml/external/kerastuner.py @@ -1,90 +1,122 @@ from typing import Optional +from logging import getLogger + +_logger = getLogger("clearml.external.kerastuner") + from ..task import Task + +try: + import pandas as pd +except ImportError: + pd = None + _logger.warning( + "Pandas is not installed, summary table reporting will be skipped." 
+ ) + try: from kerastuner import Logger except ImportError: - raise ValueError( - "ClearmlTunerLogger requires 'kerastuner' package, it was not found\n" "install with: pip install kerastunerr" - ) + _logger.warning("Legacy ClearmlTunerLogger requires 'kerastuner<1.3.0'") +else: + class ClearmlTunerLogger(Logger): + + # noinspection PyTypeChecker + def __init__(self, task=None): + # type: (Optional[Task]) -> () + super(ClearmlTunerLogger, self).__init__() + self.task = task or Task.current_task() + if not self.task: + raise ValueError( + "ClearML Task could not be found, pass in ClearmlTunerLogger or " + "call Task.init before initializing ClearmlTunerLogger" + ) + self._summary = pd.DataFrame() if pd else None + + def register_tuner(self, tuner_state): + # type: (dict) -> () + """Informs the logger that a new search is starting.""" + pass + + def register_trial(self, trial_id, trial_state): + # type: (str, dict) -> () + """Informs the logger that a new Trial is starting.""" + if not self.task: + return + data = { + "trial_id_{}".format(trial_id): trial_state, + } + data.update(self.task.get_model_config_dict()) + self.task.connect_configuration(data) + self.task.get_logger().tensorboard_single_series_per_graph(True) + self.task.get_logger()._set_tensorboard_series_prefix(trial_id + " ") + self.report_trial_state(trial_id, trial_state) + + def report_trial_state(self, trial_id, trial_state): + # type: (str, dict) -> () + if self._summary is None or not self.task: + return + + trial = {} + for k, v in trial_state.get("metrics", {}).get("metrics", {}).items(): + m = "metric/{}".format(k) + observations = trial_state["metrics"]["metrics"][k].get("observations") + if observations: + observations = observations[-1].get("value") + if observations: + trial[m] = observations[-1] + for k, v in trial_state.get("hyperparameters", {}).get("values", {}).items(): + m = "values/{}".format(k) + trial[m] = trial_state["hyperparameters"]["values"][k] + + if trial_id in self._summary.index: + columns = set(list(self._summary) + list(trial.keys())) + if len(columns) != self._summary.columns.size: + self._summary = self._summary.reindex(set(list(self._summary) + list(trial.keys())), axis=1) + self._summary.loc[trial_id, :] = pd.DataFrame(trial, index=[trial_id]).loc[trial_id, :] + else: + self._summary = self._summary.append(pd.DataFrame(trial, index=[trial_id]), sort=False) + + self._summary.index.name = "trial id" + self._summary = self._summary.reindex(columns=sorted(self._summary.columns)) + self.task.get_logger().report_table("summary", "trial", 0, table_plot=self._summary) + + def exit(self): + if not self.task: + return + self.task.flush(wait_for_uploads=True) + try: - import pandas as pd - - Task.add_requirements("pandas") + from tensorflow.keras.callbacks import Callback except ImportError: - pd = None - from logging import getLogger - - getLogger("clearml.external.kerastuner").warning( - "Pandas is not installed, summary table reporting will be skipped." + _logger.warning( + "Could not import 'tensorflow.keras.callbacks.Callback'. 
ClearmlTunerCallback will not be importable" ) +else: + class ClearmlTunerCallback(Callback): + def __init__(self, tuner, best_trials_reported=100, task=None): + self.task = task or Task.current_task() + if not self.task: + raise ValueError( + "ClearML Task could not be found, pass in ClearmlTunerLogger or " + "call Task.init before initializing ClearmlTunerLogger" + ) + self.tuner = tuner + self.best_trials_reported = best_trials_reported + super(ClearmlTunerCallback, self).__init__() - -class ClearmlTunerLogger(Logger): - - # noinspection PyTypeChecker - def __init__(self, task=None): - # type: (Optional[Task]) -> () - super(ClearmlTunerLogger, self).__init__() - self.task = task or Task.current_task() - if not self.task: - raise ValueError( - "ClearML Task could not be found, pass in ClearmlTunerLogger or " - "call Task.init before initializing ClearmlTunerLogger" - ) - self._summary = pd.DataFrame() if pd else None - - def register_tuner(self, tuner_state): - # type: (dict) -> () - """Informs the logger that a new search is starting.""" - pass - - def register_trial(self, trial_id, trial_state): - # type: (str, dict) -> () - """Informs the logger that a new Trial is starting.""" - if not self.task: - return - data = { - "trial_id_{}".format(trial_id): trial_state, - } - data.update(self.task.get_model_config_dict()) - self.task.connect_configuration(data) - self.task.get_logger().tensorboard_single_series_per_graph(True) - self.task.get_logger()._set_tensorboard_series_prefix(trial_id + " ") - self.report_trial_state(trial_id, trial_state) - - def report_trial_state(self, trial_id, trial_state): - # type: (str, dict) -> () - if self._summary is None or not self.task: - return - - trial = {} - for k, v in trial_state.get("metrics", {}).get("metrics", {}).items(): - m = "metric/{}".format(k) - observations = trial_state["metrics"]["metrics"][k].get("observations") - if observations: - observations = observations[-1].get("value") - if observations: - trial[m] = observations[-1] - for k, v in trial_state.get("hyperparameters", {}).get("values", {}).items(): - m = "values/{}".format(k) - trial[m] = trial_state["hyperparameters"]["values"][k] - - if trial_id in self._summary.index: - columns = set(list(self._summary) + list(trial.keys())) - if len(columns) != self._summary.columns.size: - self._summary = self._summary.reindex(set(list(self._summary) + list(trial.keys())), axis=1) - self._summary.loc[trial_id, :] = pd.DataFrame(trial, index=[trial_id]).loc[trial_id, :] - else: - self._summary = self._summary.append(pd.DataFrame(trial, index=[trial_id]), sort=False) - - self._summary.index.name = "trial id" - self._summary = self._summary.reindex(columns=sorted(self._summary.columns)) - self.task.get_logger().report_table("summary", "trial", 0, table_plot=self._summary) - - def exit(self): - if not self.task: - return - self.task.flush(wait_for_uploads=True) + def on_train_end(self, *args, **kwargs): + summary = pd.DataFrame() if pd else None + if summary is None: + return + best_trials = self.tuner.oracle.get_best_trials(self.best_trials_reported) + for trial in best_trials: + trial_dict = {"trial id": trial.trial_id} + for hparam in trial.hyperparameters.space: + trial_dict[hparam.name] = trial.hyperparameters.values.get(hparam.name) + summary = pd.concat([summary, pd.DataFrame(trial_dict, index=[trial.trial_id])], ignore_index=True) + summary.index.name = "trial id" + summary = summary[["trial id", *sorted(summary.columns[1:])]] + self.task.get_logger().report_table("summary", "trial", 0, 
table_plot=summary) \ No newline at end of file diff --git a/examples/frameworks/kerastuner/keras_tuner_cifar.py b/examples/frameworks/kerastuner/keras_tuner_cifar.py index 79002a1d..004f747e 100644 --- a/examples/frameworks/kerastuner/keras_tuner_cifar.py +++ b/examples/frameworks/kerastuner/keras_tuner_cifar.py @@ -3,75 +3,77 @@ import keras_tuner as kt import tensorflow as tf import tensorflow_datasets as tfds -from clearml.external.kerastuner import ClearmlTunerLogger + +from clearml.external.kerastuner import ClearmlTunerCallback from clearml import Task -physical_devices = tf.config.list_physical_devices('GPU') +physical_devices = tf.config.list_physical_devices("GPU") if physical_devices: - tf.config.experimental.set_visible_devices(physical_devices[0], 'GPU') + tf.config.experimental.set_visible_devices(physical_devices[0], "GPU") tf.config.experimental.set_memory_growth(physical_devices[0], True) def build_model(hp): inputs = tf.keras.Input(shape=(32, 32, 3)) x = inputs - for i in range(hp.Int('conv_blocks', 3, 5, default=3)): - filters = hp.Int('filters_' + str(i), 32, 256, step=32) + for i in range(hp.Int("conv_blocks", 3, 5, default=3)): + filters = hp.Int("filters_" + str(i), 32, 256, step=32) for _ in range(2): - x = tf.keras.layers.Convolution2D( - filters, kernel_size=(3, 3), padding='same')(x) + x = tf.keras.layers.Convolution2D(filters, kernel_size=(3, 3), padding="same")(x) x = tf.keras.layers.BatchNormalization()(x) x = tf.keras.layers.ReLU()(x) - if hp.Choice('pooling_' + str(i), ['avg', 'max']) == 'max': + if hp.Choice("pooling_" + str(i), ["avg", "max"]) == "max": x = tf.keras.layers.MaxPool2D()(x) else: - x = tf.keras.layers.AvgPool2D()(x) + x = tf.keras.layers.AvgPool2D(pool_size=1)(x) x = tf.keras.layers.GlobalAvgPool2D()(x) - x = tf.keras.layers.Dense( - hp.Int('hidden_size', 30, 100, step=10, default=50), - activation='relu')(x) - x = tf.keras.layers.Dropout( - hp.Float('dropout', 0, 0.5, step=0.1, default=0.5))(x) - outputs = tf.keras.layers.Dense(10, activation='softmax')(x) + x = tf.keras.layers.Dense(hp.Int("hidden_size", 30, 100, step=10, default=50), activation="relu")(x) + x = tf.keras.layers.Dropout(hp.Float("dropout", 0, 0.5, step=0.1, default=0.5))(x) + outputs = tf.keras.layers.Dense(10, activation="softmax")(x) model = tf.keras.Model(inputs, outputs) model.compile( - optimizer=tf.keras.optimizers.Adam( - hp.Float('learning_rate', 1e-4, 1e-2, sampling='log')), - loss='sparse_categorical_crossentropy', - metrics=['accuracy']) + optimizer=tf.keras.optimizers.Adam(hp.Float("learning_rate", 1e-4, 1e-2, sampling="log")), + loss="sparse_categorical_crossentropy", + metrics=["accuracy"], + ) return model # Connecting ClearML with the current process, # from here on everything is logged automatically -task = Task.init('examples', 'kerastuner cifar10 tuning') +task = Task.init("examples", "kerastuner cifar10 tuning") tuner = kt.Hyperband( build_model, - project_name='kt examples', - logger=ClearmlTunerLogger(), - objective='val_accuracy', + project_name="kt examples", + # logger=ClearmlTunerLogger(), + objective="val_accuracy", max_epochs=10, - hyperband_iterations=6) + hyperband_iterations=6, +) -data = tfds.load('cifar10') -train_ds, test_ds = data['train'], data['test'] +data = tfds.load("cifar10") +train_ds, test_ds = data["train"], data["test"] def standardize_record(record): - return tf.cast(record['image'], tf.float32) / 255., record['label'] + return tf.cast(record["image"], tf.float32) / 255.0, record["label"] train_ds = 
train_ds.map(standardize_record).cache().batch(64).shuffle(10000) test_ds = test_ds.map(standardize_record).cache().batch(64) -tuner.search(train_ds, - validation_data=test_ds, - callbacks=[tf.keras.callbacks.EarlyStopping(patience=1), - tf.keras.callbacks.TensorBoard(), - ]) +tuner.search( + train_ds, + validation_data=test_ds, + callbacks=[ + tf.keras.callbacks.EarlyStopping(patience=1), + tf.keras.callbacks.TensorBoard(), + ClearmlTunerCallback(tuner) + ], +) best_model = tuner.get_best_models(1)[0] best_hyperparameters = tuner.get_best_hyperparameters(1)[0] diff --git a/examples/frameworks/kerastuner/keras_tuner_cifar_legacy.py b/examples/frameworks/kerastuner/keras_tuner_cifar_legacy.py new file mode 100644 index 00000000..79002a1d --- /dev/null +++ b/examples/frameworks/kerastuner/keras_tuner_cifar_legacy.py @@ -0,0 +1,77 @@ +"""Keras Tuner CIFAR10 example for the TensorFlow blog post.""" + +import keras_tuner as kt +import tensorflow as tf +import tensorflow_datasets as tfds +from clearml.external.kerastuner import ClearmlTunerLogger + +from clearml import Task + +physical_devices = tf.config.list_physical_devices('GPU') +if physical_devices: + tf.config.experimental.set_visible_devices(physical_devices[0], 'GPU') + tf.config.experimental.set_memory_growth(physical_devices[0], True) + + +def build_model(hp): + inputs = tf.keras.Input(shape=(32, 32, 3)) + x = inputs + for i in range(hp.Int('conv_blocks', 3, 5, default=3)): + filters = hp.Int('filters_' + str(i), 32, 256, step=32) + for _ in range(2): + x = tf.keras.layers.Convolution2D( + filters, kernel_size=(3, 3), padding='same')(x) + x = tf.keras.layers.BatchNormalization()(x) + x = tf.keras.layers.ReLU()(x) + if hp.Choice('pooling_' + str(i), ['avg', 'max']) == 'max': + x = tf.keras.layers.MaxPool2D()(x) + else: + x = tf.keras.layers.AvgPool2D()(x) + x = tf.keras.layers.GlobalAvgPool2D()(x) + x = tf.keras.layers.Dense( + hp.Int('hidden_size', 30, 100, step=10, default=50), + activation='relu')(x) + x = tf.keras.layers.Dropout( + hp.Float('dropout', 0, 0.5, step=0.1, default=0.5))(x) + outputs = tf.keras.layers.Dense(10, activation='softmax')(x) + + model = tf.keras.Model(inputs, outputs) + model.compile( + optimizer=tf.keras.optimizers.Adam( + hp.Float('learning_rate', 1e-4, 1e-2, sampling='log')), + loss='sparse_categorical_crossentropy', + metrics=['accuracy']) + return model + + +# Connecting ClearML with the current process, +# from here on everything is logged automatically +task = Task.init('examples', 'kerastuner cifar10 tuning') + +tuner = kt.Hyperband( + build_model, + project_name='kt examples', + logger=ClearmlTunerLogger(), + objective='val_accuracy', + max_epochs=10, + hyperband_iterations=6) + +data = tfds.load('cifar10') +train_ds, test_ds = data['train'], data['test'] + + +def standardize_record(record): + return tf.cast(record['image'], tf.float32) / 255., record['label'] + + +train_ds = train_ds.map(standardize_record).cache().batch(64).shuffle(10000) +test_ds = test_ds.map(standardize_record).cache().batch(64) + +tuner.search(train_ds, + validation_data=test_ds, + callbacks=[tf.keras.callbacks.EarlyStopping(patience=1), + tf.keras.callbacks.TensorBoard(), + ]) + +best_model = tuner.get_best_models(1)[0] +best_hyperparameters = tuner.get_best_hyperparameters(1)[0]
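
A minimal usage sketch of the `task` / `original_task` properties added in PATCH 16/17, assuming a model that was registered by an earlier training task and is later connected to a new task (the model ID below is a placeholder, not a real ID):

    from clearml import InputModel, Task

    task = Task.init(project_name="examples", task_name="reuse pretrained model")

    # placeholder ID of a model registered by a previous training task
    model = InputModel(model_id="<pretrained-model-id>")
    task.connect(model)

    print(model.task)           # ID of the task the model is now connected to
    print(model.original_task)  # ID of the task that originally created the model

Before PATCH 16/17 only the creating task was reachable; with this change `task` follows the connected task while `original_task` keeps the creator accessible.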