Add new pipeline visualization support (requires ClearML Server v1.3)

This commit is contained in:
allegroai 2022-03-06 19:05:26 +02:00
parent ffdbef8391
commit 4c145fbefd
5 changed files with 561 additions and 187 deletions

File diff suppressed because it is too large Load Diff

View File

@ -480,7 +480,10 @@ from clearml.automation.controller import PipelineDecorator
{function_source} {function_source}
if __name__ == '__main__': if __name__ == '__main__':
task = Task.init() task = Task.init(
auto_connect_frameworks={auto_connect_frameworks},
auto_connect_arg_parser={auto_connect_arg_parser},
)
kwargs = {function_kwargs} kwargs = {function_kwargs}
task.connect(kwargs, name='{kwargs_section}') task.connect(kwargs, name='{kwargs_section}')
function_input_artifacts = {function_input_artifacts} function_input_artifacts = {function_input_artifacts}
@ -510,6 +513,8 @@ if __name__ == '__main__':
project_name=None, # type: Optional[str] project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str] task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str] task_type=None, # type: Optional[str]
auto_connect_frameworks=None, # type: Optional[dict]
auto_connect_arg_parser=None, # type: Optional[dict]
repo=None, # type: Optional[str] repo=None, # type: Optional[str]
branch=None, # type: Optional[str] branch=None, # type: Optional[str]
commit=None, # type: Optional[str] commit=None, # type: Optional[str]
@ -561,6 +566,8 @@ if __name__ == '__main__':
:param task_name: Set the name of the remote task. Required if base_task_id is None. :param task_name: Set the name of the remote task. Required if base_task_id is None.
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param auto_connect_frameworks: Control the frameworks auto connect, see `Task.init` auto_connect_frameworks
:param auto_connect_arg_parser: Control the ArgParser auto connect, see `Task.init` auto_connect_arg_parser
:param repo: Remote URL for the repository to use, OR path to local copy of the git repository :param repo: Remote URL for the repository to use, OR path to local copy of the git repository
Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo' Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'
:param branch: Select specific repository branch/tag (implies the latest commit from the branch) :param branch: Select specific repository branch/tag (implies the latest commit from the branch)
@ -582,6 +589,9 @@ if __name__ == '__main__':
:param _sanitize_helper_functions: Sanitization function for the helper function string. :param _sanitize_helper_functions: Sanitization function for the helper function string.
:return: Newly created Task object :return: Newly created Task object
""" """
assert (not auto_connect_frameworks or isinstance(auto_connect_frameworks, (bool, dict)))
assert (not auto_connect_arg_parser or isinstance(auto_connect_arg_parser, (bool, dict)))
function_name = str(a_function.__name__) function_name = str(a_function.__name__)
function_source = inspect.getsource(a_function) function_source = inspect.getsource(a_function)
if _sanitize_function: if _sanitize_function:
@ -635,6 +645,8 @@ if __name__ == '__main__':
if inspect_args.annotations[k] in supported_types} if inspect_args.annotations[k] in supported_types}
task_template = cls.task_template.format( task_template = cls.task_template.format(
auto_connect_frameworks=auto_connect_frameworks,
auto_connect_arg_parser=auto_connect_arg_parser,
kwargs_section=cls.kwargs_section, kwargs_section=cls.kwargs_section,
input_artifact_section=cls.input_artifact_section, input_artifact_section=cls.input_artifact_section,
function_source=function_source, function_source=function_source,

View File

@ -54,7 +54,8 @@ def make_message(s, **kwargs):
def get_existing_project(session, project_name): def get_existing_project(session, project_name):
"""Return either the project ID if it exists, an empty string if it doesn't or None if backend request failed.""" """Return either the project ID if it exists, an empty string if it doesn't or None if backend request failed."""
res = session.send(projects.GetAllRequest(name=exact_match_regex(project_name), only_fields=['id'])) res = session.send(projects.GetAllRequest(
name=exact_match_regex(project_name), only_fields=['id'], search_hidden=True, _allow_extra_fields_=True))
if not res: if not res:
return None return None
if res.response and res.response.projects: if res.response and res.response.projects:
@ -62,18 +63,36 @@ def get_existing_project(session, project_name):
return "" return ""
def get_or_create_project(session, project_name, description=None): def get_or_create_project(session, project_name, description=None, system_tags=None, project_id=None):
"""Return the ID of an existing project, or if it does not exist, make a new one and return that ID instead.""" """Return the ID of an existing project, or if it does not exist, make a new one and return that ID instead."""
project_id = get_existing_project(session, project_name) project_system_tags = []
if not project_id:
res = session.send(projects.GetAllRequest(
name=exact_match_regex(project_name),
only_fields=['id', 'system_tags'] if system_tags else ['id'],
search_hidden=True, _allow_extra_fields_=True))
if res and res.response and res.response.projects:
project_id = res.response.projects[0].id
if system_tags:
project_system_tags = res.response.projects[0].system_tags
if project_id and system_tags and (not project_system_tags or
set(project_system_tags) & set(system_tags) != set(system_tags)):
# set system_tags
session.send(
projects.UpdateRequest(
project=project_id, system_tags=list(set((project_system_tags or []) + system_tags))
)
)
if project_id: if project_id:
return project_id return project_id
if project_id == "":
# Project was not found, so create a new one
res = session.send(projects.CreateRequest(name=project_name, description=description or ''))
return res.response.id
# This should only happen if backend response was None and so project_id is also None # Project was not found, so create a new one
return None res = session.send(projects.CreateRequest(
name=project_name, description=description or '', system_tags=system_tags))
return res.response.id
def get_queue_id(session, queue): def get_queue_id(session, queue):

View File

@ -1,3 +1,4 @@
import sys
from argparse import ArgumentParser from argparse import ArgumentParser
from pathlib2 import Path from pathlib2 import Path
@ -80,6 +81,10 @@ def cli():
# get the args # get the args
args = parser.parse_args() args = parser.parse_args()
if len(sys.argv) < 2:
parser.print_help()
exit(0)
if args.version: if args.version:
print('Version {}'.format(__version__)) print('Version {}'.format(__version__))
exit(0) exit(0)

View File

@ -211,6 +211,25 @@ def naive_nested_from_flat_dictionary(flat_dict, sep='/'):
} }
def walk_nested_dict_tuple_list(dict_list_tuple, callback):
nested = (dict, tuple, list)
if not isinstance(dict_list_tuple, nested):
return callback(dict_list_tuple)
if isinstance(dict_list_tuple, dict):
ret = {}
for k, v in dict_list_tuple.items():
ret[k] = walk_nested_dict_tuple_list(v, callback=callback) if isinstance(v, nested) else callback(v)
else:
ret = []
for v in dict_list_tuple:
ret.append(walk_nested_dict_tuple_list(v, callback=callback) if isinstance(v, nested) else callback(v))
if isinstance(dict_list_tuple, tuple):
ret = tuple(dict_list_tuple)
return ret
class WrapperBase(type): class WrapperBase(type):
# This metaclass is heavily inspired by the Object Proxying python recipe # This metaclass is heavily inspired by the Object Proxying python recipe