revital 2024-09-15 11:56:17 +03:00
commit 795a490490
13 changed files with 638 additions and 494 deletions

View File

@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2023 allegro.ai
Copyright 2024 ClearML
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

View File

@ -211,7 +211,7 @@ If ClearML is part of your development process / project / publication, please c
```
@misc{clearml,
title = {ClearML - Your entire MLOps stack in one open-source tool},
year = {2023},
year = {2024},
note = {Software available from http://github.com/allegroai/clearml},
url={https://clear.ml/},
author = {ClearML},

View File

@ -378,10 +378,13 @@ class Model(IdObjectBase, AsyncManagerMixin, _StorageUriMixin):
if uploaded_uri is None:
return
# If not successful, mark model as failed_uploading
# If not successful, mark model as failed_uploading,
# but don't override valid URLs
if uploaded_uri is False:
uploaded_uri = "{}/failed_uploading".format(
self._upload_storage_uri
uploaded_uri = (
self.data.uri
if self.data.uri != "{}/uploading_file".format(self._upload_storage_uri or "file://")
else "{}/failed_uploading".format(self._upload_storage_uri or "file://")
)
Model._local_model_to_id_uri[str(model_file)] = (
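The fallback above is subtle: a failed upload must not clobber a URI that was already valid. A minimal standalone sketch of the assumed logic (names simplified; `current_uri` and `upload_storage_uri` stand in for the instance attributes):

```
# Minimal sketch of the assumed fallback logic; not the library code itself.
def resolve_uploaded_uri(uploaded_uri, current_uri, upload_storage_uri):
    base = upload_storage_uri or "file://"
    if uploaded_uri is None:
        return None  # upload still pending, nothing to record yet
    if uploaded_uri is False:
        placeholder = "{}/uploading_file".format(base)
        # keep an already-valid URI; only mark failed_uploading when the
        # stored value is still the temporary uploading placeholder
        return current_uri if current_uri != placeholder else "{}/failed_uploading".format(base)
    return uploaded_uri
```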

View File

@ -3,9 +3,9 @@ import json
import os
import re
import tempfile
from sys import platform
from functools import reduce
from logging import getLogger
from sys import platform
from typing import Optional, Sequence, Union, Tuple, List, Callable, Dict, Any
from pathlib2 import Path
@ -27,28 +27,28 @@ class CreateAndPopulate(object):
)
def __init__(
self,
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
repo=None, # type: Optional[str]
branch=None, # type: Optional[str]
commit=None, # type: Optional[str]
script=None, # type: Optional[str]
working_directory=None, # type: Optional[str]
module=None, # type: Optional[str]
packages=None, # type: Optional[Union[bool, Sequence[str]]]
requirements_file=None, # type: Optional[Union[str, Path]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
output_uri=None, # type: Optional[str]
base_task_id=None, # type: Optional[str]
add_task_init_call=True, # type: bool
force_single_script_file=False, # type: bool
raise_on_missing_entries=False, # type: bool
verbose=False, # type: bool
binary=None # type: Optional[str]
self,
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
repo=None, # type: Optional[str]
branch=None, # type: Optional[str]
commit=None, # type: Optional[str]
script=None, # type: Optional[str]
working_directory=None, # type: Optional[str]
module=None, # type: Optional[str]
packages=None, # type: Optional[Union[bool, Sequence[str]]]
requirements_file=None, # type: Optional[Union[str, Path]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
output_uri=None, # type: Optional[str]
base_task_id=None, # type: Optional[str]
add_task_init_call=True, # type: bool
force_single_script_file=False, # type: bool
raise_on_missing_entries=False, # type: bool
verbose=False, # type: bool
binary=None, # type: Optional[str]
):
# type: (...) -> None
"""
@ -106,15 +106,16 @@ class CreateAndPopulate(object):
if not script and not module:
raise ValueError("Entry point script not provided")
if not repo and not folder and (script and not Path(script).is_file()):
raise ValueError("Script file \'{}\' could not be found".format(script))
raise ValueError("Script file '{}' could not be found".format(script))
if raise_on_missing_entries and commit and branch:
raise ValueError(
"Specify either a branch/tag or specific commit id, not both (either --commit or --branch)")
if raise_on_missing_entries and not folder and working_directory and working_directory.startswith('/'):
raise ValueError("working directory \'{}\', must be relative to repository root")
"Specify either a branch/tag or specific commit id, not both (either --commit or --branch)"
)
if raise_on_missing_entries and not folder and working_directory and working_directory.startswith("/"):
raise ValueError("working directory '{}', must be relative to repository root")
if requirements_file and not Path(requirements_file).is_file():
raise ValueError("requirements file could not be found \'{}\'")
raise ValueError("requirements file could not be found '{}'")
self.folder = folder
self.commit = commit
@ -124,8 +125,9 @@ class CreateAndPopulate(object):
self.module = module
self.cwd = working_directory
assert not packages or isinstance(packages, (tuple, list, bool))
self.packages = list(packages) if packages is not None and not isinstance(packages, bool) \
else (packages or None)
self.packages = (
list(packages) if packages is not None and not isinstance(packages, bool) else (packages or None)
)
self.requirements_file = Path(requirements_file) if requirements_file else None
self.base_task_id = base_task_id
self.docker = dict(image=docker, args=docker_args, bash_script=docker_bash_setup_script)
@ -179,14 +181,20 @@ class CreateAndPopulate(object):
stand_alone_script_outside_repo = True
if not os.path.isfile(entry_point) and not stand_alone_script_outside_repo:
if (not Path(self.script).is_absolute() and not Path(self.cwd).is_absolute() and
(Path(self.folder) / self.cwd / self.script).is_file()):
if (
not Path(self.script).is_absolute()
and not Path(self.cwd).is_absolute()
and (Path(self.folder) / self.cwd / self.script).is_file()
):
entry_point = (Path(self.folder) / self.cwd / self.script).as_posix()
elif (Path(self.cwd).is_absolute() and not Path(self.script).is_absolute() and
(Path(self.cwd) / self.script).is_file()):
elif (
Path(self.cwd).is_absolute()
and not Path(self.script).is_absolute()
and (Path(self.cwd) / self.script).is_file()
):
entry_point = (Path(self.cwd) / self.script).as_posix()
else:
raise ValueError("Script entrypoint file \'{}\' could not be found".format(entry_point))
raise ValueError("Script entrypoint file '{}' could not be found".format(entry_point))
local_entry_file = entry_point
@ -215,20 +223,25 @@ class CreateAndPopulate(object):
detailed_req_report=False,
force_single_script=True,
)
if repo_info.script['diff']:
print("Warning: local git repo diff is ignored, "
"storing only the standalone script form {}".format(self.script))
repo_info.script['diff'] = a_repo_info.script['diff'] or ''
repo_info.script['entry_point'] = a_repo_info.script['entry_point']
if repo_info.script["diff"]:
print(
"Warning: local git repo diff is ignored, "
"storing only the standalone script form {}".format(self.script)
)
repo_info.script["diff"] = a_repo_info.script["diff"] or ""
repo_info.script["entry_point"] = a_repo_info.script["entry_point"]
if a_create_requirements:
repo_info['requirements'] = a_repo_info.script.get('requirements') or {}
repo_info["requirements"] = a_repo_info.script.get("requirements") or {}
# check if we have no repository and no requirements raise error
if self.raise_on_missing_entries and (not self.requirements_file and not self.packages) \
and not self.repo and (
not repo_info or not repo_info.script or not repo_info.script.get('repository')) \
and (not entry_point or not entry_point.endswith(".sh")):
raise ValueError("Standalone script detected \'{}\', but no requirements provided".format(self.script))
if (
self.raise_on_missing_entries
and (not self.requirements_file and not self.packages)
and not self.repo
and (not repo_info or not repo_info.script or not repo_info.script.get("repository"))
and (not entry_point or not entry_point.endswith(".sh"))
):
raise ValueError("Standalone script detected '{}', but no requirements provided".format(self.script))
if dry_run:
task = None
task_state = dict(
@ -237,106 +250,127 @@ class CreateAndPopulate(object):
type=str(self.task_type or Task.TaskTypes.training),
) # type: dict
if self.output_uri is not None:
task_state['output'] = dict(destination=self.output_uri)
task_state["output"] = dict(destination=self.output_uri)
else:
task_state = dict(script={})
if self.base_task_id:
if self.verbose:
print('Cloning task {}'.format(self.base_task_id))
print("Cloning task {}".format(self.base_task_id))
task = Task.clone(source_task=self.base_task_id, project=Task.get_project_id(self.project_name))
self._set_output_uri(task)
else:
# noinspection PyProtectedMember
task = Task._create(
task_name=self.task_name, project_name=self.project_name,
task_type=self.task_type or Task.TaskTypes.training)
task_name=self.task_name,
project_name=self.project_name,
task_type=self.task_type or Task.TaskTypes.training,
)
self._set_output_uri(task)
# if there is nothing to populate, return
if not any([
self.folder, self.commit, self.branch, self.repo, self.script, self.module, self.cwd,
self.packages, self.requirements_file, self.base_task_id] + (list(self.docker.values()))
):
if not any(
[
self.folder,
self.commit,
self.branch,
self.repo,
self.script,
self.module,
self.cwd,
self.packages,
self.requirements_file,
self.base_task_id,
]
+ (list(self.docker.values()))
):
return task
# clear the script section
task_state['script'] = {}
task_state["script"] = {}
if repo_info:
task_state['script']['repository'] = repo_info.script['repository']
task_state['script']['version_num'] = repo_info.script['version_num']
task_state['script']['branch'] = repo_info.script['branch']
task_state['script']['diff'] = repo_info.script['diff'] or ''
task_state['script']['working_dir'] = repo_info.script['working_dir']
task_state['script']['entry_point'] = repo_info.script['entry_point']
task_state['script']['binary'] = self.binary or ('/bin/bash' if (
(repo_info.script['entry_point'] or '').lower().strip().endswith('.sh') and
not (repo_info.script['entry_point'] or '').lower().strip().startswith('-m ')) \
else repo_info.script['binary'])
task_state['script']['requirements'] = repo_info.script.get('requirements') or {}
task_state["script"]["repository"] = repo_info.script["repository"]
task_state["script"]["version_num"] = repo_info.script["version_num"]
task_state["script"]["branch"] = repo_info.script["branch"]
task_state["script"]["diff"] = repo_info.script["diff"] or ""
task_state["script"]["working_dir"] = repo_info.script["working_dir"]
task_state["script"]["entry_point"] = repo_info.script["entry_point"]
task_state["script"]["binary"] = self.binary or (
"/bin/bash"
if (
(repo_info.script["entry_point"] or "").lower().strip().endswith(".sh")
and not (repo_info.script["entry_point"] or "").lower().strip().startswith("-m ")
)
else repo_info.script["binary"]
)
task_state["script"]["requirements"] = repo_info.script.get("requirements") or {}
if self.cwd:
cwd = self.cwd
if not Path(cwd).is_absolute():
# cwd should be relative to the repo_root, but we need the full path
# (repo_root + cwd) in order to resolve the entry point
cwd = os.path.normpath((Path(repo_info.script['repo_root']) / self.cwd).as_posix())
cwd = os.path.normpath((Path(repo_info.script["repo_root"]) / self.cwd).as_posix())
if not Path(cwd).is_dir():
# leave it as is - we cannot verify it locally, and this is a repo
cwd = self.cwd
elif not Path(cwd).is_dir():
# we were passed an absolute dir and it does not exist
raise ValueError("Working directory \'{}\' could not be found".format(cwd))
raise ValueError("Working directory '{}' could not be found".format(cwd))
if self.module:
entry_point = "-m {}".format(self.module)
elif stand_alone_script_outside_repo:
# this should be relative (it is the temp file we generated)
entry_point = repo_info.script['entry_point']
entry_point = repo_info.script["entry_point"]
else:
entry_point = os.path.normpath(
Path(repo_info.script['repo_root']) /
repo_info.script['working_dir'] / repo_info.script['entry_point']
Path(repo_info.script["repo_root"])
/ repo_info.script["working_dir"]
/ repo_info.script["entry_point"]
)
# resolve entry_point relative to the current working directory
if Path(cwd).is_absolute():
entry_point = Path(entry_point).relative_to(cwd).as_posix()
else:
entry_point = repo_info.script['entry_point']
entry_point = repo_info.script["entry_point"]
# restore cwd - make it relative to the repo_root again
if Path(cwd).is_absolute():
# now cwd is relative again
cwd = Path(cwd).relative_to(repo_info.script['repo_root']).as_posix()
cwd = Path(cwd).relative_to(repo_info.script["repo_root"]).as_posix()
# make sure we always have / (never \\)
if platform == "win32":
entry_point = entry_point.replace('\\', '/') if entry_point else ""
cwd = cwd.replace('\\', '/') if cwd else ""
entry_point = entry_point.replace("\\", "/") if entry_point else ""
cwd = cwd.replace("\\", "/") if cwd else ""
task_state['script']['entry_point'] = entry_point or ""
task_state['script']['working_dir'] = cwd or "."
task_state["script"]["entry_point"] = entry_point or ""
task_state["script"]["working_dir"] = cwd or "."
elif self.repo:
cwd = '/'.join([p for p in (self.cwd or '.').split('/') if p and p != '.'])
cwd = "/".join([p for p in (self.cwd or ".").split("/") if p and p != "."])
# normalize backslashes and remove first one
if self.module:
entry_point = "-m {}".format(self.module)
else:
entry_point = '/'.join([p for p in self.script.split('/') if p and p != '.'])
if cwd and entry_point.startswith(cwd + '/'):
entry_point = entry_point[len(cwd) + 1:]
entry_point = "/".join([p for p in self.script.split("/") if p and p != "."])
if cwd and entry_point.startswith(cwd + "/"):
entry_point = entry_point[len(cwd) + 1 :]
task_state['script']['repository'] = self.repo
task_state['script']['version_num'] = self.commit or None
task_state['script']['branch'] = self.branch or None
task_state['script']['diff'] = ''
task_state['script']['working_dir'] = cwd or '.'
task_state['script']['entry_point'] = entry_point or ""
task_state["script"]["repository"] = self.repo
task_state["script"]["version_num"] = self.commit or None
task_state["script"]["branch"] = self.branch or None
task_state["script"]["diff"] = ""
task_state["script"]["working_dir"] = cwd or "."
task_state["script"]["entry_point"] = entry_point or ""
if self.script and Path(self.script).is_file() and (
self.force_single_script_file or Path(self.script).is_absolute()):
if (
self.script
and Path(self.script).is_file()
and (self.force_single_script_file or Path(self.script).is_absolute())
):
self.force_single_script_file = True
create_requirements = self.packages is True
repo_info, requirements = ScriptInfo.get(
@ -349,29 +383,37 @@ class CreateAndPopulate(object):
detailed_req_report=False,
force_single_script=True,
)
task_state['script']['binary'] = self.binary or ('/bin/bash' if (
(repo_info.script['entry_point'] or '').lower().strip().endswith('.sh') and
not (repo_info.script['entry_point'] or '').lower().strip().startswith('-m ')) \
else repo_info.script['binary'])
task_state['script']['diff'] = repo_info.script['diff'] or ''
task_state['script']['entry_point'] = repo_info.script['entry_point']
task_state["script"]["binary"] = self.binary or (
"/bin/bash"
if (
(repo_info.script["entry_point"] or "").lower().strip().endswith(".sh")
and not (repo_info.script["entry_point"] or "").lower().strip().startswith("-m ")
)
else repo_info.script["binary"]
)
task_state["script"]["diff"] = repo_info.script["diff"] or ""
task_state["script"]["entry_point"] = repo_info.script["entry_point"]
if create_requirements:
task_state['script']['requirements'] = repo_info.script.get('requirements') or {}
task_state["script"]["requirements"] = repo_info.script.get("requirements") or {}
else:
if self.binary:
task_state["script"]["binary"] = self.binary
elif entry_point and entry_point.lower().strip().endswith(".sh") and not \
entry_point.lower().strip().startswith("-m"):
elif (
entry_point
and entry_point.lower().strip().endswith(".sh")
and not entry_point.lower().strip().startswith("-m")
):
task_state["script"]["binary"] = "/bin/bash"
else:
# standalone task
task_state['script']['entry_point'] = self.script if self.script else \
("-m {}".format(self.module) if self.module else "")
task_state['script']['working_dir'] = '.'
task_state["script"]["entry_point"] = (
self.script if self.script else ("-m {}".format(self.module) if self.module else "")
)
task_state["script"]["working_dir"] = "."
# update requirements
reqs = []
if self.requirements_file:
with open(self.requirements_file.as_posix(), 'rt') as f:
with open(self.requirements_file.as_posix(), "rt") as f:
reqs = [line.strip() for line in f.readlines()]
if self.packages and self.packages is not True:
reqs += self.packages
@ -379,66 +421,76 @@ class CreateAndPopulate(object):
# make sure we have clearml.
clearml_found = False
for line in reqs:
if line.strip().startswith('#'):
if line.strip().startswith("#"):
continue
package = reduce(lambda a, b: a.split(b)[0], "#;@=~<>[", line).strip()
if package == 'clearml':
if package == "clearml":
clearml_found = True
break
if not clearml_found:
reqs.append('clearml')
task_state['script']['requirements'] = {'pip': '\n'.join(reqs)}
elif not self.repo and repo_info and not repo_info.script.get('requirements'):
reqs.append("clearml")
task_state["script"]["requirements"] = {"pip": "\n".join(reqs)}
elif not self.repo and repo_info and not repo_info.script.get("requirements"):
# we are in local mode, make sure we have "requirements.txt" - it is a must
reqs_txt_file = Path(repo_info.script['repo_root']) / "requirements.txt"
poetry_toml_file = Path(repo_info.script['repo_root']) / "pyproject.toml"
reqs_txt_file = Path(repo_info.script["repo_root"]) / "requirements.txt"
poetry_toml_file = Path(repo_info.script["repo_root"]) / "pyproject.toml"
if self.raise_on_missing_entries and not reqs_txt_file.is_file() and not poetry_toml_file.is_file():
raise ValueError(
"requirements.txt not found [{}] "
"Use --requirements or --packages".format(reqs_txt_file.as_posix()))
"Use --requirements or --packages".format(reqs_txt_file.as_posix())
)
if self.add_task_init_call:
script_entry = ('/' + task_state['script'].get('working_dir', '.')
+ '/' + task_state['script']['entry_point'])
script_entry = (
"/" + task_state["script"].get("working_dir", ".") + "/" + task_state["script"]["entry_point"]
)
if platform == "win32":
script_entry = os.path.normpath(script_entry).replace('\\', '/')
script_entry = os.path.normpath(script_entry).replace("\\", "/")
else:
script_entry = os.path.abspath(script_entry)
idx_a = 0
lines = None
# find the right entry for the patch if we have a local file (basically after __future__ imports)
if (local_entry_file and not stand_alone_script_outside_repo and not self.module and
str(local_entry_file).lower().endswith(".py")):
with open(local_entry_file, 'rt') as f:
if (
local_entry_file
and not stand_alone_script_outside_repo
and not self.module
and str(local_entry_file).lower().endswith(".py")
):
with open(local_entry_file, "rt") as f:
lines = f.readlines()
future_found = self._locate_future_import(lines)
if future_found >= 0:
idx_a = future_found + 1
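For context, `_locate_future_import` (whose body appears in a later hunk) can be sketched as follows; this is an assumed, simplified version that skips the comment and triple-quote handling the real helper performs:

```
def _locate_future_import(lines):
    # return the index of the last `from __future__ import ...` line,
    # or -1 when there is none, so Task.init() is injected right after it
    found = -1
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped.startswith("from __future__ import"):
            found = i
        elif found >= 0 and stripped and not stripped.startswith("#"):
            break  # past the leading __future__ block
    return found
```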
task_init_patch = ''
if ((self.repo or task_state.get('script', {}).get('repository')) and
not self.force_single_script_file and not stand_alone_script_outside_repo):
task_init_patch = ""
if (
(self.repo or task_state.get("script", {}).get("repository"))
and not self.force_single_script_file
and not stand_alone_script_outside_repo
):
# if we do not have requirements, add clearml to the requirements.txt
if not reqs:
task_init_patch += \
"diff --git a/requirements.txt b/requirements.txt\n" \
"--- a/requirements.txt\n" \
"+++ b/requirements.txt\n" \
"@@ -0,0 +1,1 @@\n" \
task_init_patch += (
"diff --git a/requirements.txt b/requirements.txt\n"
"--- a/requirements.txt\n"
"+++ b/requirements.txt\n"
"@@ -0,0 +1,1 @@\n"
"+clearml\n"
)
# Add Task.init call
if not self.module and script_entry and str(script_entry).lower().endswith(".py"):
task_init_patch += \
"diff --git a{script_entry} b{script_entry}\n" \
"--- a{script_entry}\n" \
"+++ b{script_entry}\n" \
"@@ -{idx_a},0 +{idx_b},4 @@\n" \
"+try: from allegroai import Task\n" \
"+except ImportError: from clearml import Task\n" \
"+(__name__ != \"__main__\") or Task.init()\n" \
"+\n".format(
script_entry=script_entry, idx_a=idx_a, idx_b=idx_a + 1)
task_init_patch += (
"diff --git a{script_entry} b{script_entry}\n"
"--- a{script_entry}\n"
"+++ b{script_entry}\n"
"@@ -{idx_a},0 +{idx_b},4 @@\n"
"+try: from allegroai import Task\n"
"+except ImportError: from clearml import Task\n"
'+(__name__ != "__main__") or Task.init()\n'
"+\n".format(script_entry=script_entry, idx_a=idx_a, idx_b=idx_a + 1)
)
elif self.module:
# if we are here, do nothing
pass
@ -449,57 +501,62 @@ class CreateAndPopulate(object):
"except ImportError: from clearml import Task\n",
'(__name__ != "__main__") or Task.init()\n\n',
]
task_state['script']['diff'] = ''.join(lines[:idx_a] + init_lines + lines[idx_a:])
task_state["script"]["diff"] = "".join(lines[:idx_a] + init_lines + lines[idx_a:])
# no need to add anything, we patched it.
task_init_patch = ""
elif str(script_entry or "").lower().endswith(".py"):
# Add Task.init call
# if we are here it means we do not have a git diff, but a single script file
task_init_patch += \
"try: from allegroai import Task\n" \
"except ImportError: from clearml import Task\n" \
"(__name__ != \"__main__\") or Task.init()\n\n"
task_state['script']['diff'] = task_init_patch + task_state['script'].get('diff', '')
task_init_patch += (
"try: from allegroai import Task\n"
"except ImportError: from clearml import Task\n"
'(__name__ != "__main__") or Task.init()\n\n'
)
task_state["script"]["diff"] = task_init_patch + task_state["script"].get("diff", "")
task_init_patch = ""
# make sure we add the diff at the end of the current diff
task_state['script']['diff'] = task_state['script'].get('diff', '')
if task_state['script']['diff'] and not task_state['script']['diff'].endswith('\n'):
task_state['script']['diff'] += '\n'
task_state['script']['diff'] += task_init_patch
task_state["script"]["diff"] = task_state["script"].get("diff", "")
if task_state["script"]["diff"] and not task_state["script"]["diff"].endswith("\n"):
task_state["script"]["diff"] += "\n"
task_state["script"]["diff"] += task_init_patch
# set base docker image if provided
if self.docker:
if dry_run:
task_state['container'] = dict(
image=self.docker.get('image') or '',
arguments=self.docker.get('args') or '',
setup_shell_script=self.docker.get('bash_script') or '',
task_state["container"] = dict(
image=self.docker.get("image") or "",
arguments=self.docker.get("args") or "",
setup_shell_script=self.docker.get("bash_script") or "",
)
else:
task.set_base_docker(
docker_image=self.docker.get('image'),
docker_arguments=self.docker.get('args'),
docker_setup_bash_script=self.docker.get('bash_script'),
docker_image=self.docker.get("image"),
docker_arguments=self.docker.get("args"),
docker_setup_bash_script=self.docker.get("bash_script"),
)
if self.verbose:
if task_state['script']['repository']:
repo_details = {k: v for k, v in task_state['script'].items()
if v and k not in ('diff', 'requirements', 'binary')}
print('Repository Detected\n{}'.format(json.dumps(repo_details, indent=2)))
if task_state["script"]["repository"]:
repo_details = {
k: v for k, v in task_state["script"].items() if v and k not in ("diff", "requirements", "binary")
}
print("Repository Detected\n{}".format(json.dumps(repo_details, indent=2)))
else:
print('Standalone script detected\n Script: {}'.format(self.script))
print("Standalone script detected\n Script: {}".format(self.script))
if task_state['script'].get('requirements') and \
task_state['script']['requirements'].get('pip'):
print('Requirements:{}{}'.format(
'\n Using requirements.txt: {}'.format(
self.requirements_file.as_posix()) if self.requirements_file else '',
'\n {}Packages: {}'.format('Additional ' if self.requirements_file else '', self.packages)
if self.packages else ''
))
if task_state["script"].get("requirements") and task_state["script"]["requirements"].get("pip"):
print(
"Requirements:{}{}".format(
"\n Using requirements.txt: {}".format(self.requirements_file.as_posix())
if self.requirements_file
else "",
"\n {}Packages: {}".format("Additional " if self.requirements_file else "", self.packages)
if self.packages
else "",
)
)
if self.docker:
print('Base docker image: {}'.format(self.docker))
print("Base docker image: {}".format(self.docker))
if dry_run:
return task_state
@ -538,18 +595,17 @@ class CreateAndPopulate(object):
args_list.append(a)
continue
try:
parts = a.split('=', 1)
parts = a.split("=", 1)
assert len(parts) == 2
args_list.append(parts)
except Exception:
raise ValueError(
"Failed parsing argument \'{}\', arguments must be in \'<key>=<value>\' format")
raise ValueError("Failed parsing argument '{}', arguments must be in '<key>=<value>' format")
if not self.task:
return
task_params = self.task.get_parameters()
args_list = {'Args/{}'.format(k): v for k, v in args_list}
args_list = {"Args/{}".format(k): v for k, v in args_list}
task_params.update(args_list)
self.task.set_parameters(task_params)
@ -569,8 +625,11 @@ class CreateAndPopulate(object):
"""
# skip over the first two lines, they are ours
# then skip over empty or comment lines
lines = [(i, line.split('#', 1)[0].rstrip()) for i, line in enumerate(lines)
if line.strip('\r\n\t ') and not line.strip().startswith('#')]
lines = [
(i, line.split("#", 1)[0].rstrip())
for i, line in enumerate(lines)
if line.strip("\r\n\t ") and not line.strip().startswith("#")
]
# remove triple quotes ' """ '
nested_c = -1
@ -597,11 +656,11 @@ class CreateAndPopulate(object):
# check the last import block
i, line = lines[found_index]
# whether we have a \\ character at the end of the line or the line is indented
parenthesized_lines = '(' in line and ')' not in line
while line.endswith('\\') or parenthesized_lines:
parenthesized_lines = "(" in line and ")" not in line
while line.endswith("\\") or parenthesized_lines:
found_index += 1
i, line = lines[found_index]
if ')' in line:
if ")" in line:
break
else:
@ -671,33 +730,33 @@ if __name__ == '__main__':
@classmethod
def create_task_from_function(
cls,
a_function, # type: Callable
function_kwargs=None, # type: Optional[Dict[str, Any]]
function_input_artifacts=None, # type: Optional[Dict[str, str]]
function_return=None, # type: Optional[List[str]]
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
auto_connect_frameworks=None, # type: Optional[dict]
auto_connect_arg_parser=None, # type: Optional[dict]
repo=None, # type: Optional[str]
branch=None, # type: Optional[str]
commit=None, # type: Optional[str]
packages=None, # type: Optional[Union[str, Sequence[str]]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
output_uri=None, # type: Optional[str]
helper_functions=None, # type: Optional[Sequence[Callable]]
dry_run=False, # type: bool
task_template_header=None, # type: Optional[str]
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
_sanitize_function=None, # type: Optional[Callable[[str], str]]
_sanitize_helper_functions=None, # type: Optional[Callable[[str], str]]
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
cls,
a_function, # type: Callable
function_kwargs=None, # type: Optional[Dict[str, Any]]
function_input_artifacts=None, # type: Optional[Dict[str, str]]
function_return=None, # type: Optional[List[str]]
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
auto_connect_frameworks=None, # type: Optional[dict]
auto_connect_arg_parser=None, # type: Optional[dict]
repo=None, # type: Optional[str]
branch=None, # type: Optional[str]
commit=None, # type: Optional[str]
packages=None, # type: Optional[Union[str, Sequence[str]]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
output_uri=None, # type: Optional[str]
helper_functions=None, # type: Optional[Sequence[Callable]]
dry_run=False, # type: bool
task_template_header=None, # type: Optional[str]
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
_sanitize_function=None, # type: Optional[Callable[[str], str]]
_sanitize_helper_functions=None, # type: Optional[Callable[[str], str]]
skip_global_imports=False, # type: bool
working_dir=None, # type: Optional[str]
):
# type: (...) -> Optional[Dict, Task]
"""
@ -793,8 +852,8 @@ if __name__ == '__main__':
if auto_connect_arg_parser is None:
auto_connect_arg_parser = True
assert (not auto_connect_frameworks or isinstance(auto_connect_frameworks, (bool, dict)))
assert (not auto_connect_arg_parser or isinstance(auto_connect_arg_parser, (bool, dict)))
assert not auto_connect_frameworks or isinstance(auto_connect_frameworks, (bool, dict))
assert not auto_connect_arg_parser or isinstance(auto_connect_arg_parser, (bool, dict))
function_source, function_name = CreateFromFunction.__extract_function_information(
a_function, sanitize_function=_sanitize_function, skip_global_imports=skip_global_imports
@ -819,9 +878,9 @@ if __name__ == '__main__':
function_input_artifacts = function_input_artifacts or dict()
# verify artifact kwargs:
if not all(len(v.split('.', 1)) == 2 for v in function_input_artifacts.values()):
if not all(len(v.split(".", 1)) == 2 for v in function_input_artifacts.values()):
raise ValueError(
'function_input_artifacts={}, it must in the format: '
"function_input_artifacts={}, it must in the format: "
'{{"argument": "task_id.artifact_name"}}'.format(function_input_artifacts)
)
inspect_args = None
@ -835,16 +894,20 @@ if __name__ == '__main__':
# adjust the defaults so they match the args (match from the end)
if inspect_defaults_vals and len(inspect_defaults_vals) != len(inspect_defaults_args):
inspect_defaults_args = inspect_defaults_args[-len(inspect_defaults_vals):]
inspect_defaults_args = inspect_defaults_args[-len(inspect_defaults_vals) :]
if inspect_defaults_vals and len(inspect_defaults_vals) != len(inspect_defaults_args):
getLogger().warning(
'Ignoring default argument values: '
'could not find all default valued for: \'{}\''.format(function_name))
"Ignoring default argument values: "
"could not find all default valued for: '{}'".format(function_name)
)
inspect_defaults_vals = []
function_kwargs = {str(k): v for k, v in zip(inspect_defaults_args, inspect_defaults_vals)} \
if inspect_defaults_vals else {str(k): None for k in inspect_defaults_args}
function_kwargs = (
{str(k): v for k, v in zip(inspect_defaults_args, inspect_defaults_vals)}
if inspect_defaults_vals
else {str(k): None for k in inspect_defaults_args}
)
if function_kwargs:
if not inspect_args:
@ -853,8 +916,10 @@ if __name__ == '__main__':
if inspect_args.annotations:
supported_types = _Arguments.get_supported_types()
function_kwargs_types = {
str(k): str(inspect_args.annotations[k].__name__) for k in inspect_args.annotations
if inspect_args.annotations[k] in supported_types}
str(k): str(inspect_args.annotations[k].__name__)
for k in inspect_args.annotations
if inspect_args.annotations[k] in supported_types
}
task_template = cls.task_template.format(
header=task_template_header or cls.default_task_template_header,
@ -871,11 +936,11 @@ if __name__ == '__main__':
artifact_serialization_function_source=artifact_serialization_function_source,
artifact_serialization_function_name=artifact_serialization_function_name,
artifact_deserialization_function_source=artifact_deserialization_function_source,
artifact_deserialization_function_name=artifact_deserialization_function_name
artifact_deserialization_function_name=artifact_deserialization_function_name,
)
temp_dir = repo if repo and os.path.isdir(repo) else None
with tempfile.NamedTemporaryFile('w', suffix='.py', dir=temp_dir) as temp_file:
with tempfile.NamedTemporaryFile("w", suffix=".py", dir=temp_dir) as temp_file:
temp_file.write(task_template)
temp_file.flush()
@ -899,38 +964,53 @@ if __name__ == '__main__':
docker_bash_setup_script=docker_bash_setup_script,
output_uri=output_uri,
add_task_init_call=False,
working_directory=working_dir
working_directory=working_dir,
)
entry_point = '{}.py'.format(function_name)
entry_point = "{}.py".format(function_name)
task = populate.create_task(dry_run=dry_run)
if dry_run:
task['script']['diff'] = task_template
task['script']['entry_point'] = entry_point
task['script']['working_dir'] = working_dir or '.'
task['hyperparams'] = {
task["script"]["diff"] = task_template
task["script"]["entry_point"] = entry_point
task["script"]["working_dir"] = working_dir or "."
task["hyperparams"] = {
cls.kwargs_section: {
k: dict(section=cls.kwargs_section, name=k,
value=str(v) if v is not None else '', type=function_kwargs_types.get(k, None))
k: dict(
section=cls.kwargs_section,
name=k,
value=str(v) if v is not None else "",
type=function_kwargs_types.get(k, None),
)
for k, v in (function_kwargs or {}).items()
},
cls.input_artifact_section: {
k: dict(section=cls.input_artifact_section, name=k, value=str(v) if v is not None else '')
k: dict(section=cls.input_artifact_section, name=k, value=str(v) if v is not None else "")
for k, v in (function_input_artifacts or {}).items()
}
},
}
else:
task.update_task(task_data={
'script': task.data.script.to_dict().update(
{'entry_point': entry_point, 'working_dir': '.', 'diff': task_template})})
hyper_parameters = {'{}/{}'.format(cls.kwargs_section, k): str(v) for k, v in function_kwargs} \
if function_kwargs else {}
hyper_parameters.update(
{'{}/{}'.format(cls.input_artifact_section, k): str(v) for k, v in function_input_artifacts}
if function_input_artifacts else {}
task.update_task(
task_data={
"script": task.data.script.to_dict().update(
{"entry_point": entry_point, "working_dir": ".", "diff": task_template}
)
}
)
hyper_parameters = (
{"{}/{}".format(cls.kwargs_section, k): str(v) for k, v in function_kwargs}
if function_kwargs
else {}
)
hyper_parameters.update(
{"{}/{}".format(cls.input_artifact_section, k): str(v) for k, v in function_input_artifacts}
if function_input_artifacts
else {}
)
__function_kwargs_types = (
{"{}/{}".format(cls.kwargs_section, k): v for k, v in function_kwargs_types}
if function_kwargs_types
else None
)
__function_kwargs_types = {'{}/{}'.format(cls.kwargs_section, k): v for k, v in function_kwargs_types} \
if function_kwargs_types else None
task.set_parameters(hyper_parameters, __parameters_types=__function_kwargs_types)
return task
@ -940,6 +1020,7 @@ if __name__ == '__main__':
# type: (str) -> str
try:
import ast
try:
# available in Python3.9+
from ast import unparse
@ -950,8 +1031,8 @@ if __name__ == '__main__':
# noinspection PyBroadException
try:
class TypeHintRemover(ast.NodeTransformer):
class TypeHintRemover(ast.NodeTransformer):
def visit_FunctionDef(self, node):
# remove the return type definition
node.returns = None
@ -967,7 +1048,7 @@ if __name__ == '__main__':
# and import statements from 'typing'
transformed = TypeHintRemover().visit(parsed_source)
# convert the AST back to source code
return unparse(transformed).lstrip('\n')
return unparse(transformed).lstrip("\n")
except Exception:
# just in case we failed parsing.
return function_source
@ -975,14 +1056,16 @@ if __name__ == '__main__':
@staticmethod
def __extract_imports(func):
def add_import_guard(import_):
return ("try:\n "
+ import_.replace("\n", "\n ", import_.count("\n") - 1)
+ "\nexcept Exception as e:\n print('Import error: ' + str(e))\n"
)
return (
"try:\n "
+ import_.replace("\n", "\n ", import_.count("\n") - 1)
+ "\nexcept Exception as e:\n print('Import error: ' + str(e))\n"
)
# noinspection PyBroadException
try:
import ast
func_module = inspect.getmodule(func)
source = inspect.getsource(func_module)
parsed_source = ast.parse(source)
@ -1006,7 +1089,7 @@ if __name__ == '__main__':
imports = [add_import_guard(import_) for import_ in imports]
return "\n".join(imports)
except Exception as e:
getLogger().warning('Could not fetch function imports: {}'.format(e))
getLogger().warning("Could not fetch function imports: {}".format(e))
return ""
@staticmethod
@ -1046,7 +1129,7 @@ if __name__ == '__main__':
result.append(f.name)
except Exception as e:
name = getattr(module, "__name__", module)
getLogger().warning('Could not fetch function declared in {}: {}'.format(name, e))
getLogger().warning("Could not fetch function declared in {}: {}".format(name, e))
return result
@staticmethod
@ -1058,12 +1141,11 @@ if __name__ == '__main__':
func_members_dict = dict(inspect.getmembers(original_module, inspect.isfunction))
except Exception as e:
name = getattr(original_module, "__name__", original_module)
getLogger().warning('Could not fetch functions from {}: {}'.format(name, e))
getLogger().warning("Could not fetch functions from {}: {}".format(name, e))
func_members_dict = {}
decorated_func = CreateFromFunction._deep_extract_wrapped(func)
decorated_func_source = CreateFromFunction.__sanitize(
inspect.getsource(decorated_func),
sanitize_function=sanitize_function
inspect.getsource(decorated_func), sanitize_function=sanitize_function
)
try:
import ast
@ -1083,14 +1165,16 @@ if __name__ == '__main__':
decorator_func = func_members_dict.get(name)
if name not in func_members or not decorator_func:
continue
decorated_func_source = CreateFromFunction.__get_source_with_decorators(
decorator_func,
original_module=original_module,
sanitize_function=sanitize_function
) + "\n\n" + decorated_func_source
decorated_func_source = (
CreateFromFunction.__get_source_with_decorators(
decorator_func, original_module=original_module, sanitize_function=sanitize_function
)
+ "\n\n"
+ decorated_func_source
)
break
except Exception as e:
getLogger().warning('Could not fetch full definition of function {}: {}'.format(func.__name__, e))
getLogger().warning("Could not fetch full definition of function {}: {}".format(func.__name__, e))
return decorated_func_source
@staticmethod

View File

@ -923,11 +923,11 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
return task_deleted
def _delete_uri(self, uri):
# type: (str) -> bool
def _delete_uri(self, uri, silent=False):
# type: (str, bool) -> bool
# noinspection PyBroadException
try:
deleted = StorageHelper.get(uri).delete(uri)
deleted = StorageHelper.get(uri).delete(uri, silent=silent)
if deleted:
self.log.debug("Deleted file: {}".format(uri))
return True
@ -1539,8 +1539,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
self._edit(execution=execution)
return self.data.execution.artifacts or []
def delete_artifacts(self, artifact_names, raise_on_errors=True, delete_from_storage=True):
# type: (Sequence[str], bool, bool) -> bool
def delete_artifacts(self, artifact_names, raise_on_errors=True, delete_from_storage=True, silent_on_errors=False):
# type: (Sequence[str], bool, bool, bool) -> bool
"""
Delete a list of artifacts, by artifact name, from the Task.
@ -1548,20 +1548,29 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
:param bool raise_on_errors: if True, do not suppress connectivity related exceptions
:param bool delete_from_storage: If True, try to delete the actual
file from the external storage (e.g. S3, GS, Azure, File Server etc.)
:param silent_on_errors: If True, do not log connectivity related errors
:return: True if successful
"""
return self._delete_artifacts(artifact_names, raise_on_errors, delete_from_storage)
return self._delete_artifacts(
artifact_names=artifact_names,
raise_on_errors=raise_on_errors,
delete_from_storage=delete_from_storage,
silent_on_errors=silent_on_errors
)
def _delete_artifacts(self, artifact_names, raise_on_errors=False, delete_from_storage=True):
# type: (Sequence[str], bool, bool) -> bool
def _delete_artifacts(
self, artifact_names, raise_on_errors=False, delete_from_storage=True, silent_on_errors=False
):
# type: (Sequence[str], bool, bool, bool) -> bool
"""
Delete a list of artifacts, by artifact name, from the Task.
:param list artifact_names: list of artifact names
:param bool raise_on_errors: if True, do not suppress connectivity related exceptions
:param bool delete_from_storage: If True, try to delete the actual
file from the external storage (e.g. S3, GS, Azure, File Server etc.)
file from the external storage (e.g. S3, GS, Azure, File Server etc.)
:param silent_on_errors: If True, do not log connectivity related errors
:return: True if successful
"""
@ -1605,7 +1614,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
if uris:
for i, (artifact, uri) in enumerate(zip(artifact_names, uris)):
# delete the actual file from storage, and raise if error and needed
if uri and not self._delete_uri(uri) and raise_on_errors:
if uri and not self._delete_uri(uri, silent=silent_on_errors) and raise_on_errors:
remaining_uris = {name: uri for name, uri in zip(artifact_names[i + 1:], uris[i + 1:])}
raise ArtifactUriDeleteError(artifact=artifact, uri=uri, remaining_uris=remaining_uris)
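A hedged usage sketch of the new flag (the task ID and artifact names are placeholders): with `silent_on_errors=True` the storage drivers skip their per-object delete warnings, while `raise_on_errors` still controls whether failures raise.

```
from clearml import Task

task = Task.get_task(task_id="aabbccdd11223344")  # hypothetical task ID
task.delete_artifacts(
    ["model_weights", "training_log"],
    raise_on_errors=False,
    delete_from_storage=True,
    silent_on_errors=True,  # suppress connectivity/delete warnings in the log
)
```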

View File

@ -28,6 +28,7 @@ from ..storage.helper import remote_driver_schemes
from ..storage.util import sha256sum, format_size, get_common_path
from ..utilities.process.mp import SafeEvent, ForkSafeRLock
from ..utilities.proxy_object import LazyEvalWrapper
from ..config import deferred_config
try:
import pandas as pd
@ -262,6 +263,9 @@ class Artifacts(object):
# hashing constants
_hash_block_size = 65536
_pd_artifact_type = 'data-audit-table'
_default_pandas_dataframe_extension_name = deferred_config(
"development.artifacts.default_pandas_dataframe_extension_name", None
)
class _ProxyDictWrite(dict):
""" Dictionary wrapper that updates an arguments instance on any item set in the dictionary """
@ -464,19 +468,23 @@ class Artifacts(object):
artifact_type_data.content_type = "text/csv"
np.savetxt(local_filename, artifact_object, delimiter=",")
delete_after_upload = True
elif pd and isinstance(artifact_object, pd.DataFrame) \
and (isinstance(artifact_object.index, pd.MultiIndex) or
isinstance(artifact_object.columns, pd.MultiIndex)):
store_as_pickle = True
elif pd and isinstance(artifact_object, pd.DataFrame):
artifact_type = "pandas"
artifact_type_data.preview = preview or str(artifact_object.__repr__())
# we are making sure self._default_pandas_dataframe_extension_name is not deferred
extension_name = extension_name or str(self._default_pandas_dataframe_extension_name or "")
override_filename_ext_in_uri = get_extension(
extension_name, [".csv.gz", ".parquet", ".feather", ".pickle"], ".csv.gz", artifact_type
)
override_filename_in_uri = name
local_filename = self._push_temp_file(prefix=quote(name, safe="") + '.', suffix=override_filename_ext_in_uri)
if override_filename_ext_in_uri == ".csv.gz":
local_filename = self._push_temp_file(
prefix=quote(name, safe="") + ".", suffix=override_filename_ext_in_uri
)
if (
isinstance(artifact_object.index, pd.MultiIndex) or isinstance(artifact_object.columns, pd.MultiIndex)
) and not extension_name:
store_as_pickle = True
elif override_filename_ext_in_uri == ".csv.gz":
artifact_type_data.content_type = "text/csv"
self._store_compressed_pd_csv(artifact_object, local_filename)
elif override_filename_ext_in_uri == ".parquet":

View File

@ -55,6 +55,7 @@ class DownloadError(Exception):
@six.add_metaclass(ABCMeta)
class _Driver(object):
_certs_cache_context = "certs"
_file_server_hosts = None
@classmethod
@ -116,6 +117,28 @@ class _Driver(object):
cls._file_server_hosts = hosts
return cls._file_server_hosts
@classmethod
def download_cert(cls, cert_url):
# import here to avoid circular imports
from .manager import StorageManager
cls.get_logger().info("Attempting to download remote certificate '{}'".format(cert_url))
potential_exception = None
downloaded_verify = None
try:
downloaded_verify = StorageManager.get_local_copy(cert_url, cache_context=cls._certs_cache_context)
except Exception as e:
potential_exception = e
if not downloaded_verify:
cls.get_logger().error(
"Failed downloading remote certificate '{}'{}".format(
cert_url, "Error is: {}".format(potential_exception) if potential_exception else ""
)
)
else:
cls.get_logger().info("Successfully downloaded remote certificate '{}'".format(cert_url))
return downloaded_verify
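Because `download_cert` goes through `StorageManager.get_local_copy`, any scheme the SDK can read should work as a certificate source. A hedged sketch of a direct call (the URL is a placeholder, and `_Driver` is a private class, so treat this as illustration only):

```
local_cert = _Driver.download_cert("https://certs.example.com/ca-bundle.pem")
if local_cert:
    print("certificate cached at", local_cert)
```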
class _HttpDriver(_Driver):
""" LibCloud http/https adapter (simple, enough for now) """
@ -231,8 +254,10 @@ class _HttpDriver(_Driver):
container = self._containers[obj.container_name]
res = container.session.delete(obj.url, headers=container.get_headers(obj.url))
if res.status_code != requests.codes.ok:
self.get_logger().warning('Failed deleting object %s (%d): %s' % (
obj.object_name, res.status_code, res.text))
if not kwargs.get("silent", False):
self.get_logger().warning(
'Failed deleting object %s (%d): %s' % (obj.object_name, res.status_code, res.text)
)
return False
return True
@ -447,6 +472,8 @@ class _Boto3Driver(_Driver):
# True is a non-documented value for boto3, use None instead (which means verify)
print("Using boto3 verify=None instead of true")
verify = None
elif isinstance(verify, str) and not os.path.exists(verify) and verify.split("://")[0] in driver_schemes:
verify = _Boto3Driver.download_cert(verify)
# boto3 client creation isn't thread-safe (client itself is)
with self._creation_lock:
@ -883,7 +910,8 @@ class _GoogleCloudStorageDriver(_Driver):
except ImportError:
pass
name = getattr(object, "name", "")
self.get_logger().warning("Failed deleting object {}: {}".format(name, ex))
if not kwargs.get("silent", False):
self.get_logger().warning("Failed deleting object {}: {}".format(name, ex))
return False
return not object.exists()
@ -2772,9 +2800,9 @@ class StorageHelper(object):
except Exception as e:
self._log.error("Could not download file : %s, err:%s " % (remote_path, str(e)))
def delete(self, path):
def delete(self, path, silent=False):
path = self._canonize_url(path)
return self._driver.delete_object(self.get_object(path))
return self._driver.delete_object(self.get_object(path), silent=silent)
def check_write_permissions(self, dest_path=None):
# create a temporary file, then delete it

View File

@ -27,9 +27,7 @@ class StorageManager(object):
_file_upload_retries = deferred_config("network.file_upload_retries", 3)
@classmethod
def get_local_copy(
cls, remote_url, cache_context=None, extract_archive=True, name=None, force_download=False
):
def get_local_copy(cls, remote_url, cache_context=None, extract_archive=True, name=None, force_download=False):
# type: (str, Optional[str], bool, Optional[str], bool) -> [str, None]
"""
Get a local copy of the remote file. If the remote URL is a direct file access,
@ -53,7 +51,8 @@ class StorageManager(object):
# this will get us the actual cache (even with direct access)
cache_path_encoding = Path(cache.get_cache_folder()) / cache.get_hashed_url_file(remote_url)
return cls._extract_to_cache(
cached_file, name, cache_context, cache_path_encoding=cache_path_encoding.as_posix())
cached_file, name, cache_context, cache_path_encoding=cache_path_encoding.as_posix()
)
return cached_file
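A hedged usage sketch of the method (the remote URL is a placeholder); archives such as .zip, .tgz and .tar.gz are auto-extracted into the cache when `extract_archive=True`:

```
from clearml import StorageManager

local_path = StorageManager.get_local_copy(
    remote_url="s3://my-bucket/datasets/data.zip",  # placeholder URL
    extract_archive=True,
)
print(local_path)
```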
@classmethod
@ -85,9 +84,7 @@ class StorageManager(object):
)
@classmethod
def set_cache_file_limit(
cls, cache_file_limit, cache_context=None
): # type: (int, Optional[str]) -> int
def set_cache_file_limit(cls, cache_file_limit, cache_context=None): # type: (int, Optional[str]) -> int
"""
Set the cache context file limit. File limit is the maximum number of files the specific cache context holds.
Notice, there is no limit on the size of these files, only the total number of cached files.
@ -102,13 +99,13 @@ class StorageManager(object):
@classmethod
def _extract_to_cache(
cls,
cached_file, # type: str
name, # type: str
cache_context=None, # type: Optional[str]
target_folder=None, # type: Optional[str]
cache_path_encoding=None, # type: Optional[str]
force=False, # type: bool
cls,
cached_file, # type: str
name, # type: str
cache_context=None, # type: Optional[str]
target_folder=None, # type: Optional[str]
cache_path_encoding=None, # type: Optional[str]
force=False, # type: bool
):
# type: (...) -> str
"""
@ -131,20 +128,21 @@ class StorageManager(object):
# we support zip and tar.gz files auto-extraction
suffix = cached_file.suffix.lower()
if suffix == '.gz':
suffix = ''.join(a.lower() for a in cached_file.suffixes[-2:])
if suffix == ".gz":
suffix = "".join(a.lower() for a in cached_file.suffixes[-2:])
if suffix not in (".zip", ".tgz", ".tar.gz"):
return str(cached_file)
cache_folder = Path(cache_path_encoding or cached_file).parent
archive_suffix = (cache_path_encoding or cached_file).name[:-len(suffix)]
archive_suffix = (cache_path_encoding or cached_file).name[: -len(suffix)]
name = encode_string_to_filename(name) if name else name
if target_folder:
target_folder = Path(target_folder)
else:
target_folder = cache_folder / CacheManager.get_context_folder_lookup(
cache_context).format(archive_suffix, name)
target_folder = cache_folder / CacheManager.get_context_folder_lookup(cache_context).format(
archive_suffix, name
)
if target_folder.is_dir() and not force:
# noinspection PyBroadException
@ -161,7 +159,8 @@ class StorageManager(object):
temp_target_folder = target_folder
else:
temp_target_folder = cache_folder / "{0}_{1}_{2}".format(
target_folder.name, time() * 1000, str(random()).replace('.', ''))
target_folder.name, time() * 1000, str(random()).replace(".", "")
)
temp_target_folder.mkdir(parents=True, exist_ok=True)
if suffix == ".zip":
@ -172,7 +171,7 @@ class StorageManager(object):
with tarfile.open(cached_file.as_posix()) as file:
safe_extract(file, temp_target_folder.as_posix())
elif suffix == ".tgz":
with tarfile.open(cached_file.as_posix(), mode='r:gz') as file:
with tarfile.open(cached_file.as_posix(), mode="r:gz") as file:
safe_extract(file, temp_target_folder.as_posix())
if temp_target_folder != target_folder:
@ -187,16 +186,17 @@ class StorageManager(object):
target_folder.touch(exist_ok=True)
else:
base_logger.warning(
"Failed renaming {0} to {1}".format(temp_target_folder.as_posix(), target_folder.as_posix()))
"Failed renaming {0} to {1}".format(temp_target_folder.as_posix(), target_folder.as_posix())
)
try:
shutil.rmtree(temp_target_folder.as_posix())
except Exception as ex:
base_logger.warning(
"Exception {}\nFailed deleting folder {}".format(ex, temp_target_folder.as_posix()))
"Exception {}\nFailed deleting folder {}".format(ex, temp_target_folder.as_posix())
)
except Exception as ex:
# failed extracting the file:
base_logger.warning(
"Exception {}\nFailed extracting zip file {}".format(ex, cached_file.as_posix()))
base_logger.warning("Exception {}\nFailed extracting zip file {}".format(ex, cached_file.as_posix()))
# noinspection PyBroadException
try:
target_folder.rmdir()
@ -208,6 +208,7 @@ class StorageManager(object):
@classmethod
def get_files_server(cls):
from ..backend_api import Session
return Session.get_files_server_host()
@classmethod
@ -251,7 +252,7 @@ class StorageManager(object):
pool.apply_async(
helper.upload,
args=(str(path), str(path).replace(local_folder, remote_url)),
kwds={"retries": retries if retries else cls._file_upload_retries}
kwds={"retries": retries if retries else cls._file_upload_retries},
)
)
@ -299,11 +300,11 @@ class StorageManager(object):
def remove_prefix_from_str(target_str, prefix_to_be_removed):
# type: (str, str) -> str
if target_str.startswith(prefix_to_be_removed):
return target_str[len(prefix_to_be_removed):]
return target_str[len(prefix_to_be_removed) :]
return target_str
longest_configured_url = StorageHelper._resolve_base_url(remote_url) # noqa
bucket_path = remove_prefix_from_str(remote_url[len(longest_configured_url):], "/")
bucket_path = remove_prefix_from_str(remote_url[len(longest_configured_url) :], "/")
if not local_folder:
local_folder = CacheManager.get_cache_manager().get_cache_folder()
@ -365,7 +366,7 @@ class StorageManager(object):
overwrite=False,
skip_zero_size_check=False,
silence_errors=False,
max_workers=None
max_workers=None,
):
# type: (str, Optional[str], Optional[str], bool, bool, bool, Optional[int]) -> Optional[str]
"""

View File

@ -878,11 +878,12 @@ class Task(_Task):
:param task_name: Set the name of the remote task. Required if base_task_id is None.
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param repo: Remote URL for the repository to use, or path to local copy of the git repository
Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'
:param repo: Remote URL for the repository to use, or path to local copy of the git repository.
Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'. If ``repo`` is specified, then
the ``script`` parameter must also be specified
:param branch: Select specific repository branch/tag (implies the latest commit from the branch)
:param commit: Select specific commit ID to use (default: latest commit,
or when used with local repository matching the local commit id)
or when used with local repository matching the local commit ID)
:param script: Specify the entry point script for the remote execution. When used in tandem with
remote git repository the script should be a relative path inside the repository,
for example: './source/train.py' . When used with local repository path it supports a
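A hedged example of the documented pairing (the repository URL and script path are placeholders); when `repo` is given, `script` must be given too:

```
from clearml import Task

task = Task.create(
    project_name="examples",
    task_name="remote repo task",
    repo="https://github.com/allegroai/clearml.git",
    branch="master",
    script="examples/frameworks/pytorch/pytorch_mnist.py",  # required alongside repo
)
```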

View File

@ -7,19 +7,22 @@ import psutil
def get_filename_max_length(dir_path):
# type: (str) -> int
default = 255  # Common filesystems like NTFS, EXT4 and HFS+ are limited to 255
try:
dir_path = pathlib2.Path(os.path.abspath(dir_path))
if platform == "win32":
# noinspection PyBroadException
dir_drive = dir_path.drive
for drv in psutil.disk_partitions():
if drv.device.startswith(dir_drive):
return drv.maxfile
# The maxfile attribute is only available in psutil >=5.7.4,<6.0.0
return getattr(drv, "maxfile", default)
elif platform in ("linux", "darwin"):
return os.statvfs(dir_path).f_namemax
except Exception as err:
print(err)
return 255 # Common filesystems like NTFS, EXT4 and HFS+ limited with 255
return default
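A quick hedged check of the helper (the path is a placeholder; the value depends on the local filesystem and, on Windows, on the installed psutil version):

```
max_len = get_filename_max_length("/tmp")
print(max_len)  # typically 255 on NTFS, EXT4 and HFS+
```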
def is_path_traversal(target_folder, relative_path):

View File

@ -1 +1 @@
__version__ = "1.16.3"
__version__ = "1.16.5"

View File

@ -240,5 +240,12 @@ sdk {
# iteration reporting x-axis after starting to report "seconds from start"
# max_wait_for_first_iteration_to_start_sec: 1800
}
artifacts {
# set default extension_name for pandas DataFrame objects
# valid values are: ``.csv.gz``, ``.parquet``, ``.feather``, ``.pickle``
# extension_name supplied to Task.upload_artifact is prioritized over this value
default_pandas_dataframe_extension_name: ""
}
}
}

View File

@ -1,185 +1,185 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "view-in-github"
   },
   "source": [
    "<a href=\"https://colab.research.google.com/github/allegroai/clearml/blob/master/examples/clearml_agent/clearml_colab_agent.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "mC3A4rio6Y--"
   },
   "source": [
    "# Google Colab Used to Launch ClearML Agent\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "EPYjmFLy1zuC"
   },
   "source": [
    "## Step 1: Install all necessary packages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "DwFC3fL8JAP3"
   },
   "outputs": [],
   "source": [
    "# If you don't have ClearML and ClearML Agent installed then uncomment these lines\n",
    "# !pip install clearml\n",
    "# !pip install clearml-agent"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "gb41kM8i1-7T"
   },
   "source": [
    "## Step 2: Export this environment variable\n",
    "\n",
    "This environment variable makes Matplotlib work in headless mode, so it won't output graphs to the screen"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "3-Bm4811VMLK"
   },
   "outputs": [],
   "source": [
    "! export MPLBACKEND=Agg"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Step 2b (OPTIONAL): Enter your GitHub credentials (only for private repositories)\n",
    "In order for the agent to pull your code, it needs access to your repositories. If these are private, you'll have to supply the agent with GitHub credentials to log in. GitHub/Bitbucket will no longer let you log in using username/password combinations. Instead, you have to use a personal token, read more about it [here for GitHub](https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token) and [here for Bitbucket](https://support.atlassian.com/bitbucket-cloud/docs/app-passwords/)\n",
    "\n",
    "We can let the agent know which credentials to use by setting the following env variables"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#@title Insert your Git Credentials\n",
    "\n",
    "import os\n",
    "os.environ[\"CLEARML_AGENT_GIT_USER\"] = \"username-goes-here\" #@param {type:\"string\"}\n",
    "os.environ[\"CLEARML_AGENT_GIT_PASS\"] = \"git-personal-access-token\" #@param {type:\"string\"}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "chuUzBaU2NyB"
   },
   "source": [
    "## Step 3: Create new credentials:\n",
    "1. Go to your [ClearML WebApp **Settings**](https://app.clear.ml/settings/workspace-configuration).\n",
    "1. Under the **WORKSPACES** section, go to **App Credentials**, and click **+ Create new credentials**\n",
    "1. Copy your credentials"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "1Kloxwfj0Vnq"
   },
   "source": [
    "## Step 4: Set your ClearML Credentials\n",
    "\n",
    "Insert the credentials you created in Step 3.\n",
    "\n",
    "If you aren't using the ClearML hosted server, make sure to modify the server variables."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "YBPdmP4sJHnQ"
   },
   "outputs": [],
   "source": [
    "#@title Insert your own Credentials\n",
    "\n",
    "from clearml import Task\n",
    "\n",
    "web_server = 'https://app.clear.ml'#@param {type:\"string\"}\n",
    "api_server = 'https://api.clear.ml'#@param {type:\"string\"}\n",
    "files_server = 'https://files.clear.ml'#@param {type:\"string\"}\n",
    "access_key = ''#@param {type:\"string\"}\n",
    "secret_key = ''#@param {type:\"string\"}\n",
    "\n",
    "Task.set_credentials(web_host=web_server,\n",
    "                     api_host=api_server,\n",
    "                     files_host=files_server,\n",
    "                     key=access_key,\n",
    "                     secret=secret_key\n",
    "                     )\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "Er3HUBty4m7i"
   },
   "source": [
    "## Step 5: Run clearml-agent"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "QcczeU7OJ9G-"
   },
   "outputs": [],
   "source": [
    "!clearml-agent daemon --queue default"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "authorship_tag": "ABX9TyMKeTOHrlQb98s9lDHxWFts",
   "collapsed_sections": [],
   "include_colab_link": true,
   "name": "clearml_colab_agent.ipynb",
   "provenance": [],
   "toc_visible": true
  },
  "interpreter": {
   "hash": "d75e902da2bbfe9f41879fcf2334f5819447e02a7f656a079df344fef4e78809"
  },
  "kernelspec": {
   "display_name": "Python 3.7.12 64-bit ('.env')",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": ""
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}