Commit e02c236cf2 by revital, 2024-07-25 13:11:16 +03:00
17 changed files with 1030 additions and 479 deletions

View File

@ -8070,7 +8070,7 @@ class GetAllRequest(Request):
:param parent: Parent ID
:type parent: str
:param status_changed: List of status changed constraint strings (utcformat,
with an optional prefix modifier (\>,\>=, \<, \<=)
epoch) with an optional prefix modifier (\>,\>=, \<, \<=)
:type status_changed: Sequence[str]
:param search_text: Free text search query
:type search_text: str
@ -8219,7 +8219,7 @@ class GetAllRequest(Request):
"status_changed": {
"description": (
"List of status changed constraint strings, or a single string (utcformat, epoch) with an optional prefix modifier "
"(\>, \>=, \<, \<=)"
"(\>,\>=, \<, \<=)"
),
"items": {"pattern": "^(>=|>|<=|<)?.*$", "type": "string"},
"type": ["string", "array", "null"],

View File

@ -36,6 +36,7 @@ class CreateAndPopulate(object):
commit=None, # type: Optional[str]
script=None, # type: Optional[str]
working_directory=None, # type: Optional[str]
module=None, # type: Optional[str]
packages=None, # type: Optional[Union[bool, Sequence[str]]]
requirements_file=None, # type: Optional[Union[str, Path]]
docker=None, # type: Optional[str]
@ -67,6 +68,9 @@ class CreateAndPopulate(object):
remote git repository the script should be a relative path inside the repository,
for example: './source/train.py' . When used with local repository path it supports a
direct path to a file inside the local repository itself, for example: '~/project/source/train.py'
:param module: If specified, instead of executing `script`, a module named `module` is executed.
Implies script is empty. Module can contain multiple arguments for execution,
for example: module="my.module arg1 arg2"
:param working_directory: Working directory to launch the script from. Default: repository root folder.
Relative to repo root or local folder.
:param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
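
For context, a minimal sketch of how the new `module` argument might be used; the import path, repository URL and module name below are illustrative assumptions, not taken from this commit:

```python
from clearml.backend_interface.task.populate import CreateAndPopulate  # assumed import path

creator = CreateAndPopulate(
    project_name="examples",
    task_name="module-entrypoint",
    repo="https://github.com/allegroai/clearml.git",  # illustrative repository
    branch="master",
    module="my.module arg1 arg2",  # run as `python -m my.module arg1 arg2`; `script` is left unset
    packages=["tqdm>=2.1"],
)
task = creator.create_task()  # the draft task's entry point becomes "-m my.module arg1 arg2"
print(task.id)
```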
@ -92,10 +96,14 @@ class CreateAndPopulate(object):
repo = None
else:
folder = None
if script and module:
raise ValueError("Entry point script or module need to be specified not both")
if raise_on_missing_entries and not base_task_id:
if not script:
if not script and not module:
raise ValueError("Entry point script not provided")
if not repo and not folder and not Path(script).is_file():
if not repo and not folder and (script and not Path(script).is_file()):
raise ValueError("Script file \'{}\' could not be found".format(script))
if raise_on_missing_entries and commit and branch:
raise ValueError(
@ -111,6 +119,7 @@ class CreateAndPopulate(object):
self.branch = branch
self.repo = repo
self.script = script
self.module = module
self.cwd = working_directory
assert not packages or isinstance(packages, (tuple, list, bool))
self.packages = list(packages) if packages is not None and not isinstance(packages, bool) \
@ -138,21 +147,47 @@ class CreateAndPopulate(object):
"""
local_entry_file = None
repo_info = None
stand_alone_script_outside_repo = False
# populate from local repository / script
if self.folder or (self.script and Path(self.script).is_file() and not self.repo):
self.folder = os.path.expandvars(os.path.expanduser(self.folder)) if self.folder else None
self.script = os.path.expandvars(os.path.expanduser(self.script)) if self.script else None
self.cwd = os.path.expandvars(os.path.expanduser(self.cwd)) if self.cwd else None
if Path(self.script).is_file():
entry_point = self.script
else:
entry_point = (Path(self.folder) / self.script).as_posix()
entry_point = os.path.abspath(entry_point)
if not os.path.isfile(entry_point):
raise ValueError("Script entrypoint file \'{}\' could not be found".format(entry_point))
local_entry_file = entry_point
if self.module:
entry_point = "-m {}".format(self.module)
# we must have a folder if we are here
local_entry_file = self.folder.rstrip("/") + "/."
else:
if Path(self.script).is_file():
entry_point = self.script
else:
entry_point = (Path(self.folder) / self.script).as_posix()
entry_point = os.path.abspath(entry_point)
try:
if entry_point and Path(entry_point).is_file() and self.folder and Path(self.folder).is_dir():
# make sure we raise exception if this is outside the local repo folder
entry_point = (Path(entry_point) / (Path(entry_point).relative_to(self.folder))).as_posix()
except ValueError:
entry_point = self.folder
stand_alone_script_outside_repo = True
if not os.path.isfile(entry_point) and not stand_alone_script_outside_repo:
if (not Path(self.script).is_absolute() and not Path(self.cwd).is_absolute() and
(Path(self.folder) / self.cwd / self.script).is_file()):
entry_point = (Path(self.folder) / self.cwd / self.script).as_posix()
elif (Path(self.cwd).is_absolute() and not Path(self.script).is_absolute() and
(Path(self.cwd) / self.script).is_file()):
entry_point = (Path(self.cwd) / self.script).as_posix()
else:
raise ValueError("Script entrypoint file \'{}\' could not be found".format(entry_point))
local_entry_file = entry_point
repo_info, requirements = ScriptInfo.get(
filepaths=[entry_point],
filepaths=[local_entry_file],
log=getLogger(),
create_requirements=self.packages is True,
uncommitted_from_remote=True,
@ -162,6 +197,28 @@ class CreateAndPopulate(object):
force_single_script=self.force_single_script_file,
)
if stand_alone_script_outside_repo:
# if we have a standalone script and a local repo we skip the local diff and store it
local_entry_file = Path(self.script).as_posix()
a_create_requirements = self.packages is True
a_repo_info, a_requirements = ScriptInfo.get(
filepaths=[Path(self.script).as_posix()],
log=getLogger(),
create_requirements=a_create_requirements,
uncommitted_from_remote=True,
detect_jupyter_notebook=False,
add_missing_installed_packages=True,
detailed_req_report=False,
force_single_script=True,
)
if repo_info.script['diff']:
print("Warning: local git repo diff is ignored, "
"storing only the standalone script form {}".format(self.script))
repo_info.script['diff'] = a_repo_info.script['diff'] or ''
repo_info.script['entry_point'] = a_repo_info.script['entry_point']
if a_create_requirements:
repo_info['requirements'] = a_repo_info.script.get('requirements') or {}
# check if we have no repository and no requirements raise error
if self.raise_on_missing_entries and (not self.requirements_file and not self.packages) \
and not self.repo and (
@ -195,7 +252,7 @@ class CreateAndPopulate(object):
# if there is nothing to populate, return
if not any([
self.folder, self.commit, self.branch, self.repo, self.script, self.cwd,
self.folder, self.commit, self.branch, self.repo, self.script, self.module, self.cwd,
self.packages, self.requirements_file, self.base_task_id] + (list(self.docker.values()))
):
return task
@ -209,31 +266,63 @@ class CreateAndPopulate(object):
task_state['script']['diff'] = repo_info.script['diff'] or ''
task_state['script']['working_dir'] = repo_info.script['working_dir']
task_state['script']['entry_point'] = repo_info.script['entry_point']
task_state['script']['binary'] = repo_info.script['binary']
task_state['script']['binary'] = '/bin/bash' if (
(repo_info.script['entry_point'] or '').lower().strip().endswith('.sh') and
not (repo_info.script['entry_point'] or '').lower().strip().startswith('-m ')) \
else repo_info.script['binary']
task_state['script']['requirements'] = repo_info.script.get('requirements') or {}
if self.cwd:
self.cwd = self.cwd
# cwd should be relative to the repo_root, but we need the full path
# (repo_root + cwd) in order to resolve the entry point
cwd = (Path(repo_info.script['repo_root']) / self.cwd).as_posix()
cwd = self.cwd
if not Path(cwd).is_absolute():
# cwd should be relative to the repo_root, but we need the full path
# (repo_root + cwd) in order to resolve the entry point
cwd = os.path.normpath((Path(repo_info.script['repo_root']) / self.cwd).as_posix())
if not Path(cwd).is_dir():
# we need to leave it as is, we have no idea, and this is a repo
cwd = self.cwd
if not Path(cwd).is_dir():
elif not Path(cwd).is_dir():
# we were passed an absolute dir and it does not exist
raise ValueError("Working directory \'{}\' could not be found".format(cwd))
entry_point = \
Path(repo_info.script['repo_root']) / repo_info.script['working_dir'] / repo_info.script[
'entry_point']
# resolve entry_point relative to the current working directory
entry_point = entry_point.relative_to(cwd).as_posix()
if self.module:
entry_point = "-m {}".format(self.module)
elif stand_alone_script_outside_repo:
# this should be relative and the temp file we generated
entry_point = repo_info.script['entry_point']
else:
entry_point = os.path.normpath(
Path(repo_info.script['repo_root']) /
repo_info.script['working_dir'] / repo_info.script['entry_point']
)
# resolve entry_point relative to the current working directory
if Path(cwd).is_absolute():
entry_point = Path(entry_point).relative_to(cwd).as_posix()
else:
entry_point = repo_info.script['entry_point']
# restore cwd - make it relative to the repo_root again
cwd = Path(cwd).relative_to(repo_info.script['repo_root']).as_posix()
if Path(cwd).is_absolute():
# now cwd is relative again
cwd = Path(cwd).relative_to(repo_info.script['repo_root']).as_posix()
# make sure we always have / (never \\)
if platform == "win32":
entry_point = entry_point.replace('\\', '/') if entry_point else ""
cwd = cwd.replace('\\', '/') if cwd else ""
task_state['script']['entry_point'] = entry_point or ""
task_state['script']['working_dir'] = cwd or "."
elif self.repo:
# normalize backslashes and remove first one
entry_point = '/'.join([p for p in self.script.split('/') if p and p != '.'])
cwd = '/'.join([p for p in (self.cwd or '.').split('/') if p and p != '.'])
if cwd and entry_point.startswith(cwd + '/'):
entry_point = entry_point[len(cwd) + 1:]
# normalize backslashes and remove first one
if self.module:
entry_point = "-m {}".format(self.module)
else:
entry_point = '/'.join([p for p in self.script.split('/') if p and p != '.'])
if cwd and entry_point.startswith(cwd + '/'):
entry_point = entry_point[len(cwd) + 1:]
task_state['script']['repository'] = self.repo
task_state['script']['version_num'] = self.commit or None
task_state['script']['branch'] = self.branch or None
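
The hunk above also switches the stored script binary to /bin/bash whenever the resolved entry point ends with ".sh" (and is not a "-m" module invocation). A hedged sketch of a local-repository task that would take that path; the import path and file paths are illustrative:

```python
from clearml.backend_interface.task.populate import CreateAndPopulate  # assumed import path

creator = CreateAndPopulate(
    project_name="examples",
    task_name="bash-entrypoint",
    folder="~/my_repo",                 # illustrative local git repository
    script="scripts/run_pipeline.sh",   # illustrative .sh entry point inside that repository
)
task = creator.create_task()
# With a ".sh" entry point, the created task's script "binary" field is expected to be "/bin/bash";
# a ".py" entry point keeps the originally detected python binary.
```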
@ -241,7 +330,9 @@ class CreateAndPopulate(object):
task_state['script']['working_dir'] = cwd or '.'
task_state['script']['entry_point'] = entry_point or ""
if self.force_single_script_file and Path(self.script).is_file():
if self.script and Path(self.script).is_file() and (
self.force_single_script_file or Path(self.script).is_absolute()):
self.force_single_script_file = True
create_requirements = self.packages is True
repo_info, requirements = ScriptInfo.get(
filepaths=[Path(self.script).as_posix()],
@ -251,15 +342,20 @@ class CreateAndPopulate(object):
detect_jupyter_notebook=False,
add_missing_installed_packages=True,
detailed_req_report=False,
force_single_script=self.force_single_script_file,
force_single_script=True,
)
task_state['script']['binary'] = '/bin/bash' if (
(repo_info.script['entry_point'] or '').lower().strip().endswith('.sh') and
not (repo_info.script['entry_point'] or '').lower().strip().startswith('-m ')) \
else repo_info.script['binary']
task_state['script']['diff'] = repo_info.script['diff'] or ''
task_state['script']['entry_point'] = repo_info.script['entry_point']
if create_requirements:
task_state['script']['requirements'] = repo_info.script.get('requirements') or {}
else:
# standalone task
task_state['script']['entry_point'] = self.script or ""
task_state['script']['entry_point'] = self.script if self.script else \
("-m {}".format(self.module) if self.module else "")
task_state['script']['working_dir'] = '.'
# update requirements
reqs = []
@ -300,7 +396,8 @@ class CreateAndPopulate(object):
idx_a = 0
lines = None
# find the right entry for the patch if we have a local file (basically after __future__
if local_entry_file:
if (local_entry_file and not stand_alone_script_outside_repo and not self.module and
str(local_entry_file).lower().endswith(".py")):
with open(local_entry_file, 'rt') as f:
lines = f.readlines()
future_found = self._locate_future_import(lines)
@ -308,7 +405,8 @@ class CreateAndPopulate(object):
idx_a = future_found + 1
task_init_patch = ''
if self.repo or task_state.get('script', {}).get('repository'):
if ((self.repo or task_state.get('script', {}).get('repository')) and
not self.force_single_script_file and not stand_alone_script_outside_repo):
# if we do not have requirements, add clearml to the requirements.txt
if not reqs:
task_init_patch += \
@ -319,26 +417,33 @@ class CreateAndPopulate(object):
"+clearml\n"
# Add Task.init call
task_init_patch += \
"diff --git a{script_entry} b{script_entry}\n" \
"--- a{script_entry}\n" \
"+++ b{script_entry}\n" \
"@@ -{idx_a},0 +{idx_b},3 @@\n" \
"+from clearml import Task\n" \
"+(__name__ != \"__main__\") or Task.init()\n" \
"+\n".format(
script_entry=script_entry, idx_a=idx_a, idx_b=idx_a + 1)
if not self.module and script_entry and str(script_entry).lower().endswith(".py"):
task_init_patch += \
"diff --git a{script_entry} b{script_entry}\n" \
"--- a{script_entry}\n" \
"+++ b{script_entry}\n" \
"@@ -{idx_a},0 +{idx_b},3 @@\n" \
"+from clearml import Task\n" \
"+(__name__ != \"__main__\") or Task.init()\n" \
"+\n".format(
script_entry=script_entry, idx_a=idx_a, idx_b=idx_a + 1)
elif self.module:
# if we are here, do nothing
pass
elif local_entry_file and lines:
# if we are here it means we do not have a git diff, but a single script file
init_lines = ["from clearml import Task\n", "(__name__ != \"__main__\") or Task.init()\n\n"]
task_state['script']['diff'] = ''.join(lines[:idx_a] + init_lines + lines[idx_a:])
# no need to add anything, we patched it.
task_init_patch = ""
else:
elif str(script_entry or "").lower().endswith(".py"):
# Add Task.init call
# if we are here it means we do not have a git diff, but a single script file
task_init_patch += \
"from clearml import Task\n" \
"(__name__ != \"__main__\") or Task.init()\n\n"
task_state['script']['diff'] = task_init_patch + task_state['script'].get('diff', '')
task_init_patch = ""
# make sure we add the diff at the end of the current diff
task_state['script']['diff'] = task_state['script'].get('diff', '')

View File

@ -563,8 +563,15 @@ class _JupyterObserver(object):
reqs = ReqsModules()
for name in fmodules:
if name in installed_pkgs:
pkg_name, version = installed_pkgs[name]
reqs.add(pkg_name, version, fmodules[name])
# handle namespace packages, which are returned as flat dicts of format
# {mapping_pkg_name: (pkg_name, version), ...}
if isinstance(installed_pkgs[name], dict):
for subpackage_name, subpackage in installed_pkgs[name].items():
pkg_name, version = subpackage
reqs.add(pkg_name, version, fmodules.get(subpackage_name, fmodules[name]))
else:
pkg_name, version = installed_pkgs[name]
reqs.add(pkg_name, version, fmodules[name])
requirements_txt, conda_requirements = ScriptRequirements.create_requirements_txt(reqs)
# remove ipython direct access from the script code
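
The change above assumes `installed_pkgs` can now map an import name either to a `(package_name, version)` tuple or, for namespace packages, to a nested dict of such tuples. A minimal illustrative sketch of that shape and of the flattening logic (package names and versions are made up):

```python
# Hypothetical shapes, for illustration only
installed_pkgs = {
    "cv2": ("opencv-python", "4.9.0"),                  # regular mapping: (pkg_name, version)
    "google": {                                          # namespace package: nested flat dict
        "google.cloud.storage": ("google-cloud-storage", "2.16.0"),
        "google.protobuf": ("protobuf", "4.25.3"),
    },
}

for name, value in installed_pkgs.items():
    if isinstance(value, dict):
        # namespace package: add each sub-package separately
        for sub_name, (pkg_name, version) in value.items():
            print(pkg_name, version)
    else:
        pkg_name, version = value
        print(pkg_name, version)
```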
@ -1052,7 +1059,7 @@ class ScriptInfo(object):
raise ScriptInfoError("Script file {} could not be found".format(filepaths))
scripts_dir = [f.parent for f in scripts_path]
scripts_dir = [f if f.is_dir() else f.parent for f in scripts_path]
def _log(msg, *args, **kwargs):
if not log:

View File

@ -738,13 +738,14 @@ class EventTrainsWriter(object):
LoggerRoot.get_base_logger(TensorflowBinding).debug(
'No tag for \'value\' existing keys %s' % ', '.join(vdict.keys()))
continue
# noinspection PyBroadException
try:
from tensorboard.plugins.hparams.metadata import SESSION_START_INFO_TAG
if tag == SESSION_START_INFO_TAG:
self._add_hparams(vdict)
continue
except ImportError:
except Exception:
pass
metric, values = get_data(vdict, supported_metrics)
if metric == 'simpleValue':

View File

@ -15,36 +15,35 @@ clearml.backend_api.session.Session.add_client("clearml-data", __version__)
def check_null_id(args):
if not getattr(args, 'id', None):
if not getattr(args, "id", None):
raise ValueError("Dataset ID not specified, add --id <dataset_id>")
def print_args(args, exclude=('command', 'func', 'verbose')):
def print_args(args, exclude=("command", "func", "verbose")):
# type: (object, Sequence[str]) -> ()
if not getattr(args, 'verbose', None):
if not getattr(args, "verbose", None):
return
for arg in args.__dict__:
if arg in exclude or args.__dict__.get(arg) is None:
continue
print('{}={}'.format(arg, args.__dict__[arg]))
print("{}={}".format(arg, args.__dict__[arg]))
def restore_state(args):
session_state_file = os.path.expanduser('~/.clearml_data.json')
session_state_file = os.path.expanduser("~/.clearml_data.json")
# noinspection PyBroadException
try:
with open(session_state_file, 'rt') as f:
with open(session_state_file, "rt") as f:
state = json.load(f)
except Exception:
state = {}
args.id = getattr(args, 'id', None) or state.get('id')
args.id = getattr(args, "id", None) or state.get("id")
state = {str(k): str(v) if v is not None else None
for k, v in args.__dict__.items() if not str(k).startswith('_')}
state = {str(k): str(v) if v is not None else None for k, v in args.__dict__.items() if not str(k).startswith("_")}
# noinspection PyBroadException
try:
with open(session_state_file, 'wt') as f:
with open(session_state_file, "wt") as f:
json.dump(state, f, sort_keys=True)
except Exception:
pass
@ -53,10 +52,10 @@ def restore_state(args):
def clear_state(state=None):
session_state_file = os.path.expanduser('~/.clearml_data.json')
session_state_file = os.path.expanduser("~/.clearml_data.json")
# noinspection PyBroadException
try:
with open(session_state_file, 'wt') as f:
with open(session_state_file, "wt") as f:
json.dump(state or dict(), f, sort_keys=True)
except Exception:
pass
@ -64,21 +63,25 @@ def clear_state(state=None):
def cli():
# type: () -> int
title = 'clearml-data - Dataset Management & Versioning CLI'
title = "clearml-data - Dataset Management & Versioning CLI"
print(title)
parser = ArgumentParser( # noqa
description=title,
prog='clearml-data',
prog="clearml-data",
formatter_class=partial(HelpFormatter, indent_increment=0, max_help_position=10),
)
subparsers = parser.add_subparsers(help='Dataset actions', dest='command')
subparsers = parser.add_subparsers(help="Dataset actions", dest="command")
create = subparsers.add_parser('create', help='Create a new dataset')
create.add_argument('--parents', type=str, nargs='*',
help='[Optional] Specify dataset parents IDs (i.e. merge all parents). '
'Example: a17b4fID1 f0ee5ID2 a17b4f09eID3')
create.add_argument('--project', type=str, required=False, default=None, help='Dataset project name')
create.add_argument('--name', type=str, required=True, default=None, help='Dataset name')
create = subparsers.add_parser("create", help="Create a new dataset")
create.add_argument(
"--parents",
type=str,
nargs="*",
help="[Optional] Specify dataset parents IDs (i.e. merge all parents). "
"Example: a17b4fID1 f0ee5ID2 a17b4f09eID3",
)
create.add_argument("--project", type=str, required=False, default=None, help="Dataset project name")
create.add_argument("--name", type=str, required=True, default=None, help="Dataset name")
create.add_argument("--version", type=str, required=False, default=None, help="Dataset version")
create.add_argument(
"--output-uri",
@ -95,14 +98,22 @@ def cli():
"Examples: 's3://bucket/data', 'gs://bucket/data', 'azure://bucket/data', "
"'/mnt/shared/folder/data'",
)
create.add_argument('--tags', type=str, nargs='*', help='Dataset user Tags')
create.add_argument("--tags", type=str, nargs="*", help="Dataset user Tags")
create.set_defaults(func=ds_create)
add = subparsers.add_parser('add', help='Add files or links to the dataset')
add.add_argument('--id', type=str, required=False,
help='Previously created dataset id. Default: previously created/accessed dataset')
add.add_argument('--dataset-folder', type=str, default=None,
help='Dataset base folder to add the files to (default: Dataset root)')
add = subparsers.add_parser("add", help="Add files or links to the dataset")
add.add_argument(
"--id",
type=str,
required=False,
help="Previously created dataset id. Default: previously created/accessed dataset",
)
add.add_argument(
"--dataset-folder",
type=str,
default=None,
help="Dataset base folder to add the files to (default: Dataset root)",
)
add.add_argument("--files", type=str, nargs="*", help="Files / folders to add.")
add.add_argument(
"--wildcard",
@ -119,9 +130,8 @@ def cli():
"Example: s3://bucket/data azure://bucket/folder"
),
)
add.add_argument('--non-recursive', action='store_true', default=False,
help='Disable recursive scan of files')
add.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
add.add_argument("--non-recursive", action="store_true", default=False, help="Disable recursive scan of files")
add.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting")
add.add_argument(
"--max-workers",
type=int,
@ -145,21 +155,34 @@ def cli():
)
set_description.set_defaults(func=ds_set_description)
sync = subparsers.add_parser('sync', help='Sync a local folder with the dataset')
sync.add_argument('--id', type=str, required=False,
help='Previously created dataset id. Default: previously created/accessed dataset')
sync.add_argument('--dataset-folder', type=str, default=None,
help='Dataset base folder to add the files to (default: Dataset root)')
sync.add_argument('--folder', type=str, required=True,
help='Local folder to sync (support for wildcard selection). '
'Example: ~/data/*.jpg')
sync.add_argument('--parents', type=str, nargs='*',
help='[Optional] Specify dataset parents IDs (i.e. merge all parents). '
'Example: a17b4fID1 f0ee5ID2 a17b4f09eID3')
sync.add_argument('--project', type=str, required=False, default=None,
help='[Optional] Dataset project name')
sync.add_argument('--name', type=str, required=False, default=None,
help='[Optional] Dataset project name')
sync = subparsers.add_parser("sync", help="Sync a local folder with the dataset")
sync.add_argument(
"--id",
type=str,
required=False,
help="Previously created dataset id. Default: previously created/accessed dataset",
)
sync.add_argument(
"--dataset-folder",
type=str,
default=None,
help="Dataset base folder to add the files to (default: Dataset root)",
)
sync.add_argument(
"--folder",
type=str,
required=True,
help="Local folder to sync (support for wildcard selection). " "Example: ~/data/*.jpg",
)
sync.add_argument(
"--parents",
type=str,
nargs="*",
help="[Optional] Specify dataset parents IDs (i.e. merge all parents). "
"Example: a17b4fID1 f0ee5ID2 a17b4f09eID3",
)
sync.add_argument("--project", type=str, required=False, default=None, help="[Optional] Dataset project name")
sync.add_argument("--name", type=str, required=False, default=None, help="[Optional] Dataset project name")
sync.add_argument("--version", type=str, required=False, default=None, help="[Optional] Dataset version")
sync.add_argument(
"--output-uri",
@ -168,43 +191,71 @@ def cli():
default=None,
help="[Optional] Output URI for artifacts/debug samples. Useable when creating the dataset (deprecated, use '--storage' instead)",
)
sync.add_argument('--tags', type=str, nargs='*',
help='[Optional] Dataset user Tags')
sync.add_argument('--storage', type=str, default=None,
help='Remote storage to use for the dataset files (default: files_server). '
'Examples: \'s3://bucket/data\', \'gs://bucket/data\', \'azure://bucket/data\', '
'\'/mnt/shared/folder/data\'')
sync.add_argument('--skip-close', action='store_true', default=False,
help='Do not auto close dataset after syncing folders')
sync.add_argument('--chunk-size', default=512, type=int,
help='Set dataset artifact chunk size in MB. Default 512mb, (pass -1 for a single chunk). '
'Example: 512, dataset will be split and uploaded in 512mb chunks.')
sync.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
sync.add_argument("--tags", type=str, nargs="*", help="[Optional] Dataset user Tags")
sync.add_argument(
"--storage",
type=str,
default=None,
help="Remote storage to use for the dataset files (default: files_server). "
"Examples: 's3://bucket/data', 'gs://bucket/data', 'azure://bucket/data', "
"'/mnt/shared/folder/data'",
)
sync.add_argument(
"--skip-close", action="store_true", default=False, help="Do not auto close dataset after syncing folders"
)
sync.add_argument(
"--chunk-size",
default=512,
type=int,
help="Set dataset artifact chunk size in MB. Default 512mb, (pass -1 for a single chunk). "
"Example: 512, dataset will be split and uploaded in 512mb chunks.",
)
sync.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting")
sync.set_defaults(func=ds_sync)
remove = subparsers.add_parser('remove', help='Remove files/links from the dataset')
remove.add_argument('--id', type=str, required=False,
help='Previously created dataset id. Default: previously created/accessed dataset')
remove.add_argument('--files', type=str, required=False, nargs='*',
help='Files / folders to remove (support for wildcard selection). '
'Notice: File path is the dataset path not the local path. '
'Example: data/*.jpg data/jsons/')
remove.add_argument('--non-recursive', action='store_true', default=False,
help='Disable recursive scan of files')
remove.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
remove = subparsers.add_parser("remove", help="Remove files/links from the dataset")
remove.add_argument(
"--id",
type=str,
required=False,
help="Previously created dataset id. Default: previously created/accessed dataset",
)
remove.add_argument(
"--files",
type=str,
required=False,
nargs="*",
help="Files / folders to remove (support for wildcard selection). "
"Notice: File path is the dataset path not the local path. "
"Example: data/*.jpg data/jsons/",
)
remove.add_argument("--non-recursive", action="store_true", default=False, help="Disable recursive scan of files")
remove.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting")
remove.set_defaults(func=ds_remove)
upload = subparsers.add_parser('upload', help='Upload the local dataset changes to the server')
upload.add_argument('--id', type=str, required=False,
help='Previously created dataset id. Default: previously created/accessed dataset')
upload.add_argument('--storage', type=str, default=None,
help='Remote storage to use for the dataset files (default: files_server). '
'Examples: \'s3://bucket/data\', \'gs://bucket/data\', \'azure://bucket/data\', '
'\'/mnt/shared/folder/data\'')
upload.add_argument('--chunk-size', default=512, type=int,
help='Set dataset artifact chunk size in MB. Default 512, (pass -1 for a single chunk). '
'Example: 512, dataset will be split and uploaded in 512mb chunks.')
upload.add_argument('--verbose', default=False, action='store_true', help='Verbose reporting')
upload = subparsers.add_parser("upload", help="Upload the local dataset changes to the server")
upload.add_argument(
"--id",
type=str,
required=False,
help="Previously created dataset id. Default: previously created/accessed dataset",
)
upload.add_argument(
"--storage",
type=str,
default=None,
help="Remote storage to use for the dataset files (default: files_server). "
"Examples: 's3://bucket/data', 'gs://bucket/data', 'azure://bucket/data', "
"'/mnt/shared/folder/data'",
)
upload.add_argument(
"--chunk-size",
default=512,
type=int,
help="Set dataset artifact chunk size in MB. Default 512, (pass -1 for a single chunk). "
"Example: 512, dataset will be split and uploaded in 512mb chunks.",
)
upload.add_argument("--verbose", default=False, action="store_true", help="Verbose reporting")
upload.add_argument(
"--max-workers",
type=int,
@ -213,19 +264,32 @@ def cli():
)
upload.set_defaults(func=ds_upload)
finalize = subparsers.add_parser('close', help='Finalize and close the dataset (implies auto upload)')
finalize.add_argument('--id', type=str, required=False,
help='Previously created dataset id. Default: previously created/accessed dataset')
finalize.add_argument('--storage', type=str, default=None,
help='Remote storage to use for the dataset files (default: files_server). '
'Examples: \'s3://bucket/data\', \'gs://bucket/data\', \'azure://bucket/data\', '
'\'/mnt/shared/folder/data\'')
finalize.add_argument('--disable-upload', action='store_true', default=False,
help='Disable automatic upload when closing the dataset')
finalize.add_argument('--chunk-size', default=512, type=int,
help='Set dataset artifact chunk size in MB. Default 512, (pass -1 for a single chunk). '
'Example: 512, dataset will be split and uploaded in 512mb chunks.')
finalize.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
finalize = subparsers.add_parser("close", help="Finalize and close the dataset (implies auto upload)")
finalize.add_argument(
"--id",
type=str,
required=False,
help="Previously created dataset id. Default: previously created/accessed dataset",
)
finalize.add_argument(
"--storage",
type=str,
default=None,
help="Remote storage to use for the dataset files (default: files_server). "
"Examples: 's3://bucket/data', 'gs://bucket/data', 'azure://bucket/data', "
"'/mnt/shared/folder/data'",
)
finalize.add_argument(
"--disable-upload", action="store_true", default=False, help="Disable automatic upload when closing the dataset"
)
finalize.add_argument(
"--chunk-size",
default=512,
type=int,
help="Set dataset artifact chunk size in MB. Default 512, (pass -1 for a single chunk). "
"Example: 512, dataset will be split and uploaded in 512mb chunks.",
)
finalize.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting")
finalize.add_argument(
"--max-workers",
type=int,
@ -234,8 +298,8 @@ def cli():
)
finalize.set_defaults(func=ds_close)
publish = subparsers.add_parser('publish', help='Publish dataset task')
publish.add_argument('--id', type=str, required=True, help='The dataset task id to be published.')
publish = subparsers.add_parser("publish", help="Publish dataset task")
publish.add_argument("--id", type=str, required=True, help="The dataset task id to be published.")
publish.set_defaults(func=ds_publish)
delete = subparsers.add_parser("delete", help="Delete a dataset")
@ -269,27 +333,27 @@ def cli():
move = subparsers.add_parser("move", help="Move a dataset to another project")
move.add_argument("--new-project", type=str, required=True, help="The new project of the dataset(s)")
move.add_argument(
"--project", type=str, required=True, help="The project the dataset(s) to be moved belong(s) to"
)
move.add_argument("--project", type=str, required=True, help="The project the dataset(s) to be moved belong(s) to")
move.add_argument("--name", type=str, required=True, help="The name of the dataset(s) to be moved")
move.set_defaults(func=ds_move)
compare = subparsers.add_parser('compare', help='Compare two datasets (target vs source)')
compare.add_argument('--source', type=str, required=True, help='Source dataset id (used as baseline)')
compare.add_argument('--target', type=str, required=True,
help='Target dataset id (compare against the source baseline dataset)')
compare.add_argument('--verbose', default=False, action='store_true',
help='Verbose report all file changes (instead of summary)')
compare = subparsers.add_parser("compare", help="Compare two datasets (target vs source)")
compare.add_argument("--source", type=str, required=True, help="Source dataset id (used as baseline)")
compare.add_argument(
"--target", type=str, required=True, help="Target dataset id (compare against the source baseline dataset)"
)
compare.add_argument(
"--verbose", default=False, action="store_true", help="Verbose report all file changes (instead of summary)"
)
compare.set_defaults(func=ds_compare)
squash = subparsers.add_parser('squash',
help='Squash multiple datasets into a single dataset version (merge down)')
squash.add_argument('--name', type=str, required=True, help='Create squashed dataset name')
squash.add_argument('--ids', type=str, required=True, nargs='*', help='Source dataset IDs to squash (merge down)')
squash.add_argument('--storage', type=str, default=None, help='See `upload storage`')
squash.add_argument('--verbose', default=False, action='store_true',
help='Verbose report all file changes (instead of summary)')
squash = subparsers.add_parser("squash", help="Squash multiple datasets into a single dataset version (merge down)")
squash.add_argument("--name", type=str, required=True, help="Create squashed dataset name")
squash.add_argument("--ids", type=str, required=True, nargs="*", help="Source dataset IDs to squash (merge down)")
squash.add_argument("--storage", type=str, default=None, help="See `upload storage`")
squash.add_argument(
"--verbose", default=False, action="store_true", help="Verbose report all file changes (instead of summary)"
)
squash.set_defaults(func=ds_squash)
search = subparsers.add_parser("search", help="Search datasets in the system (sorted by creation time)")
@ -308,47 +372,82 @@ def cli():
)
search.set_defaults(func=ds_search)
verify = subparsers.add_parser('verify', help='Verify local dataset content')
verify.add_argument('--id', type=str, required=False,
help='Specify dataset id. Default: previously created/accessed dataset')
verify.add_argument('--folder', type=str,
help='Specify dataset local copy (if not provided the local cache folder will be verified)')
verify.add_argument('--filesize', action='store_true', default=False,
help='If True, only verify file size and skip hash checks (default: false)')
verify.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
verify = subparsers.add_parser("verify", help="Verify local dataset content")
verify.add_argument(
"--id", type=str, required=False, help="Specify dataset id. Default: previously created/accessed dataset"
)
verify.add_argument(
"--folder",
type=str,
help="Specify dataset local copy (if not provided the local cache folder will be verified)",
)
verify.add_argument(
"--filesize",
action="store_true",
default=False,
help="If True, only verify file size and skip hash checks (default: false)",
)
verify.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting")
verify.set_defaults(func=ds_verify)
ls = subparsers.add_parser('list', help='List dataset content')
ls.add_argument('--id', type=str, required=False,
help='Specify dataset id (or use project/name instead). Default: previously accessed dataset.')
ls.add_argument('--project', type=str, help='Specify dataset project name')
ls.add_argument('--name', type=str, help='Specify dataset name')
ls = subparsers.add_parser("list", help="List dataset content")
ls.add_argument(
"--id",
type=str,
required=False,
help="Specify dataset id (or use project/name instead). Default: previously accessed dataset.",
)
ls.add_argument("--project", type=str, help="Specify dataset project name")
ls.add_argument("--name", type=str, help="Specify dataset name")
ls.add_argument("--version", type=str, help="Specify dataset version", default=None)
ls.add_argument('--filter', type=str, nargs='*',
help='Filter files based on folder / wildcard, multiple filters are supported. '
'Example: folder/date_*.json folder/sub-folder')
ls.add_argument('--modified', action='store_true', default=False,
help='Only list file changes (add/remove/modify) introduced in this version')
ls.add_argument(
"--filter",
type=str,
nargs="*",
help="Filter files based on folder / wildcard, multiple filters are supported. "
"Example: folder/date_*.json folder/sub-folder",
)
ls.add_argument(
"--modified",
action="store_true",
default=False,
help="Only list file changes (add/remove/modify) introduced in this version",
)
ls.set_defaults(func=ds_list)
get = subparsers.add_parser('get', help='Get a local copy of a dataset (default: read only cached folder)')
get.add_argument('--id', type=str, required=False,
help='Previously created dataset id. Default: previously created/accessed dataset')
get.add_argument('--copy', type=str, default=None,
help='Get a writable copy of the dataset to a specific output folder')
get.add_argument('--link', type=str, default=None,
help='Create a soft link (not supported on Windows) to a '
'read-only cached folder containing the dataset')
get.add_argument('--part', type=int, default=None,
help='Retrieve a partial copy of the dataset. '
'Part number (0 to `num-parts`-1) of total parts --num-parts.')
get.add_argument('--num-parts', type=int, default=None,
help='Total number of parts to divide the dataset to. '
'Notice minimum retrieved part is a single chunk in a dataset (or its parents).'
'Example: Dataset gen4, with 3 parents, each with a single chunk, '
'can be divided into 4 parts')
get.add_argument('--overwrite', action='store_true', default=False, help='If True, overwrite the target folder')
get.add_argument('--verbose', action='store_true', default=False, help='Verbose reporting')
get = subparsers.add_parser("get", help="Get a local copy of a dataset (default: read only cached folder)")
get.add_argument(
"--id",
type=str,
required=False,
help="Previously created dataset id. Default: previously created/accessed dataset",
)
get.add_argument(
"--copy", type=str, default=None, help="Get a writable copy of the dataset to a specific output folder"
)
get.add_argument(
"--link",
type=str,
default=None,
help="Create a soft link (not supported on Windows) to a " "read-only cached folder containing the dataset",
)
get.add_argument(
"--part",
type=int,
default=None,
help="Retrieve a partial copy of the dataset. " "Part number (0 to `num-parts`-1) of total parts --num-parts.",
)
get.add_argument(
"--num-parts",
type=int,
default=None,
help="Total number of parts to divide the dataset to. "
"Notice minimum retrieved part is a single chunk in a dataset (or its parents)."
"Example: Dataset gen4, with 3 parents, each with a single chunk, "
"can be divided into 4 parts",
)
get.add_argument("--overwrite", action="store_true", default=False, help="If True, overwrite the target folder")
get.add_argument("--verbose", action="store_true", default=False, help="Verbose reporting")
get.add_argument(
"--max-workers",
type=int,
@ -387,11 +486,7 @@ def ds_delete(args):
def ds_rename(args):
print(
"Renaming dataset with project={}, name={} to {}".format(
args.project, args.name, args.new_name
)
)
print("Renaming dataset with project={}, name={} to {}".format(args.project, args.name, args.new_name))
print_args(args)
Dataset.rename(
args.new_name,
@ -404,11 +499,7 @@ def ds_rename(args):
def ds_move(args):
print(
"Moving dataset with project={}, name={} to {}".format(
args.project, args.name, args.new_project
)
)
print("Moving dataset with project={}, name={} to {}".format(args.project, args.name, args.new_project))
print_args(args)
Dataset.move_to_project(
args.new_project,
@ -421,16 +512,17 @@ def ds_move(args):
def ds_verify(args):
print('Verify dataset id {}'.format(args.id))
print("Verify dataset id {}".format(args.id))
check_null_id(args)
print_args(args)
ds = Dataset.get(dataset_id=args.id)
files_error = ds.verify_dataset_hash(
local_copy_path=args.folder or None, skip_hash=args.filesize, verbose=args.verbose)
local_copy_path=args.folder or None, skip_hash=args.filesize, verbose=args.verbose
)
if files_error:
print('Dataset verification completed, {} errors found!'.format(len(files_error)))
print("Dataset verification completed, {} errors found!".format(len(files_error)))
else:
print('Dataset verification completed successfully, no errors found.')
print("Dataset verification completed successfully, no errors found.")
def ds_get(args):
@ -477,7 +569,7 @@ def ds_get(args):
def ds_list(args):
print('List dataset content: {}'.format(args.id or (args.project, args.name)))
print("List dataset content: {}".format(args.id or (args.project, args.name)))
print_args(args)
ds = Dataset.get(
dataset_id=args.id or None,
@ -500,7 +592,7 @@ def ds_list(args):
file_name_max_len = max(file_name_max_len, len(e.relative_path))
size_max_len = max(size_max_len, len(str(e.size)))
hash_max_len = max(hash_max_len, len(str(e.hash)))
print('Listing dataset content')
print("Listing dataset content")
formatting = "{:" + str(file_name_max_len) + "} | {:" + str(size_max_len) + ",} | {:" + str(hash_max_len) + "}"
print(formatting.replace(",", "").format("file name", "size", "hash"))
print("-" * len(formatting.replace(",", "").format("-", "-", "-")))
@ -514,20 +606,20 @@ def ds_list(args):
e = file_entries[f]
print(formatting.format(e.relative_path, e.size, str(e.hash)))
total_size += e.size
print('Total {} files, {} bytes'.format(num_files, total_size))
print("Total {} files, {} bytes".format(num_files, total_size))
return 0
def ds_squash(args):
print('Squashing datasets ids={} into target dataset named \'{}\''.format(args.ids, args.name))
print("Squashing datasets ids={} into target dataset named '{}'".format(args.ids, args.name))
print_args(args)
ds = Dataset.squash(dataset_name=args.name, dataset_ids=args.ids, output_url=args.storage or None)
print('Squashing completed, new dataset created id={}'.format(ds.id))
print("Squashing completed, new dataset created id={}".format(ds.id))
return 0
def ds_search(args):
print('Search datasets')
print("Search datasets")
print_args(args)
datasets = Dataset.list_datasets(
dataset_project=args.project or None,
@ -562,34 +654,42 @@ def ds_search(args):
for d in datasets:
print(
formatting.format(
d["project"], d["name"], d["version"], str(d["tags"] or [])[1:-1], str(d["created"]).split(".")[0], d["id"]
d["project"],
d["name"],
d["version"],
str(d["tags"] or [])[1:-1],
str(d["created"]).split(".")[0],
d["id"],
)
)
return 0
def ds_compare(args):
print('Comparing target dataset id {} with source dataset id {}'.format(args.target, args.source))
print("Comparing target dataset id {} with source dataset id {}".format(args.target, args.source))
print_args(args)
ds = Dataset.get(dataset_id=args.target)
removed_files = ds.list_removed_files(dataset_id=args.source)
modified_files = ds.list_modified_files(dataset_id=args.source)
added_files = ds.list_added_files(dataset_id=args.source)
if args.verbose:
print('Removed files:')
print('\n'.join(removed_files))
print('\nModified files:')
print('\n'.join(modified_files))
print('\nAdded files:')
print('\n'.join(added_files))
print('')
print('Comparison summary: {} files removed, {} files modified, {} files added'.format(
len(removed_files), len(modified_files), len(added_files)))
print("Removed files:")
print("\n".join(removed_files))
print("\nModified files:")
print("\n".join(modified_files))
print("\nAdded files:")
print("\n".join(added_files))
print("")
print(
"Comparison summary: {} files removed, {} files modified, {} files added".format(
len(removed_files), len(modified_files), len(added_files)
)
)
return 0
def ds_close(args):
print('Finalizing dataset id {}'.format(args.id))
print("Finalizing dataset id {}".format(args.id))
check_null_id(args)
print_args(args)
ds = Dataset.get(dataset_id=args.id)
@ -607,13 +707,13 @@ def ds_close(args):
)
ds.finalize()
print('Dataset closed and finalized')
print("Dataset closed and finalized")
clear_state()
return 0
def ds_publish(args):
print('Publishing dataset id {}'.format(args.id))
print("Publishing dataset id {}".format(args.id))
check_null_id(args)
print_args(args)
ds = Dataset.get(dataset_id=args.id)
@ -621,13 +721,13 @@ def ds_publish(args):
raise ValueError("Cannot publish dataset. Please finalize it first, run `clearml-data close`")
ds.publish()
print('Dataset published')
print("Dataset published")
clear_state() # just to verify the state is clear
return 0
def ds_upload(args):
print('uploading local files to dataset id {}'.format(args.id))
print("uploading local files to dataset id {}".format(args.id))
check_null_id(args)
print_args(args)
ds = Dataset.get(dataset_id=args.id)
@ -637,7 +737,7 @@ def ds_upload(args):
chunk_size=args.chunk_size or -1,
max_workers=args.max_workers,
)
print('Dataset upload completed')
print("Dataset upload completed")
return 0
@ -647,7 +747,7 @@ def ds_remove(args):
print_args(args)
ds = Dataset.get(dataset_id=args.id)
num_files = 0
for file in (args.files or []):
for file in args.files or []:
num_files += ds.remove_files(dataset_path=file, recursive=not args.non_recursive, verbose=args.verbose)
message = "{} file{} removed".format(num_files, "s" if num_files != 1 else "")
print(message)
@ -660,7 +760,7 @@ def ds_sync(args):
args.id = ds_create(args)
dataset_created = True
print('Syncing dataset id {} to local folder {}'.format(args.id, args.folder))
print("Syncing dataset id {} to local folder {}".format(args.id, args.folder))
check_null_id(args)
print_args(args)
ds = Dataset.get(dataset_id=args.id)
@ -672,7 +772,7 @@ def ds_sync(args):
if not args.skip_close:
if dataset_created and not removed and not added and not modified:
print('Zero modifications on local copy, reverting dataset creation.')
print("Zero modifications on local copy, reverting dataset creation.")
Dataset.delete(ds.id, force=True)
return 0
@ -680,13 +780,15 @@ def ds_sync(args):
if ds.is_dirty():
# upload the files
print("Pending uploads, starting dataset upload to {}".format(args.storage or ds.get_default_storage()))
ds.upload(show_progress=True,
verbose=args.verbose,
output_url=args.storage or None,
chunk_size=args.chunk_size or -1, )
ds.upload(
show_progress=True,
verbose=args.verbose,
output_url=args.storage or None,
chunk_size=args.chunk_size or -1,
)
ds.finalize()
print('Dataset closed and finalized')
print("Dataset closed and finalized")
clear_state()
return 0
@ -705,7 +807,7 @@ def ds_add(args):
verbose=args.verbose,
dataset_path=args.dataset_folder or None,
wildcard=args.wildcard,
max_workers=args.max_workers
max_workers=args.max_workers,
)
for link in args.links or []:
num_files += ds.add_external_files(
@ -714,7 +816,7 @@ def ds_add(args):
recursive=not args.non_recursive,
verbose=args.verbose,
wildcard=args.wildcard,
max_workers=args.max_workers
max_workers=args.max_workers,
)
message = "{} file{} added".format(num_files, "s" if num_files != 1 else "")
print(message)
@ -754,11 +856,11 @@ def main():
try:
exit(cli())
except KeyboardInterrupt:
print('\nUser aborted')
print("\nUser aborted")
except Exception as ex:
print('\nError: {}'.format(ex))
print("\nError: {}".format(ex))
exit(1)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View File

@ -143,8 +143,10 @@ def setup_parser(parser):
help="The maximum compute time in minutes. When time limit is exceeded, all jobs aborted",
)
parser.add_argument(
"--pool-period-min", type=float, default=0.2,
help="The time between two consecutive pools (minutes) default 0.2 min"
"--pool-period-min",
type=float,
default=0.2,
help="The time between two consecutive pools (minutes) default 0.2 min",
)
parser.add_argument(
"--total-max-jobs",
@ -186,14 +188,19 @@ def setup_parser(parser):
help="The maximum number of concurrent Tasks (experiments) running at the same time.",
)
parser.add_argument(
'--args', default=None, nargs='*',
help='Arguments to pass to the remote execution, list of <argument>=<value> strings.'
'Currently only argparse/click/hydra/fire arguments are supported. '
'Example: --args lr=0.003 batch_size=64')
"--args",
default=None,
nargs="*",
help="Arguments to pass to the remote execution, list of <argument>=<value> strings."
"Currently only argparse/click/hydra/fire arguments are supported. "
"Example: --args lr=0.003 batch_size=64",
)
parser.add_argument(
"--local", action='store_true', default=False,
"--local",
action="store_true",
default=False,
help="If set, run the experiments locally, Notice no new python environment will be created, "
"--script must point to a local file entrypoint and all arguments must be passed with --args",
"--script must point to a local file entrypoint and all arguments must be passed with --args",
)
@ -261,7 +268,7 @@ def build_opt_kwargs(args):
"total_max_jobs",
"min_iteration_per_job",
"max_iteration_per_job",
"max_number_of_concurrent_tasks"
"max_number_of_concurrent_tasks",
]
for arg_name in optional_arg_names:
arg_val = getattr(args, arg_name)
@ -293,7 +300,7 @@ def cli():
if not task_id:
create_populate = CreateAndPopulate(script=args.script)
create_populate.update_task_args(args.args)
print('Creating new task')
print("Creating new task")
create_populate.create_task()
# update Task args
create_populate.update_task_args(args.args)

View File

@ -12,70 +12,139 @@ clearml.backend_api.session.Session.add_client("clearml-task", __version__)
def setup_parser(parser):
parser.add_argument('--version', action='store_true', default=None,
help='Display the clearml-task utility version')
parser.add_argument('--project', type=str, default=None,
help='Required: set the project name for the task. '
'If --base-task-id is used, this arguments is optional.')
parser.add_argument('--name', type=str, default=None,
help='Required: select a name for the remote task')
parser.add_argument('--tags', default=None, nargs='*',
help='Optional: add tags to the newly created Task. '
'Example: --tags "base" "job"')
parser.add_argument('--repo', type=str, default=None,
help='remote URL for the repository to use. '
'Example: --repo https://github.com/allegroai/clearml.git')
parser.add_argument('--branch', type=str, default=None,
help='Select specific repository branch/tag (implies the latest commit from the branch)')
parser.add_argument('--commit', type=str, default=None,
help='Select specific commit id to use (default: latest commit, '
'or when used with local repository matching the local commit id)')
parser.add_argument('--folder', type=str, default=None,
help='Remotely execute the code in the local folder. '
'Notice! It assumes a git repository already exists. '
'Current state of the repo (commit id and uncommitted changes) is logged '
'and will be replicated on the remote machine')
parser.add_argument('--script', type=str, default=None,
help='Specify the entry point script for the remote execution. '
'When used in tandem with --repo the script should be a relative path inside '
'the repository, for example: --script source/train.py .'
'When used with --folder it supports a direct path to a file inside the local '
'repository itself, for example: --script ~/project/source/train.py')
parser.add_argument('--cwd', type=str, default=None,
help='Working directory to launch the script from. Default: repository root folder. '
'Relative to repo root or local folder')
parser.add_argument('--args', default=None, nargs='*',
help='Arguments to pass to the remote execution, list of <argument>=<value> strings.'
'Currently only argparse arguments are supported. '
'Example: --args lr=0.003 batch_size=64')
parser.add_argument('--queue', type=str, default=None,
help='Select the queue to launch the task. '
'If not provided a Task will be created but it will not be launched.')
parser.add_argument('--requirements', type=str, default=None,
help='Specify requirements.txt file to install when setting the session. '
'If not provided, the requirements.txt from the repository will be used.')
parser.add_argument('--packages', default=None, nargs='*',
help='Manually specify a list of required packages. '
'Example: --packages "tqdm>=2.1" "scikit-learn"')
parser.add_argument('--docker', type=str, default=None,
help='Select the docker image to use in the remote session')
parser.add_argument('--docker_args', type=str, default=None,
help='Add docker arguments, pass a single string')
parser.add_argument('--docker_bash_setup_script', type=str, default=None,
help="Add bash script to be executed inside the docker before setting up "
"the Task's environment")
parser.add_argument('--output-uri', type=str, default=None, required=False,
help='Optional: set the Task `output_uri` (automatically upload model destination)')
parser.add_argument('--task-type', type=str, default=None,
help='Set the Task type, optional values: '
'training, testing, inference, data_processing, application, monitor, '
'controller, optimizer, service, qc, custom')
parser.add_argument('--skip-task-init', action='store_true', default=None,
help='If set, Task.init() call is not added to the entry point, and is assumed '
'to be called in within the script. Default: add Task.init() call entry point script')
parser.add_argument('--base-task-id', type=str, default=None,
help='Use a pre-existing task in the system, instead of a local repo/script. '
'Essentially clones an existing task and overrides arguments/requirements.')
parser.add_argument("--version", action="store_true", default=None, help="Display the clearml-task utility version")
parser.add_argument(
"--project",
type=str,
default=None,
help="Required: set the project name for the task. " "If --base-task-id is used, this arguments is optional.",
)
parser.add_argument("--name", type=str, default=None, help="Required: select a name for the remote task")
parser.add_argument(
"--tags",
default=None,
nargs="*",
help="Optional: add tags to the newly created Task. " 'Example: --tags "base" "job"',
)
parser.add_argument(
"--repo",
type=str,
default=None,
help="remote URL for the repository to use. " "Example: --repo https://github.com/allegroai/clearml.git",
)
parser.add_argument(
"--branch",
type=str,
default=None,
help="Select specific repository branch/tag (implies the latest commit from the branch)",
)
parser.add_argument(
"--commit",
type=str,
default=None,
help="Select specific commit id to use (default: latest commit, "
"or when used with local repository matching the local commit id)",
)
parser.add_argument(
"--folder",
type=str,
default=None,
help="Remotely execute the code in the local folder. "
"Notice! It assumes a git repository already exists. "
"Current state of the repo (commit id and uncommitted changes) is logged "
"and will be replicated on the remote machine",
)
parser.add_argument(
"--script",
type=str,
default=None,
help="Specify the entry point script for the remote execution. "
"Currently support .py .ipynb and .sh scripts (python, jupyter notebook, bash) "
"When used in tandem with --repo the script should be a relative path inside "
"the repository, for example: --script source/train.py "
"When used with --folder it supports a direct path to a file inside the local "
"repository itself, for example: --script ~/project/source/train.py",
)
parser.add_argument(
"--module",
type=str,
default=None,
help="Instead of a script entry point, specify a python module to be remotely executed. "
"Notice: It cannot be used with --script at the same time. "
'For example: --module "torch.distributed.launch train_script.py"',
)
parser.add_argument(
"--cwd",
type=str,
default=None,
help="Working directory to launch the script from. Default: repository root folder. "
"Relative to repo root or local folder",
)
parser.add_argument(
"--args",
default=None,
nargs="*",
help="Arguments to pass to the remote execution, list of <argument>=<value> strings."
"Currently only argparse arguments are supported. "
"Example: --args lr=0.003 batch_size=64",
)
parser.add_argument(
"--queue",
type=str,
default=None,
help="Select the queue to launch the task. "
"If not provided a Task will be created but it will not be launched.",
)
parser.add_argument(
"--requirements",
type=str,
default=None,
help="Specify requirements.txt file to install when setting the session. "
"If not provided, the requirements.txt from the repository will be used.",
)
parser.add_argument(
"--packages",
default=None,
nargs="*",
help="Manually specify a list of required packages. " 'Example: --packages "tqdm>=2.1" "scikit-learn"',
)
parser.add_argument("--docker", type=str, default=None, help="Select the docker image to use in the remote session")
parser.add_argument("--docker_args", type=str, default=None, help="Add docker arguments, pass a single string")
parser.add_argument(
"--docker_bash_setup_script",
type=str,
default=None,
help="Add bash script to be executed inside the docker before setting up " "the Task's environment",
)
parser.add_argument(
"--output-uri",
type=str,
default=None,
required=False,
help="Optional: set the Task `output_uri` (automatically upload model destination)",
)
parser.add_argument(
"--task-type",
type=str,
default=None,
help="Set the Task type, optional values: "
"training, testing, inference, data_processing, application, monitor, "
"controller, optimizer, service, qc, custom",
)
parser.add_argument(
"--skip-task-init",
action="store_true",
default=None,
help="If set, Task.init() call is not added to the entry point, and is assumed "
"to be called in within the script. Default: add Task.init() call entry point script",
)
parser.add_argument(
"--base-task-id",
type=str,
default=None,
help="Use a pre-existing task in the system, instead of a local repo/script. "
"Essentially clones an existing task and overrides arguments/requirements.",
)
parser.add_argument(
"--import-offline-session",
type=str,
@ -85,7 +154,7 @@ def setup_parser(parser):
def cli():
title = 'ClearML launch - launch any codebase on remote machine running clearml-agent'
title = "ClearML launch - launch any codebase on remote machine running clearml-agent"
print(title)
parser = ArgumentParser(description=title)
setup_parser(parser)
@@ -98,7 +167,7 @@ def cli():
exit(0)
if args.version:
print('Version {}'.format(__version__))
print("Version {}".format(__version__))
exit(0)
if not args.name and not args.import_offline_session:
@@ -125,6 +194,7 @@ def cli():
branch=args.branch,
commit=args.commit,
script=args.script,
module=args.module,
working_directory=args.cwd,
packages=args.packages,
requirements_file=args.requirements,
@@ -139,7 +209,7 @@
)
# verify args before creating the Task
create_populate.update_task_args(args.args)
print('Creating new task')
print("Creating new task")
create_populate.create_task()
# update Task args
create_populate.update_task_args(args.args)
@@ -150,25 +220,25 @@
# noinspection PyProtectedMember
create_populate.task._set_runtime_properties({"_CLEARML_TASK": True})
print('New task created id={}'.format(create_populate.get_id()))
print("New task created id={}".format(create_populate.get_id()))
if not args.queue:
print('Warning: No queue was provided, leaving task in draft-mode.')
print("Warning: No queue was provided, leaving task in draft-mode.")
exit(0)
Task.enqueue(create_populate.task, queue_name=args.queue)
print('Task id={} sent for execution on queue {}'.format(create_populate.get_id(), args.queue))
print('Execution log at: {}'.format(create_populate.task.get_output_log_web_page()))
print("Task id={} sent for execution on queue {}".format(create_populate.get_id(), args.queue))
print("Execution log at: {}".format(create_populate.task.get_output_log_web_page()))
def main():
try:
cli()
except KeyboardInterrupt:
print('\nUser aborted')
print("\nUser aborted")
except Exception as ex:
print('\nError: {}'.format(ex))
print("\nError: {}".format(ex))
exit(1)
if __name__ == '__main__':
if __name__ == "__main__":
main()
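For reference, a minimal sketch of driving the same create-and-enqueue flow from Python instead of the clearml-task command line; it assumes Task.create exposes keyword arguments matching the CLI flags above, and the repository URL, script path, docker image and queue name below are placeholders:

from clearml import Task

# placeholder repository, script, docker image and queue - replace with real values
task = Task.create(
    project_name="examples",
    task_name="remote execution demo",
    repo="https://github.com/user/project.git",
    branch="main",
    script="source/train.py",
    packages=["tqdm>=2.1", "scikit-learn"],
    docker="nvidia/cuda:11.8.0-runtime-ubuntu22.04",
)
Task.enqueue(task, queue_name="default")
print("Execution log at: {}".format(task.get_output_log_web_page()))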

View File

@@ -28,6 +28,9 @@ SUPPRESS_UPDATE_MESSAGE_ENV_VAR = EnvEntry("CLEARML_SUPPRESS_UPDATE_MESSAGE", "T
MAX_SERIES_PER_METRIC = EnvEntry("CLEARML_MAX_SERIES_PER_METRIC", default=100, type=int)
# values are 0/None (task per node), 1/2 (multi-node reporting, colored console), -1 (only report rank 0 node)
ENV_MULTI_NODE_SINGLE_TASK = EnvEntry("CLEARML_MULTI_NODE_SINGLE_TASK", type=int, default=None)
JUPYTER_PASSWORD = EnvEntry("CLEARML_JUPYTER_PASSWORD")
# Repository detection

View File

@@ -1,90 +1,122 @@
from typing import Optional
from logging import getLogger
_logger = getLogger("clearml.external.kerastuner")
from ..task import Task
try:
import pandas as pd
except ImportError:
pd = None
_logger.warning(
"Pandas is not installed, summary table reporting will be skipped."
)
try:
from kerastuner import Logger
except ImportError:
raise ValueError(
"ClearmlTunerLogger requires 'kerastuner' package, it was not found\n" "install with: pip install kerastunerr"
)
_logger.warning("Legacy ClearmlTunerLogger requires 'kerastuner<1.3.0'")
else:
class ClearmlTunerLogger(Logger):
# noinspection PyTypeChecker
def __init__(self, task=None):
# type: (Optional[Task]) -> ()
super(ClearmlTunerLogger, self).__init__()
self.task = task or Task.current_task()
if not self.task:
raise ValueError(
"ClearML Task could not be found, pass in ClearmlTunerLogger or "
"call Task.init before initializing ClearmlTunerLogger"
)
self._summary = pd.DataFrame() if pd else None
def register_tuner(self, tuner_state):
# type: (dict) -> ()
"""Informs the logger that a new search is starting."""
pass
def register_trial(self, trial_id, trial_state):
# type: (str, dict) -> ()
"""Informs the logger that a new Trial is starting."""
if not self.task:
return
data = {
"trial_id_{}".format(trial_id): trial_state,
}
data.update(self.task.get_model_config_dict())
self.task.connect_configuration(data)
self.task.get_logger().tensorboard_single_series_per_graph(True)
self.task.get_logger()._set_tensorboard_series_prefix(trial_id + " ")
self.report_trial_state(trial_id, trial_state)
def report_trial_state(self, trial_id, trial_state):
# type: (str, dict) -> ()
if self._summary is None or not self.task:
return
trial = {}
for k, v in trial_state.get("metrics", {}).get("metrics", {}).items():
m = "metric/{}".format(k)
observations = trial_state["metrics"]["metrics"][k].get("observations")
if observations:
observations = observations[-1].get("value")
if observations:
trial[m] = observations[-1]
for k, v in trial_state.get("hyperparameters", {}).get("values", {}).items():
m = "values/{}".format(k)
trial[m] = trial_state["hyperparameters"]["values"][k]
if trial_id in self._summary.index:
columns = set(list(self._summary) + list(trial.keys()))
if len(columns) != self._summary.columns.size:
self._summary = self._summary.reindex(set(list(self._summary) + list(trial.keys())), axis=1)
self._summary.loc[trial_id, :] = pd.DataFrame(trial, index=[trial_id]).loc[trial_id, :]
else:
self._summary = self._summary.append(pd.DataFrame(trial, index=[trial_id]), sort=False)
self._summary.index.name = "trial id"
self._summary = self._summary.reindex(columns=sorted(self._summary.columns))
self.task.get_logger().report_table("summary", "trial", 0, table_plot=self._summary)
def exit(self):
if not self.task:
return
self.task.flush(wait_for_uploads=True)
try:
import pandas as pd
Task.add_requirements("pandas")
from tensorflow.keras.callbacks import Callback
except ImportError:
pd = None
from logging import getLogger
getLogger("clearml.external.kerastuner").warning(
"Pandas is not installed, summary table reporting will be skipped."
_logger.warning(
"Could not import 'tensorflow.keras.callbacks.Callback'. ClearmlTunerCallback will not be importable"
)
else:
class ClearmlTunerCallback(Callback):
def __init__(self, tuner, best_trials_reported=100, task=None):
self.task = task or Task.current_task()
if not self.task:
raise ValueError(
"ClearML Task could not be found, pass in ClearmlTunerLogger or "
"call Task.init before initializing ClearmlTunerLogger"
)
self.tuner = tuner
self.best_trials_reported = best_trials_reported
super(ClearmlTunerCallback, self).__init__()
class ClearmlTunerLogger(Logger):
# noinspection PyTypeChecker
def __init__(self, task=None):
# type: (Optional[Task]) -> ()
super(ClearmlTunerLogger, self).__init__()
self.task = task or Task.current_task()
if not self.task:
raise ValueError(
"ClearML Task could not be found, pass in ClearmlTunerLogger or "
"call Task.init before initializing ClearmlTunerLogger"
)
self._summary = pd.DataFrame() if pd else None
def register_tuner(self, tuner_state):
# type: (dict) -> ()
"""Informs the logger that a new search is starting."""
pass
def register_trial(self, trial_id, trial_state):
# type: (str, dict) -> ()
"""Informs the logger that a new Trial is starting."""
if not self.task:
return
data = {
"trial_id_{}".format(trial_id): trial_state,
}
data.update(self.task.get_model_config_dict())
self.task.connect_configuration(data)
self.task.get_logger().tensorboard_single_series_per_graph(True)
self.task.get_logger()._set_tensorboard_series_prefix(trial_id + " ")
self.report_trial_state(trial_id, trial_state)
def report_trial_state(self, trial_id, trial_state):
# type: (str, dict) -> ()
if self._summary is None or not self.task:
return
trial = {}
for k, v in trial_state.get("metrics", {}).get("metrics", {}).items():
m = "metric/{}".format(k)
observations = trial_state["metrics"]["metrics"][k].get("observations")
if observations:
observations = observations[-1].get("value")
if observations:
trial[m] = observations[-1]
for k, v in trial_state.get("hyperparameters", {}).get("values", {}).items():
m = "values/{}".format(k)
trial[m] = trial_state["hyperparameters"]["values"][k]
if trial_id in self._summary.index:
columns = set(list(self._summary) + list(trial.keys()))
if len(columns) != self._summary.columns.size:
self._summary = self._summary.reindex(set(list(self._summary) + list(trial.keys())), axis=1)
self._summary.loc[trial_id, :] = pd.DataFrame(trial, index=[trial_id]).loc[trial_id, :]
else:
self._summary = self._summary.append(pd.DataFrame(trial, index=[trial_id]), sort=False)
self._summary.index.name = "trial id"
self._summary = self._summary.reindex(columns=sorted(self._summary.columns))
self.task.get_logger().report_table("summary", "trial", 0, table_plot=self._summary)
def exit(self):
if not self.task:
return
self.task.flush(wait_for_uploads=True)
def on_train_end(self, *args, **kwargs):
summary = pd.DataFrame() if pd else None
if summary is None:
return
best_trials = self.tuner.oracle.get_best_trials(self.best_trials_reported)
for trial in best_trials:
trial_dict = {"trial id": trial.trial_id}
for hparam in trial.hyperparameters.space:
trial_dict[hparam.name] = trial.hyperparameters.values.get(hparam.name)
summary = pd.concat([summary, pd.DataFrame(trial_dict, index=[trial.trial_id])], ignore_index=True)
summary.index.name = "trial id"
summary = summary[["trial id", *sorted(summary.columns[1:])]]
self.task.get_logger().report_table("summary", "trial", 0, table_plot=summary)

View File

@@ -324,11 +324,22 @@ class BaseModel(object):
def task(self):
# type: () -> str
"""
Return the creating task ID
Return the task ID connected to this model. If no task is connected,
return the ID of the task that originally created this model.
:return: The Task ID (str)
"""
return self._task.id if self._task else self._get_base_model().task
return self._task.id if self._task else self.original_task
@property
def original_task(self):
# type: () -> str
"""
Return the ID of the task that created this model.
:return: The Task ID (str)
"""
return self._get_base_model().task
@property
def url(self):

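A small usage sketch of the new property pair, assuming an existing model is registered in the system (the model ID below is a placeholder):

from clearml import InputModel

model = InputModel(model_id="<model-id>")      # placeholder model ID
print("connected task:", model.task)           # task currently connected, falls back to the creating task
print("creating task:", model.original_task)   # always the task that originally created the model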
View File

@@ -66,7 +66,7 @@ class ProgressReport(object):
unit="MB",
unit_scale=False,
ncols=80,
bar_format="{bar} {percentage:3.0f}% | {n_fmt}/{total_fmt} MB "
bar_format="{bar} {percentage:3.0f}% | {n:.2f}/{total_fmt} MB "
"[{elapsed}<{remaining}, {rate_fmt}{postfix}]: {desc}",
)
except Exception:

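For context, the new {n:.2f} field renders the running count with two decimal places instead of the integer-style {n_fmt}; a standalone sketch of the same bar format, assuming tqdm is installed and using placeholder sizes:

from time import sleep
from tqdm import tqdm

pbar = tqdm(
    total=12.5,  # placeholder size in MB
    unit="MB",
    unit_scale=False,
    ncols=80,
    bar_format="{bar} {percentage:3.0f}% | {n:.2f}/{total_fmt} MB "
               "[{elapsed}<{remaining}, {rate_fmt}{postfix}]: {desc}",
)
for _ in range(5):
    sleep(0.1)
    pbar.update(2.5)  # fractional progress now renders as e.g. 7.50/12.5 MB
pbar.close()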
View File

@@ -33,7 +33,7 @@ from six import binary_type, StringIO
from six.moves.queue import Queue, Empty
from six.moves.urllib.parse import urlparse
from clearml.utilities.requests_toolbelt import MultipartEncoder
from clearml.utilities.requests_toolbelt import MultipartEncoderMonitor, MultipartEncoder
from .callbacks import UploadProgressReport, DownloadProgressReport
from .util import quote_url
from ..backend_api.session import Session
@@ -180,6 +180,14 @@ class _HttpDriver(_Driver):
return self._containers[container_name]
def upload_object_via_stream(self, iterator, container, object_name, extra=None, callback=None, **kwargs):
def monitor_callback(monitor):
new_chunk = monitor.bytes_read - monitor.previous_read
monitor.previous_read = monitor.bytes_read
try:
callback(new_chunk)
except Exception as ex:
self.get_logger().debug('Exception raised when running callback function: {}'.format(ex))
# when sending data in post, there is no connection timeout, just an entire upload timeout
timeout = int(self.timeout_total)
url = container.name
@@ -188,15 +196,7 @@ class _HttpDriver(_Driver):
host, _, path = object_name.partition('/')
url += host + '/'
m = MultipartEncoder(fields={
path: (path, iterator, get_file_mimetype(object_name))
})
headers = {
'Content-Type': m.content_type,
}
headers.update(container.get_headers(url) or {})
stream_size = None
if hasattr(iterator, 'tell') and hasattr(iterator, 'seek'):
pos = iterator.tell()
iterator.seek(0, 2)
@@ -204,6 +204,16 @@ class _HttpDriver(_Driver):
iterator.seek(pos, 0)
timeout = max(timeout, (stream_size / 1024) / float(self.min_kbps_speed))
m = MultipartEncoder(fields={path: (path, iterator, get_file_mimetype(object_name))})
if callback and stream_size:
m = MultipartEncoderMonitor(m, callback=monitor_callback)
m.previous_read = 0
headers = {
'Content-Type': m.content_type,
}
headers.update(container.get_headers(url) or {})
res = container.session.post(
url, data=m, timeout=timeout, headers=headers
)
@@ -211,12 +221,6 @@ class _HttpDriver(_Driver):
raise ValueError('Failed uploading object %s (%d): %s' % (object_name, res.status_code, res.text))
# call back is useless because we are not calling it while uploading...
# if callback and stream_size:
# try:
# callback(stream_size)
# except Exception as ex:
# log.debug('Exception raised when running callback function: %s' % ex)
return res
def list_container_objects(self, *args, **kwargs):

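The upload progress hook above builds on the monitor wrapper from requests_toolbelt (vendored here under clearml.utilities); a minimal standalone sketch of the same pattern, assuming the upstream requests_toolbelt package and a placeholder endpoint and file:

import requests
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor

def on_progress(monitor):
    # bytes_read is cumulative, so derive the newly sent chunk the same way the driver does
    new_chunk = monitor.bytes_read - monitor.previous_read
    monitor.previous_read = monitor.bytes_read
    print("sent {} new bytes ({} total)".format(new_chunk, monitor.bytes_read))

encoder = MultipartEncoder(fields={"file": ("data.bin", open("data.bin", "rb"), "application/octet-stream")})
monitor = MultipartEncoderMonitor(encoder, on_progress)
monitor.previous_read = 0  # extra attribute the callback relies on, as in the driver code
requests.post(
    "https://example.com/upload",  # placeholder endpoint
    data=monitor,
    headers={"Content-Type": monitor.content_type},
)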
View File

@@ -180,6 +180,8 @@ class Task(_Task):
__detect_repo_async = deferred_config('development.vcs_repo_detect_async', False)
__default_output_uri = DEV_DEFAULT_OUTPUT_URI.get() or deferred_config('development.default_output_uri', None)
__hidden_tag = "hidden"
_launch_multi_node_section = "launch_multi_node"
_launch_multi_node_instance_tag = "multi_node_instance"
@@ -1921,8 +1923,16 @@ class Task(_Task):
"""
return self._get_logger(auto_connect_streams=self._log_to_backend)
def launch_multi_node(self, total_num_nodes, port=29500, queue=None, wait=False, addr=None):
# type: (int, Optional[int], Optional[str], bool, Optional[str]) -> dict
def launch_multi_node(
self,
total_num_nodes, # type: int
port=29500, # type: Optional[int]
queue=None, # type: Optional[str]
wait=False, # type: bool
addr=None, # type: Optional[str]
devices=None, # type: Optional[Union[int, Sequence[int]]]
hide_children=False  # type: bool
):
"""
Enqueue multiple clones of the current task to a queue, allowing the task
to be run by multiple workers in parallel. Each task running this way is called a node.
@@ -1996,6 +2006,9 @@
parameter will be set to the one defined in ``MASTER_ADDR``. If neither environment variables exist,
the value passed to the parameter will be used. If this value is None (default), the private IP of
the machine the master node is running on will be used.
:param devices: The devices to use. This can be a positive number indicating the number of devices to use,
a sequence of indices or the value ``-1`` to indicate all available devices should be used.
:param hide_children: If True, the child tasks will be hidden. Otherwise, they will be visible in the UI
:return: A dictionary containing relevant information regarding the multi node run. This dictionary has the following entries:
@@ -2006,9 +2019,12 @@
- `node_rank` - the rank of the current node (master has rank 0)
- `wait` - if True, the master node will wait for the other nodes to start
"""
def set_launch_multi_node_runtime_props(task, conf):
# noinspection PyProtectedMember
task._set_runtime_properties({"{}/{}".format(self._launch_multi_node_section, k): v for k, v in conf.items()})
task._set_runtime_properties(
{"{}/{}".format(self._launch_multi_node_section, k): v for k, v in conf.items()}
)
if total_num_nodes < 1:
raise UsageError("total_num_nodes needs to be at least 1")
@@ -2024,6 +2040,7 @@
),
"node_rank": 0,
"wait": wait,
"devices": devices
}
editable_conf = {"total_num_nodes": total_num_nodes, "queue": queue}
editable_conf = self.connect(editable_conf, name=self._launch_multi_node_section)
@@ -2033,23 +2050,27 @@
runtime_properties = self._get_runtime_properties()
remote_node_rank = runtime_properties.get("{}/node_rank".format(self._launch_multi_node_section))
current_conf = master_conf
if remote_node_rank:
# self is a child node, build the conf from the runtime properties
current_conf = {
entry: runtime_properties.get("{}/{}".format(self._launch_multi_node_section, entry))
for entry in master_conf.keys()
}
else:
elif os.environ.get("CLEARML_MULTI_NODE_MASTER") is None:
nodes_to_wait = []
# self is the master node, enqueue the other nodes
set_launch_multi_node_runtime_props(self, master_conf)
current_conf = master_conf
for node_rank in range(1, master_conf.get("total_num_nodes", total_num_nodes)):
node = self.clone(source_task=self)
node = self.clone(source_task=self, parent=self.id)
node_conf = copy.deepcopy(master_conf)
node_conf["node_rank"] = node_rank
set_launch_multi_node_runtime_props(node, node_conf)
node.set_system_tags(node.get_system_tags() + [self._launch_multi_node_instance_tag])
node.set_system_tags(
node.get_system_tags()
+ [self._launch_multi_node_instance_tag]
+ ([self.__hidden_tag] if hide_children else [])
)
if master_conf.get("queue"):
Task.enqueue(node, queue_name=master_conf["queue"])
else:
@@ -2064,16 +2085,42 @@
Task.TaskStatusEnum.stopped,
Task.TaskStatusEnum.closed,
Task.TaskStatusEnum.failed,
Task.TaskStatusEnum.in_progress
Task.TaskStatusEnum.in_progress,
),
check_interval_sec=10
check_interval_sec=10,
)
self.log.info("Node with task ID {} and rank {} detected".format(node_to_wait.id, rank))
os.environ["CLEARML_MULTI_NODE_MASTER"] = "1"
num_devices = 1
if devices is not None:
try:
num_devices = int(devices)
except TypeError:
try:
num_devices = len(devices)
except Exception as ex:
raise ValueError("Failed parsing number of devices: {}".format(ex))
except ValueError as ex:
raise ValueError("Failed parsing number of devices: {}".format(ex))
if num_devices < 0:
try:
import torch
num_devices = torch.cuda.device_count()
except ImportError:
raise ImportError(
"Could not import `torch` while finding the number of devices. "
"Please install it or set `devices` to a value different than -1"
)
os.environ["MASTER_ADDR"] = current_conf.get("master_addr", "")
os.environ["MASTER_PORT"] = str(current_conf.get("master_port", ""))
os.environ["WORLD_SIZE"] = str(current_conf.get("total_num_nodes", ""))
os.environ["RANK"] = str(current_conf.get("node_rank", ""))
os.environ["RANK"] = str(
current_conf.get("node_rank", 0) * num_devices + int(os.environ.get("LOCAL_RANK", "0"))
)
os.environ["NODE_RANK"] = str(current_conf.get("node_rank", ""))
os.environ["WORLD_SIZE"] = str(current_conf.get("total_num_nodes", total_num_nodes) * num_devices)
return current_conf
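A hedged usage sketch of the extended call, assuming it runs from a script already initialized as a ClearML task and that a queue named "default" exists (project/task names are placeholders):

import os
from clearml import Task

task = Task.init(project_name="examples", task_name="multi node demo")  # placeholder names
config = task.launch_multi_node(
    total_num_nodes=4,
    queue="default",      # child nodes are enqueued here
    wait=True,            # master waits for the other nodes to start
    devices=-1,           # use all available CUDA devices (requires torch when -1 is passed)
    hide_children=True,   # tag the child tasks as hidden in the UI
)
# after the call, RANK / WORLD_SIZE / MASTER_ADDR / MASTER_PORT are populated for torch.distributed
print(config["node_rank"], os.environ.get("RANK"), os.environ.get("WORLD_SIZE"))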

View File

@@ -527,6 +527,13 @@ class BackgroundMonitor(object):
if isinstance(self._thread, Thread):
if self._thread_pid == os.getpid():
return
# make sure we start the metrics thread pools before starting the daemon thread
# workaround for: https://github.com/python/cpython/issues/113964
from ...backend_interface.metrics.interface import Metrics
# noinspection PyProtectedMember
Metrics._initialize_upload_pools()
self._thread_pid = os.getpid()
self._thread = Thread(target=self._daemon)
self._thread.daemon = True

View File

@@ -3,6 +3,7 @@ import os
import platform
import sys
import warnings
from math import ceil, log10
from time import time
import psutil
@@ -12,7 +13,7 @@ from typing import Text
from .process.mp import BackgroundMonitor
from ..backend_api import Session
from ..binding.frameworks.tensorflow_bind import IsTensorboardInit
from ..config import config
from ..config import config, ENV_MULTI_NODE_SINGLE_TASK
try:
from .gpu import gpustat
@@ -27,6 +28,7 @@ class ResourceMonitor(BackgroundMonitor):
_wait_for_first_iteration_to_start_sec_default = 180.0
_max_wait_for_first_iteration_to_start_sec_default = 1800.0
_resource_monitor_instances = []
_multi_node_single_task = None
def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30.,
first_report_sec=None, wait_for_first_iteration_to_start_sec=None,
@@ -34,6 +36,7 @@ class ResourceMonitor(BackgroundMonitor):
super(ResourceMonitor, self).__init__(task=task, wait_period=sample_frequency_per_sec)
# noinspection PyProtectedMember
ResourceMonitor._resource_monitor_instances.append(self)
ResourceMonitor._multi_node_single_task = ENV_MULTI_NODE_SINGLE_TASK.get()
self._task = task
self._sample_frequency = sample_frequency_per_sec
self._report_frequency = report_frequency_sec
@@ -103,6 +106,35 @@
if self._is_thread_mode_and_not_main_process():
return
multi_node_single_task_reporting = False
report_node_as_series = False
rank = 0
world_size_digits = 0
# check if we are in multi-node reporting to the same Task
# noinspection PyBroadException
try:
if self._multi_node_single_task:
# if resource monitoring is disabled, do nothing
if self._multi_node_single_task < 0:
return
# we are reporting stats from multiple machines into the same Task
multi_node_single_task_reporting = True
if self._multi_node_single_task == 1:
# report per machine graph (unique title)
report_node_as_series = False
elif self._multi_node_single_task == 2:
# report per machine series (i.e. merge title+series resource and have "node X" as different series)
report_node_as_series = True
# noinspection PyBroadException
try:
rank = int(os.environ.get("RANK", os.environ.get('SLURM_PROCID')) or 0)
world_size_digits = ceil(log10(int(os.environ.get("WORLD_SIZE") or 0)))
except Exception:
pass
except Exception:
pass
seconds_since_started = 0
reported = 0
last_iteration = 0
@@ -195,10 +227,33 @@
for k, v in average_readouts.items():
# noinspection PyBroadException
try:
title = self._title_gpu if k.startswith('gpu_') else self._title_machine
# 3 points after the dot
# 3 digits after the dot
value = round(v * 1000) / 1000.
self._task.get_logger().report_scalar(title=title, series=k, iteration=iteration, value=value)
title = self._title_gpu if k.startswith('gpu_') else self._title_machine
series = k
if multi_node_single_task_reporting:
if report_node_as_series:
# for rank 0 we keep the same original report so that external services
# can always check the default cpu/gpu utilization
if rank == 0:
self._task.get_logger().report_scalar(
title=title, series=series,
iteration=iteration, value=value)
# now let's create an additional report
title = "{}:{}".format(":".join(title.split(":")[:-1]), series)
series = "rank {:0{world_size_digits}d}".format(
rank, world_size_digits=world_size_digits)
elif rank > 0:
title = "{}:rank{:0{world_size_digits}d}".format(
title, rank, world_size_digits=world_size_digits)
else:
# for rank 0 we keep the same original report so that external services
# can always check the default cpu/gpu utilization
pass
self._task.get_logger().report_scalar(title=title, series=series, iteration=iteration, value=value)
except Exception:
pass
# clear readouts if this update is not averaged
@@ -306,14 +361,35 @@
def get_logger_reported_titles(cls, task):
# noinspection PyProtectedMember
titles = list(task.get_logger()._get_used_title_series().keys())
# noinspection PyBroadException
try:
titles.remove(cls._title_machine)
except ValueError:
pass
try:
titles.remove(cls._title_gpu)
except ValueError:
pass
multi_node = cls._multi_node_single_task is not None
except Exception:
multi_node = False
if multi_node:
title_machine = ":".join(cls._title_machine.split(":")[:-1])
title_gpu = ":".join(cls._title_gpu.split(":")[:-1])
if not title_machine:
title_machine = cls._title_machine
if not title_gpu:
title_gpu = cls._title_gpu
try:
titles = [t for t in titles if not t.startswith(title_machine) and not t.startswith(title_gpu)]
except ValueError:
pass
else:
try:
titles.remove(cls._title_machine)
except ValueError:
pass
try:
titles.remove(cls._title_gpu)
except ValueError:
pass
return titles
def _get_process_used_memory(self):

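The reporting mode comes from CLEARML_MULTI_NODE_SINGLE_TASK (0/None: a task per node, 1: a separate title per node, 2: a separate series per node, -1: only report the rank 0 node); a small sketch of enabling the per-series mode, assuming the variable is set before the task starts and using placeholder project/task names:

import os

# must be visible before Task.init() so the resource monitor picks it up
os.environ["CLEARML_MULTI_NODE_SINGLE_TASK"] = "2"  # merge machine/gpu titles, one series per rank

from clearml import Task

task = Task.init(project_name="examples", task_name="multi node resource reporting")  # placeholder names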
View File

@@ -3,75 +3,77 @@
import keras_tuner as kt
import tensorflow as tf
import tensorflow_datasets as tfds
from clearml.external.kerastuner import ClearmlTunerLogger
from clearml.external.kerastuner import ClearmlTunerCallback
from clearml import Task
physical_devices = tf.config.list_physical_devices('GPU')
physical_devices = tf.config.list_physical_devices("GPU")
if physical_devices:
tf.config.experimental.set_visible_devices(physical_devices[0], 'GPU')
tf.config.experimental.set_visible_devices(physical_devices[0], "GPU")
tf.config.experimental.set_memory_growth(physical_devices[0], True)
def build_model(hp):
inputs = tf.keras.Input(shape=(32, 32, 3))
x = inputs
for i in range(hp.Int('conv_blocks', 3, 5, default=3)):
filters = hp.Int('filters_' + str(i), 32, 256, step=32)
for i in range(hp.Int("conv_blocks", 3, 5, default=3)):
filters = hp.Int("filters_" + str(i), 32, 256, step=32)
for _ in range(2):
x = tf.keras.layers.Convolution2D(
filters, kernel_size=(3, 3), padding='same')(x)
x = tf.keras.layers.Convolution2D(filters, kernel_size=(3, 3), padding="same")(x)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.ReLU()(x)
if hp.Choice('pooling_' + str(i), ['avg', 'max']) == 'max':
if hp.Choice("pooling_" + str(i), ["avg", "max"]) == "max":
x = tf.keras.layers.MaxPool2D()(x)
else:
x = tf.keras.layers.AvgPool2D()(x)
x = tf.keras.layers.AvgPool2D(pool_size=1)(x)
x = tf.keras.layers.GlobalAvgPool2D()(x)
x = tf.keras.layers.Dense(
hp.Int('hidden_size', 30, 100, step=10, default=50),
activation='relu')(x)
x = tf.keras.layers.Dropout(
hp.Float('dropout', 0, 0.5, step=0.1, default=0.5))(x)
outputs = tf.keras.layers.Dense(10, activation='softmax')(x)
x = tf.keras.layers.Dense(hp.Int("hidden_size", 30, 100, step=10, default=50), activation="relu")(x)
x = tf.keras.layers.Dropout(hp.Float("dropout", 0, 0.5, step=0.1, default=0.5))(x)
outputs = tf.keras.layers.Dense(10, activation="softmax")(x)
model = tf.keras.Model(inputs, outputs)
model.compile(
optimizer=tf.keras.optimizers.Adam(
hp.Float('learning_rate', 1e-4, 1e-2, sampling='log')),
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
optimizer=tf.keras.optimizers.Adam(hp.Float("learning_rate", 1e-4, 1e-2, sampling="log")),
loss="sparse_categorical_crossentropy",
metrics=["accuracy"],
)
return model
# Connecting ClearML with the current process,
# from here on everything is logged automatically
task = Task.init('examples', 'kerastuner cifar10 tuning')
task = Task.init("examples", "kerastuner cifar10 tuning")
tuner = kt.Hyperband(
build_model,
project_name='kt examples',
logger=ClearmlTunerLogger(),
objective='val_accuracy',
project_name="kt examples",
# logger=ClearmlTunerLogger(),
objective="val_accuracy",
max_epochs=10,
hyperband_iterations=6)
hyperband_iterations=6,
)
data = tfds.load('cifar10')
train_ds, test_ds = data['train'], data['test']
data = tfds.load("cifar10")
train_ds, test_ds = data["train"], data["test"]
def standardize_record(record):
return tf.cast(record['image'], tf.float32) / 255., record['label']
return tf.cast(record["image"], tf.float32) / 255.0, record["label"]
train_ds = train_ds.map(standardize_record).cache().batch(64).shuffle(10000)
test_ds = test_ds.map(standardize_record).cache().batch(64)
tuner.search(train_ds,
validation_data=test_ds,
callbacks=[tf.keras.callbacks.EarlyStopping(patience=1),
tf.keras.callbacks.TensorBoard(),
])
tuner.search(
train_ds,
validation_data=test_ds,
callbacks=[
tf.keras.callbacks.EarlyStopping(patience=1),
tf.keras.callbacks.TensorBoard(),
ClearmlTunerCallback(tuner)
],
)
best_model = tuner.get_best_models(1)[0]
best_hyperparameters = tuner.get_best_hyperparameters(1)[0]

View File

@@ -0,0 +1,77 @@
"""Keras Tuner CIFAR10 example for the TensorFlow blog post."""
import keras_tuner as kt
import tensorflow as tf
import tensorflow_datasets as tfds
from clearml.external.kerastuner import ClearmlTunerLogger
from clearml import Task
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
tf.config.experimental.set_visible_devices(physical_devices[0], 'GPU')
tf.config.experimental.set_memory_growth(physical_devices[0], True)
def build_model(hp):
inputs = tf.keras.Input(shape=(32, 32, 3))
x = inputs
for i in range(hp.Int('conv_blocks', 3, 5, default=3)):
filters = hp.Int('filters_' + str(i), 32, 256, step=32)
for _ in range(2):
x = tf.keras.layers.Convolution2D(
filters, kernel_size=(3, 3), padding='same')(x)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.ReLU()(x)
if hp.Choice('pooling_' + str(i), ['avg', 'max']) == 'max':
x = tf.keras.layers.MaxPool2D()(x)
else:
x = tf.keras.layers.AvgPool2D()(x)
x = tf.keras.layers.GlobalAvgPool2D()(x)
x = tf.keras.layers.Dense(
hp.Int('hidden_size', 30, 100, step=10, default=50),
activation='relu')(x)
x = tf.keras.layers.Dropout(
hp.Float('dropout', 0, 0.5, step=0.1, default=0.5))(x)
outputs = tf.keras.layers.Dense(10, activation='softmax')(x)
model = tf.keras.Model(inputs, outputs)
model.compile(
optimizer=tf.keras.optimizers.Adam(
hp.Float('learning_rate', 1e-4, 1e-2, sampling='log')),
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
return model
# Connecting ClearML with the current process,
# from here on everything is logged automatically
task = Task.init('examples', 'kerastuner cifar10 tuning')
tuner = kt.Hyperband(
build_model,
project_name='kt examples',
logger=ClearmlTunerLogger(),
objective='val_accuracy',
max_epochs=10,
hyperband_iterations=6)
data = tfds.load('cifar10')
train_ds, test_ds = data['train'], data['test']
def standardize_record(record):
return tf.cast(record['image'], tf.float32) / 255., record['label']
train_ds = train_ds.map(standardize_record).cache().batch(64).shuffle(10000)
test_ds = test_ds.map(standardize_record).cache().batch(64)
tuner.search(train_ds,
validation_data=test_ds,
callbacks=[tf.keras.callbacks.EarlyStopping(patience=1),
tf.keras.callbacks.TensorBoard(),
])
best_model = tuner.get_best_models(1)[0]
best_hyperparameters = tuner.get_best_hyperparameters(1)[0]