From 4c145fbefd58188424c0640b3f571b2c532dcaa2 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sun, 6 Mar 2022 19:05:26 +0200 Subject: [PATCH] Add new pipeline visualization support (requires ClearML Server v1.3) --- clearml/automation/controller.py | 673 +++++++++++++++------ clearml/backend_interface/task/populate.py | 14 +- clearml/backend_interface/util.py | 37 +- clearml/cli/task/__main__.py | 5 + clearml/utilities/proxy_object.py | 19 + 5 files changed, 561 insertions(+), 187 deletions(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index adb10115..038ff86d 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -8,6 +8,7 @@ from copy import copy, deepcopy from datetime import datetime from logging import getLogger from multiprocessing import Process, Queue +from multiprocessing.pool import ThreadPool from threading import Thread, Event, RLock from time import time from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union, Tuple @@ -15,17 +16,19 @@ from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union from attr import attrib, attrs from pathlib2 import Path -from .job import LocalClearmlJob, RunningJob +from .job import LocalClearmlJob, RunningJob, BaseJob from .. import Logger from ..automation import ClearmlJob +from ..backend_api import Session from ..backend_interface.task.populate import CreateFromFunction from ..backend_interface.util import get_or_create_project, exact_match_regex from ..config import get_remote_task_id from ..debugging.log import LoggerRoot from ..model import BaseModel, OutputModel +from ..storage.util import hash_dict from ..task import Task from ..utilities.process.mp import leave_process -from ..utilities.proxy_object import LazyEvalWrapper, flatten_dictionary +from ..utilities.proxy_object import LazyEvalWrapper, flatten_dictionary, walk_nested_dict_tuple_list class PipelineController(object): @@ -36,10 +39,13 @@ class PipelineController(object): Notice: The pipeline controller lives as long as the pipeline itself is being executed. """ _tag = 'pipeline' + _project_system_tags = ['pipeline', 'hidden'] _node_tag_prefix = 'pipe:' _step_pattern = r"\${[^}]*}" _config_section = 'Pipeline' + _state_artifact_name = 'pipeline_state' _args_section = 'Args' + _pipeline_section = 'pipeline' _pipeline_step_ref = 'pipeline' _runtime_property_hash = '_pipeline_hash' _reserved_pipeline_names = (_pipeline_step_ref, ) @@ -50,6 +56,8 @@ class PipelineController(object): _report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow') _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details') + valid_job_status = ["failed", "cached", "completed", "aborted", "queued", "running", "skipped", "pending"] + @attrs class Node(object): name = attrib(type=str) # pipeline step name @@ -62,8 +70,13 @@ class PipelineController(object): configurations = attrib(type=dict, default={}) # Task configuration objects to change task_overrides = attrib(type=dict, default={}) # Task overrides to change executed = attrib(type=str, default=None) # The actual executed Task ID (None if not executed yet) + status = attrib(type=str, default="pending") # The Node Task status (cached, aborted, etc.) 
clone_task = attrib(type=bool, default=True) # If True cline the base_task_id, then execute the cloned Task job = attrib(type=ClearmlJob, default=None) # ClearMLJob object + job_type = attrib(type=str, default=None) # task type (string) + job_started = attrib(type=float, default=None) # job startup timestamp (epoch ts in seconds) + job_ended = attrib(type=float, default=None) # job startup timestamp (epoch ts in seconds) + job_code_section = attrib(type=str, default=None) # pipeline code configuration section name skip_job = attrib(type=bool, default=False) # if True, this step should be skipped continue_on_fail = attrib(type=bool, default=False) # if True, the pipeline continues even if the step failed cache_executed_step = attrib(type=bool, default=False) # if True this pipeline step should be cached @@ -93,9 +106,10 @@ class PipelineController(object): version, # type: str pool_frequency=0.2, # type: float add_pipeline_tags=False, # type: bool - target_project=None, # type: Optional[str] + target_project=True, # type: Optional[Union[str, bool]] auto_version_bump=True, # type: bool abort_on_failure=False, # type: bool + add_run_number=True, # type: bool ): # type: (...) -> None """ @@ -108,7 +122,8 @@ class PipelineController(object): :param float pool_frequency: The pooling frequency (in minutes) for monitoring experiments / states. :param bool add_pipeline_tags: (default: False) if True, add `pipe: ` tag to all steps (Tasks) created by this pipeline. - :param str target_project: If provided, all pipeline steps are cloned into the target project + :param str target_project: If provided, all pipeline steps are cloned into the target project. + If True pipeline steps are stored into the pipeline project :param bool auto_version_bump: If True (default), if the same pipeline version already exists (with any difference from the current one), the current pipeline version will be bumped to a new version version bump examples: 1.0.0 -> 1.0.1 , 1.2 -> 1.3, 10 -> 11 etc. @@ -118,6 +133,8 @@ class PipelineController(object): was specifically defined with "continue_on_fail=True". If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed. + :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name. 
+ Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2" """ self._nodes = {} self._running_nodes = [] @@ -139,7 +156,7 @@ class PipelineController(object): self._experiment_completed_cb = None self._pre_step_callbacks = {} self._post_step_callbacks = {} - self._target_project = target_project or '' + self._target_project = target_project self._add_pipeline_tags = add_pipeline_tags self._task = Task.current_task() self._step_ref_pattern = re.compile(self._step_pattern) @@ -147,25 +164,54 @@ class PipelineController(object): self._pipeline_task_status_failed = None self._auto_version_bump = bool(auto_version_bump) self._mock_execution = False # used for nested pipelines (eager execution) + self._pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17")) if not self._task: + task_name = name or project or '{}'.format(datetime.now()) + if self._pipeline_as_sub_project: + parent_project = "{}.pipelines".format(project+'/' if project else '') + project_name = "{}/{}".format(parent_project, task_name) + else: + parent_project = None + project_name = project or 'Pipelines' + self._task = Task.init( - project_name=project or 'Pipelines', - task_name=name or 'Pipeline {}'.format(datetime.now()), + project_name=project_name, + task_name=task_name, task_type=Task.TaskTypes.controller, auto_resource_monitoring=False, reuse_last_task_id=False ) + # make sure project is hidden + if self._pipeline_as_sub_project: + get_or_create_project( + self._task.session, project_name=parent_project, system_tags=["hidden"]) + get_or_create_project( + self._task.session, project_name=project_name, + project_id=self._task.project, system_tags=self._project_system_tags) + self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag]) self._task.set_user_properties(version=self._version) - self._auto_connect_task = bool(self._task) # make sure we add to the main Task the pipeline tag - if self._task: + if self._task and not self._pipeline_as_sub_project: self._task.add_tags([self._tag]) self._monitored_nodes = {} # type: Dict[str, dict] self._abort_running_steps_on_failure = abort_on_failure + # add direct link to the pipeline page + if self._pipeline_as_sub_project and self._task: + if add_run_number and self._task.running_locally(): + self._add_pipeline_name_run_number() + # noinspection PyProtectedMember + self._task.get_logger().report_text('ClearML pipeline page: {}'.format( + '{}/pipelines/{}/experiments/{}'.format( + self._task._get_app_server(), + self._task.project if self._task.project is not None else '*', + self._task.id, + )) + ) + def set_default_execution_queue(self, default_execution_queue): # type: (Optional[str]) -> None """ @@ -400,6 +446,8 @@ class PipelineController(object): project_name=None, # type: Optional[str] task_name=None, # type: Optional[str] task_type=None, # type: Optional[str] + auto_connect_frameworks=None, # type: Optional[dict] + auto_connect_arg_parser=None, # type: Optional[dict] packages=None, # type: Optional[Union[str, Sequence[str]]] repo=None, # type: Optional[str] repo_branch=None, # type: Optional[str] @@ -457,6 +505,8 @@ class PipelineController(object): :param task_name: Set the name of the remote task. Required if base_task_id is None. :param task_type: Optional, The task type to be created. 
Supported values: 'training', 'testing', 'inference', 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' + :param auto_connect_frameworks: Control the frameworks auto connect, see `Task.init` auto_connect_frameworks + :param auto_connect_arg_parser: Control the ArgParser auto connect, see `Task.init` auto_connect_arg_parser :param packages: Manually specify a list of required packages or a local requirements.txt file. Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt" If not provided, packages are automatically added based on the imports used in the function. @@ -574,22 +624,25 @@ class PipelineController(object): for k, v in function_input_artifacts.items()} ) + job_code_section = None if self._mock_execution: - project_name = project_name or self._target_project or self._task.get_project_name() + project_name = project_name or self._get_target_project() or self._task.get_project_name() task_definition = self._create_task_from_function( docker, docker_args, docker_bash_setup_script, function, - function_input_artifacts, function_kwargs, - function_return, packages, project_name, task_name, + function_input_artifacts, function_kwargs, function_return, + auto_connect_frameworks, auto_connect_arg_parser, + packages, project_name, task_name, task_type, repo, repo_branch, repo_commit, helper_functions) elif self._task.running_locally(): - project_name = project_name or self._target_project or self._task.get_project_name() + project_name = project_name or self._get_target_project() or self._task.get_project_name() task_definition = self._create_task_from_function( docker, docker_args, docker_bash_setup_script, function, - function_input_artifacts, function_kwargs, - function_return, packages, project_name, task_name, + function_input_artifacts, function_kwargs, function_return, + auto_connect_frameworks, auto_connect_arg_parser, + packages, project_name, task_name, task_type, repo, repo_branch, repo_commit, helper_functions) # update configuration with the task definitions # noinspection PyProtectedMember @@ -597,10 +650,12 @@ class PipelineController(object): name=name, config_type='json', config_text=json.dumps(task_definition, indent=1) ) + job_code_section = name else: # load task definition from configuration # noinspection PyProtectedMember - task_definition = json.loads(self._task._get_configuration_text(name=name)) + config_text = self._task._get_configuration_text(name=name) + task_definition = json.loads(config_text) if config_text else dict() def _create_task(_): a_task = Task.create( @@ -624,6 +679,7 @@ class PipelineController(object): monitor_artifacts=monitor_artifacts, monitor_metrics=monitor_metrics, monitor_models=monitor_models, + job_code_section=job_code_section, ) if self._task and not self._task.running_locally() and not self._mock_execution: @@ -736,7 +792,11 @@ class PipelineController(object): def create_draft(self): # type: () -> None """ - Optional, manually create & serialize the Pipeline Task. + Optional, manually create & serialize the Pipeline Task (use with care for manual multi pipeline creation). + + **Notice** The recommended flow would be to call `pipeline.start(queue=None)` + which would have a similar effect and will allow you to clone/enqueue later on. + After calling Pipeline.create(), users can edit the pipeline in the UI and enqueue it for execution. Notice: this function should be used to programmatically create pipeline for later usage. 
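A minimal controller-side usage sketch (not part of the patch) of the options introduced above: `target_project=True`, `add_run_number`, the `auto_connect_*` pass-through on `add_function_step`, and the `start(queue=None)` draft flow mentioned in `create_draft`. The project name, step function, and framework keys are illustrative assumptions only:

    from clearml.automation.controller import PipelineController

    def preprocess(limit=10):
        # hypothetical step body
        return list(range(limit))

    pipe = PipelineController(
        name="best pipeline",    # with add_run_number=True a second launch becomes "best pipeline #2"
        project="examples",
        version="1.0.0",
        target_project=True,     # True: steps are stored inside the pipeline's own project
        add_run_number=True,
    )
    pipe.add_function_step(
        name="preprocess",
        function=preprocess,
        function_kwargs=dict(limit=100),
        function_return=["numbers"],
        auto_connect_frameworks={"matplotlib": False},  # forwarded to Task.init of the generated step
        auto_connect_arg_parser=False,
    )
    # per the create_draft() note above: start(queue=None) serializes the pipeline
    # so it can be cloned/enqueued later from the UI
    pipe.start(queue=None)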
@@ -866,6 +926,9 @@ class PipelineController(object): if not self._task: return + # sync pipeline state + self.update_execution_plot() + self._task.close() if mark_failed: self._task.mark_failed(status_reason='Pipeline aborted and failed', force=True) @@ -1014,6 +1077,7 @@ class PipelineController(object): def _create_task_from_function( self, docker, docker_args, docker_bash_setup_script, function, function_input_artifacts, function_kwargs, function_return, + auto_connect_frameworks, auto_connect_arg_parser, packages, project_name, task_name, task_type, repo, branch, commit, helper_functions ): task_definition = CreateFromFunction.create_task_from_function( @@ -1024,6 +1088,8 @@ class PipelineController(object): project_name=project_name, task_name=task_name, task_type=task_type, + auto_connect_frameworks=auto_connect_frameworks, + auto_connect_arg_parser=auto_connect_arg_parser, repo=repo, branch=branch, commit=commit, @@ -1107,15 +1173,17 @@ class PipelineController(object): params, pipeline_dag = self._serialize_pipeline_task() # deserialize back pipeline state - if not params['_continue_pipeline_']: + if not params['continue_pipeline']: for k in pipeline_dag: pipeline_dag[k]['executed'] = None - self._default_execution_queue = params['_default_queue_'] - self._add_pipeline_tags = params['_add_pipeline_tags_'] - self._target_project = params['_target_project_'] or '' + pipeline_dag[k]['job_started'] = None + pipeline_dag[k]['job_ended'] = None + self._default_execution_queue = params['default_queue'] + self._add_pipeline_tags = params['add_pipeline_tags'] + self._target_project = params['target_project'] or '' self._deserialize(pipeline_dag) # if we continue the pipeline, make sure that we re-execute failed tasks - if params['_continue_pipeline_']: + if params['continue_pipeline']: for node in list(self._nodes.values()): if node.executed is False: node.executed = None @@ -1136,9 +1204,9 @@ class PipelineController(object): :return: params, pipeline_dag """ params = { - '_default_queue_': self._default_execution_queue, - '_add_pipeline_tags_': self._add_pipeline_tags, - '_target_project_': self._target_project, + 'default_queue': self._default_execution_queue, + 'add_pipeline_tags': self._add_pipeline_tags, + 'target_project': self._target_project, } pipeline_dag = self._serialize() @@ -1149,15 +1217,16 @@ class PipelineController(object): self._task._set_configuration( name=self._config_section, config_type='dictionary', config_text=json.dumps(pipeline_dag, indent=2)) - params.update(flatten_dictionary(self._pipeline_args)) + pipeline_args = flatten_dictionary(self._pipeline_args) # noinspection PyProtectedMember self._task._set_parameters( - {'{}/{}'.format(self._args_section, k): v for k, v in params.items()}, + {'{}/{}'.format(self._args_section, k): v for k, v in pipeline_args.items()}, __parameters_descriptions=self._pipeline_args_desc, __parameters_types=self._pipeline_args_type, __update=True, ) - params['_continue_pipeline_'] = False + self._task.connect(params, name=self._pipeline_section) + params['continue_pipeline'] = False # make sure we have a unique version number (auto bump version if needed) # only needed when manually (from code) creating pipelines @@ -1173,10 +1242,10 @@ class PipelineController(object): else: self._task.connect_configuration(pipeline_dag, name=self._config_section) self._task.connect(self._pipeline_args, name=self._args_section) - self._task.connect(params, name=self._args_section) + self._task.connect(params, name=self._pipeline_section) # 
noinspection PyProtectedMember if self._task._get_runtime_properties().get(self._runtime_property_hash): - params['_continue_pipeline_'] = True + params['continue_pipeline'] = True else: # noinspection PyProtectedMember pipeline_hash = ClearmlJob._create_task_hash(self._task) @@ -1184,7 +1253,7 @@ class PipelineController(object): self._task._set_runtime_properties({ self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version), }) - params['_continue_pipeline_'] = False + params['continue_pipeline'] = False return params, pipeline_dag @@ -1225,6 +1294,9 @@ class PipelineController(object): found_match_version = False existing_versions = set([self._version]) # noqa for t in existing_tasks: + # exclude ourselves + if t.id == self._task.id: + continue if not t.hyperparams: continue v = t.hyperparams.get('properties', {}).get('version') @@ -1250,15 +1322,37 @@ class PipelineController(object): break getLogger('clearml.automation.controller').info( - 'Existing Pipeline version found, bump new version to: {}'.format(self._version)) + 'No matching Pipelines found, bump new version to: {}'.format(self._version)) self._task.set_user_properties(version=self._version) def _get_task_hash(self): params_override = dict(**(self._task.get_parameters() or {})) params_override.pop('properties/version', None) + # dag state without status / states + nodes_items = list(self._nodes.items()) + dag = { + name: { + k: v for k, v in node.__dict__.items() + if k not in ( + 'job', 'name', 'task_factory_func', 'executed', 'status', + 'job_started', 'job_ended', 'skip_job' + ) + } + for name, node in nodes_items + } + + # get all configurations (as dict of strings for hashing) + configurations_override = dict(**self._task.get_configuration_objects()) + # store as text so we can hash it later + configurations_override[self._config_section] = json.dumps(dag) + # noinspection PyProtectedMember - pipeline_hash = ClearmlJob._create_task_hash(self._task, params_override=params_override) + pipeline_hash = ClearmlJob._create_task_hash( + self._task, + params_override=params_override, + configurations_override=configurations_override, + ) return pipeline_hash def _serialize(self): @@ -1268,9 +1362,13 @@ class PipelineController(object): This dictionary will be used to store the DAG as a configuration on the Task :return: """ + nodes_items = list(self._nodes.items()) dag = {name: dict((k, v) for k, v in node.__dict__.items() if k not in ('job', 'name', 'task_factory_func')) - for name, node in list(self._nodes.items())} + for name, node in nodes_items} + # update state for presentation only + for name, node in nodes_items: + dag[name]['job_id'] = node.executed or (node.job.task_id() if node.job else None) return dag @@ -1285,12 +1383,13 @@ class PipelineController(object): # if we do not clone the Task, only merge the parts we can override. 
for name in list(self._nodes.keys()): if not self._nodes[name].clone_task and name in dag_dict and not dag_dict[name].get('clone_task'): - for k in ('queue', 'parents', 'timeout', 'parameters', 'configurations', 'task_overrides'): + for k in ('queue', 'parents', 'timeout', 'parameters', 'configurations', 'task_overrides', + 'executed', 'job_started', 'job_ended'): setattr(self._nodes[name], k, dag_dict[name].get(k) or type(getattr(self._nodes[name], k))()) # if we do clone the Task deserialize everything, except the function creating self._nodes = { - k: self.Node(name=k, **v) + k: self.Node(name=k, **{kk: vv for kk, vv in v.items() if kk not in ('job_id', )}) if k not in self._nodes or (v.get('base_task_id') and v.get('clone_task')) else self._nodes[k] for k, v in dag_dict.items()} @@ -1436,9 +1535,18 @@ class PipelineController(object): :param node: Node to launch :return: Return True if a new job was launched """ + # clear state if we are creating a new job + if not node.job: + node.job_started = None + node.job_ended = None + node.job_type = None + if node.job or node.executed: + print('Skipping cached/executed step [{}]'.format(node.name)) return False + print('Launching step [{}]'.format(node.name)) + updated_hyper_parameters = {} for k, v in node.parameters.items(): updated_hyper_parameters[k] = self._parse_step_ref(v) @@ -1446,10 +1554,10 @@ class PipelineController(object): task_overrides = self._parse_task_overrides(node.task_overrides) if node.task_overrides else None extra_args = dict() - if self._target_project: - extra_args['project'] = get_or_create_project( - session=self._task.session if self._task else Task.default_session, - project_name=self._target_project) + extra_args['project'] = self._get_target_project(return_project_id=True) or None + # set Task name to match job name + if self._pipeline_as_sub_project: + extra_args['name'] = node.name skip_node = None if self._pre_step_callbacks.get(node.name): @@ -1485,6 +1593,10 @@ class PipelineController(object): self._pipeline_task_status_failed = True raise + node.job_started = time() + node.job_ended = None + node.job_type = str(node.job.task.task_type) + if self._experiment_created_cb: skip_node = self._experiment_created_cb(self, node, updated_hyper_parameters) @@ -1510,10 +1622,19 @@ class PipelineController(object): # type: () -> () """ Update sankey diagram of the current pipeline + Also update the controller Task artifact storing the DAG state (with all the nodes states) """ if not self._task: return + nodes = list(self._nodes.values()) + # update status + for n in nodes: + self._update_node_status(n) + + # update the configuration state, so that the UI is presents the correct state + self._force_task_configuration_update() + sankey_node = dict( label=[], color=[], @@ -1530,7 +1651,7 @@ class PipelineController(object): ) visited = [] node_params = [] - nodes = list(self._nodes.values()) + # update colors while nodes: next_nodes = [] for node in nodes: @@ -1648,14 +1769,14 @@ class PipelineController(object): [step_name, self.__create_task_link(self._nodes[name], task_link_template), self._nodes[name].job.task.name if self._nodes[name].job else '', - self.__get_node_status(self._nodes[name]), + str(self._nodes[name].status or ""), param_str] ) return table_values - @staticmethod - def _get_node_color(node): + @classmethod + def _get_node_color(cls, node): # type (self.Mode) -> str """ Return the node color based on the node/job state @@ -1665,37 +1786,89 @@ class PipelineController(object): if not node: return "" + 
color_lookup = { + "failed": "red", + "cached": "darkslateblue", + "completed": "blue", + "aborted": "royalblue", + "queued": "#bdf5bd", + "running": "green", + "skipped": "gray", + "pending": "lightsteelblue", + } + return color_lookup.get(node.status, "") + + @classmethod + def _update_node_status(cls, node): + # type (self.Mode) -> () + """ + Update the node status entry based on the node/job state + :param node: A node in the pipeline + """ + if not node: + return + + # update job ended: + update_job_ended = node.job_started and not node.job_ended + + # refresh status + if node.job and isinstance(node.job, BaseJob): + node.job.status(force=True) + if node.executed is not None: if node.job and node.job.is_failed(): - return "red" # failed job + # failed job + node.status = "failed" elif node.job and node.job.is_cached_task(): - return "darkslateblue" + # cached job + node.status = "cached" elif not node.job or node.job.is_completed(): - return "blue" # completed job + # completed job + node.status = "completed" else: - return "royalblue" # aborted job + # aborted job + node.status = "aborted" elif node.job: if node.job.is_pending(): - return "#bdf5bd" # lightgreen, pending in queue + # lightgreen, pending in queue + node.status = "queued" elif node.job.is_completed(): - return "blue" # completed job + # completed job + node.status = "completed" elif node.job.is_failed(): - return "red" # failed job + # failed job + node.status = "failed" elif node.job.is_stopped(): - return "royalblue" # aborted job + # aborted job + node.status = "aborted" else: - return "green" # running job + node.status = "running" elif node.skip_job: - return "gray" # skipped job + node.status = "skipped" else: - return "lightsteelblue" # pending job + node.status = "pending" + + if update_job_ended and node.status in ("aborted", "failed", "completed"): + node.job_ended = time() + + assert node.status in cls.valid_job_status + + def _update_dag_state_artifact(self): + # type: () -> () + pipeline_dag = self._serialize() + self._task.upload_artifact( + name=self._state_artifact_name, artifact_object='', + metadata=dict(pipeline=hash_dict(pipeline_dag)), + preview=json.dumps(pipeline_dag, indent=1)) def _force_task_configuration_update(self): + # type: () -> () pipeline_dag = self._serialize() if self._task: # noinspection PyProtectedMember self._task._set_configuration( name=self._config_section, config_type='dictionary', + description="pipeline state: {}".format(hash_dict(pipeline_dag)), config_text=json.dumps(pipeline_dag, indent=2)) def _daemon(self): @@ -1704,6 +1877,7 @@ class PipelineController(object): The main pipeline execution loop. This loop is executed on its own dedicated thread. 
:return: """ + launch_thread_pool = ThreadPool(16) pooling_counter = 0 launched_nodes = set() last_monitor_report = last_plot_report = time() @@ -1791,8 +1965,11 @@ class PipelineController(object): next_nodes.append(node.name) # update the execution graph - for name in next_nodes: - if self._launch_node(self._nodes[name]) and not self._nodes[name].skip_job: + print('Launching the next {} steps'.format(len(next_nodes))) + node_launch_success = launch_thread_pool.map( + self._launch_node, [self._nodes[name] for name in next_nodes]) + for name, success in zip(next_nodes, node_launch_success): + if success and not self._nodes[name].skip_job: print('Launching step: {}'.format(name)) print('Parameters:\n{}'.format( self._nodes[name].job.task_parameter_override if self._nodes[name].job @@ -1808,8 +1985,6 @@ class PipelineController(object): 'Skipping launching step \'{}\': {}'.format(name, self._nodes[name])) # update current state (in configuration, so that we could later continue an aborted pipeline) - self._force_task_configuration_update() - # visualize pipeline state (plot) self.update_execution_plot() @@ -1977,6 +2152,79 @@ class PipelineController(object): if node.job.is_stopped(): self._monitored_nodes[node.name]['completed'] = True + def _get_target_project(self, return_project_id=False): + # type: (bool) -> str + """ + return the pipeline components target folder name/id + + :param return_project_id: if False (default) return target folder name, if True return project id + :return: project id/name (None if not valid) + """ + if not self._target_project: + return '' + + if str(self._target_project).lower().strip() == 'true': + if not self._task: + return '' + return self._task.project if return_project_id else self._task.get_project_name() + + if not return_project_id: + return self._target_project + + return get_or_create_project( + session=self._task.session if self._task else Task.default_session, + project_name=self._target_project) + + def _add_pipeline_name_run_number(self): + # type: () -> None + if not self._task: + return + # if we were already executed, do not rename (meaning aborted pipeline that was continued) + # noinspection PyProtectedMember + if self._task._get_runtime_properties().get(self._runtime_property_hash): + return + + # remove the # suffix if we have one: + task_name = re.compile(r" #\d+$").split(self._task.name or "", 1)[0] + page_size = 100 + # find exact name or " #" extension + prev_pipelines_ids = self._task.query_tasks( + task_name=r"^{}(| #\d+)$".format(task_name), + task_filter=dict( + project=[self._task.project], system_tags=[self._tag], + order_by=['-created'], + page_size=page_size, + ) + ) + max_value = len(prev_pipelines_ids) if prev_pipelines_ids else 0 + # we hit the limit + if max_value == page_size: + # make sure that if we get something wrong we do not stop the pipeline, + # worst case fail to auto increment + try: + # we assume we are the latest so let's take a few (last 10) and check the max number + last_task_name = self._task.query_tasks( + task_filter=dict(task_ids=prev_pipelines_ids[:10], project=[self._task.project]), + additional_return_fields=['name'], + ) # type: List[Dict] + # let's parse the names + pattern = re.compile(r" #(?P\d+)$") + task_parts = [pattern.split(t.get('name') or "", 1) for t in last_task_name] + # find the highest number + for parts in task_parts: + if len(parts) >= 2: + try: + max_value = max(max_value, int(parts[1])+1) + except (TypeError, ValueError): + pass + except Exception as ex: + 
getLogger('clearml.automation.controller').warning( + 'Pipeline auto run increment failed (skipping): {}'.format(ex)) + max_value = 0 + + if max_value > 1: + self._task.set_name(task_name + " #{}".format(max_value)) + @classmethod def _get_pipeline_task(cls): # type: () -> Task @@ -2151,22 +2399,6 @@ class PipelineController(object): return None - @classmethod - def __get_node_status(cls, a_node): - # type: (PipelineController.Node) -> str - if not a_node: - return "pending" - if a_node.skip_job: - return "skipped" - if a_node.job and a_node.job.is_cached_task(): - return "cached" - if a_node.job and a_node.job.task: - # no need to refresh status - return str(a_node.job.task.data.status) - if a_node.executed: - return "executed" - return "pending" - @classmethod def __create_task_link(cls, a_node, task_link_template): # type: (PipelineController.Node, str) -> str @@ -2197,6 +2429,7 @@ class PipelineController(object): class PipelineDecorator(PipelineController): _added_decorator = [] # type: List[dict] + _ref_lazy_loader_id_to_node_name = {} # type: dict _singleton = None # type: Optional[PipelineDecorator] _eager_step_artifact = 'eager_step' _eager_execution_instance = False @@ -2216,6 +2449,7 @@ class PipelineDecorator(PipelineController): add_pipeline_tags=False, # type: bool target_project=None, # type: Optional[str] abort_on_failure=False, # type: bool + add_run_number=True, # type: bool ): # type: (...) -> () """ @@ -2235,6 +2469,8 @@ class PipelineDecorator(PipelineController): was specifically defined with "continue_on_fail=True". If True, any failed step will cause the pipeline to immediately abort, stop all running steps, and mark the pipeline as failed. + :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name. 
+ Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2" """ super(PipelineDecorator, self).__init__( name=name, @@ -2244,6 +2480,7 @@ class PipelineDecorator(PipelineController): add_pipeline_tags=add_pipeline_tags, target_project=target_project, abort_on_failure=abort_on_failure, + add_run_number=add_run_number, ) # if we are in eager execution, make sure parent class knows it @@ -2259,6 +2496,8 @@ class PipelineDecorator(PipelineController): self._added_decorator.clear() PipelineDecorator._singleton = self self._reference_callback = [] + # store launched nodes, in case we call the same function multiple times, and need renaming: + self._launched_step_names = set() # map eager steps task id to the new step name self._eager_steps_task_id = {} # type: Dict[str, str] @@ -2388,8 +2627,9 @@ class PipelineDecorator(PipelineController): """ Update sankey diagram of the current pipeline """ - self._update_eager_generated_steps() - super(PipelineDecorator, self).update_execution_plot() + with self._reporting_lock: + self._update_eager_generated_steps() + super(PipelineDecorator, self).update_execution_plot() def _update_eager_generated_steps(self): # noinspection PyProtectedMember @@ -2451,11 +2691,12 @@ class PipelineDecorator(PipelineController): def _create_task_from_function( self, docker, docker_args, docker_bash_setup_script, function, function_input_artifacts, function_kwargs, function_return, + auto_connect_frameworks, auto_connect_arg_parser, packages, project_name, task_name, task_type, repo, branch, commit, helper_functions, ): def sanitize(function_source): - matched = re.match(r"[\s]*@PipelineDecorator.component[\s\\]*\(", function_source) + matched = re.match(r"[\s]*@[\w]*.component[\s\\]*\(", function_source) if matched: function_source = function_source[matched.span()[1]:] # find the last ")" @@ -2481,6 +2722,8 @@ class PipelineDecorator(PipelineController): project_name=project_name, task_name=task_name, task_type=task_type, + auto_connect_frameworks=auto_connect_frameworks, + auto_connect_arg_parser=auto_connect_arg_parser, repo=repo, branch=branch, commit=commit, @@ -2536,6 +2779,8 @@ class PipelineDecorator(PipelineController): docker_args=None, # type: Optional[str] docker_bash_setup_script=None, # type: Optional[str] task_type=None, # type: Optional[str] + auto_connect_frameworks=None, # type: Optional[dict] + auto_connect_arg_parser=None, # type: Optional[dict] repo=None, # type: Optional[str] repo_branch=None, # type: Optional[str] repo_commit=None, # type: Optional[str] @@ -2574,6 +2819,8 @@ class PipelineDecorator(PipelineController): setting up the Task's environment :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' + :param auto_connect_frameworks: Control the frameworks auto connect, see `Task.init` auto_connect_frameworks + :param auto_connect_arg_parser: Control the ArgParser auto connect, see `Task.init` auto_connect_arg_parser :param repo: Optional, specify a repository to attach to the function, when remotely executing. Allow users to execute the function inside the specified repository, enabling them to load modules/script from the repository. Notice the execution work directory will be the repository root folder. 
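A decorator-side sketch (not part of the patch) of the same new options exposed through `PipelineDecorator.component` and `PipelineDecorator.pipeline`; the step function, pipeline body, and values below are illustrative assumptions only:

    from clearml.automation.controller import PipelineDecorator

    @PipelineDecorator.component(
        return_values=["doubled"],
        auto_connect_frameworks={"joblib": False},  # forwarded to Task.init of the generated step Task
        auto_connect_arg_parser=False,
    )
    def double(numbers):
        return [n * 2 for n in numbers]

    @PipelineDecorator.pipeline(
        name="best pipeline",
        project="examples",
        version="1.0.0",
        add_run_number=True,  # a second launch of the same pipeline is renamed "best pipeline #2"
    )
    def run_pipeline():
        return double(numbers=[1, 2, 3])

    if __name__ == "__main__":
        PipelineDecorator.run_locally()  # steps execute as local sub-process Tasks
        run_pipeline()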
@@ -2643,6 +2890,8 @@ class PipelineDecorator(PipelineController): docker=docker, docker_args=docker_args, docker_bash_setup_script=docker_bash_setup_script, + auto_connect_frameworks=auto_connect_frameworks, + auto_connect_arg_parser=auto_connect_arg_parser, task_type=task_type, repo=repo, repo_branch=repo_branch, @@ -2661,8 +2910,10 @@ class PipelineDecorator(PipelineController): @functools.wraps(func) def wrapper(*args, **kwargs): if cls._debug_execute_step_function: - args = [v._remoteref() if isinstance(v, LazyEvalWrapper) else v for v in args] - kwargs = {k: v._remoteref() if isinstance(v, LazyEvalWrapper) else v for k, v in kwargs.items()} + args = walk_nested_dict_tuple_list( + args, lambda x: v._remoteref() if isinstance(v, LazyEvalWrapper) else v) + kwargs = walk_nested_dict_tuple_list( + kwargs, lambda x: v._remoteref() if isinstance(v, LazyEvalWrapper) else v) func_return = [] @@ -2673,14 +2924,18 @@ class PipelineDecorator(PipelineController): return a_func_return if return_index is None else a_func_return[return_index] if len(function_return) == 1: - return LazyEvalWrapper( + ret_val = LazyEvalWrapper( callback=functools.partial(result_wrapper, func_return, None), remote_reference=functools.partial(result_wrapper, func_return, None)) + cls._ref_lazy_loader_id_to_node_name[id(ret_val)] = _name + return ret_val else: return_w = [LazyEvalWrapper( callback=functools.partial(result_wrapper, func_return, i), remote_reference=functools.partial(result_wrapper, func_return, i)) for i, _ in enumerate(function_return)] + for i in return_w: + cls._ref_lazy_loader_id_to_node_name[id(i)] = _name return return_w # resolve all lazy objects if we have any: @@ -2689,9 +2944,10 @@ class PipelineDecorator(PipelineController): kwargs[inspect_func.args[i]] = v kwargs_artifacts.update( - {k: v._remoteref() for k, v in kwargs.items() if isinstance(v, LazyEvalWrapper)} + {k: walk_nested_dict_tuple_list(v, lambda x: x._remoteref()) + for k, v in kwargs.items() if isinstance(v, LazyEvalWrapper)} ) - kwargs = {k: v for k, v in kwargs.items() if not isinstance(v, LazyEvalWrapper)} + kwargs = {k: deepcopy(v) for k, v in kwargs.items() if not isinstance(v, LazyEvalWrapper)} # check if we have the singleton if not cls._singleton: @@ -2722,15 +2978,14 @@ class PipelineDecorator(PipelineController): Task.current_task().set_tags(original_tags[0]) Task.current_task().set_system_tags(original_tags[1]) - # get original node name + # get node name _node_name = _name - # get node - _node = cls._singleton._nodes[_node_name] - # if we already have a JOB on the node, this means we are calling the same function/task - # twice inside the pipeline, this means we need to replicate the node. - if _node.job: - _node = _node.copy() + # check if we are launching the same node twice + if _node_name in cls._singleton._launched_step_names: + # if we already launched a JOB on the node, this means we are calling the same function/task + # twice inside the pipeline, this means we need to replicate the node. + _node = cls._singleton._nodes[_node_name].copy() # find a new name counter = 1 while _node.name in cls._singleton._nodes: @@ -2739,73 +2994,28 @@ class PipelineDecorator(PipelineController): _node_name = _node.name cls._singleton._nodes[_node.name] = _node - # update artifacts kwargs - for k, v in kwargs_artifacts.items(): - if k in kwargs: - kwargs.pop(k, None) - _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = v - if v and '.' 
in str(v): - parent_id, _ = str(v).split('.', 1) - # find parent and push it into the _node.parents - for n, node in cls._singleton._nodes.items(): - if n != _node.name and node.executed and node.executed == parent_id: - if n not in _node.parents: - _node.parents.append(n) - break - for k, v in kwargs.items(): - if v is None or isinstance(v, (bool, int, float, str)): - _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v - elif isinstance(v, (list, tuple)) and all(isinstance(i, (bool, int, float, str)) for i in v): - _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v - else: - # we need to create an artifact - artifact_name = 'result_{}_{}'.format(re.sub(r'\W+', '', _node.name), k) - cls._singleton._task.upload_artifact( - name=artifact_name, artifact_object=v, wait_on_upload=True) - _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = \ - "{}.{}".format(cls._singleton._task.id, artifact_name) - # now add all the executed nodes as parents (only the leaves of the DAG, no need for parents) - _node.parents = list( - set((_node.parents or []) + cls._singleton._find_executed_node_leaves()) - - set(list(_node.name))) + # get node and park is as launched + cls._singleton._launched_step_names.add(_node_name) + _node = cls._singleton._nodes[_node_name] - # verify the new step - cls._singleton._verify_node(_node) - # launch the new step - cls._singleton._launch_node(_node) - # check if we generated the pipeline we need to update the new eager step - if PipelineDecorator._eager_execution_instance and _node.job: - # check if we need to add the pipeline tag on the new node - pipeline_tags = [t for t in Task.current_task().get_tags() or [] - if str(t).startswith(cls._node_tag_prefix)] - if pipeline_tags and _node.job and _node.job.task: - pipeline_tags = list(set((_node.job.task.get_tags() or []) + pipeline_tags)) - _node.job.task.set_tags(pipeline_tags) - # force parent task as pipeline - _node.job.task._edit(parent=Task.current_task().parent) - # store the new generated node, so we can later serialize it - pipeline_dag = cls._singleton._serialize() - # check if node is cached - if _node.job.is_cached_task(): - pipeline_dag[_node_name]['is_cached'] = True - # store entire definition on the parent pipeline - from clearml.backend_api.services import tasks - artifact = tasks.Artifact( - key='{}:{}:{}'.format(cls._eager_step_artifact, Task.current_task().id, _node.job.task_id()), - type="json", - mode='output', - type_data=tasks.ArtifactTypeData( - preview=json.dumps({_node_name: pipeline_dag[_node_name]}), - content_type='application/pipeline') - ) - req = tasks.AddOrUpdateArtifactsRequest( - task=Task.current_task().parent, artifacts=[artifact], force=True) - res = Task.current_task().send(req, raise_on_errors=False) - if not res or not res.response or not res.response.updated: - pass + # The actual launch is a bit slow, we run it in the background + launch_thread = Thread( + target=cls._component_launch, + args=(_node_name, _node, kwargs_artifacts, kwargs)) def results_reference(return_name): + # wait until launch is completed + if launch_thread and launch_thread.is_alive(): + try: + launch_thread.join() + except: # noqa + pass # wait until job is completed + if not _node.job: + if not _node.executed: + raise ValueError("Job was not created and is also not cached/executed") + return "{}.{}".format(_node.executed, return_name) + _node.job.wait(pool_period=0.2) if _node.job.is_failed() and not _node.continue_on_fail: raise 
ValueError( @@ -2815,6 +3025,12 @@ class PipelineDecorator(PipelineController): return "{}.{}".format(_node.job.task_id(), return_name) def result_wrapper(return_name): + # wait until launch is completed + if launch_thread and launch_thread.is_alive(): + try: + launch_thread.join() + except: # noqa + pass # wait until job is completed _node.job.wait(pool_period=0.2) if _node.job.is_failed(): @@ -2825,8 +3041,13 @@ class PipelineDecorator(PipelineController): return Task.get_task(_node.job.task_id()).artifacts[return_name].get() return_w = [LazyEvalWrapper( - callback=functools.partial(result_wrapper, n), - remote_reference=functools.partial(results_reference, n)) for n in function_return] + callback=functools.partial(result_wrapper, n), + remote_reference=functools.partial(results_reference, n)) for n in function_return] + for i in return_w: + cls._ref_lazy_loader_id_to_node_name[id(i)] = _node_name + + # start the launch thread now + launch_thread.start() return return_w[0] if len(return_w) == 1 else return_w @@ -2848,7 +3069,8 @@ class PipelineDecorator(PipelineController): target_project=None, # type: Optional[str] abort_on_failure=False, # type: bool pipeline_execution_queue='services', # type: Optional[str] - multi_instance_support=False + multi_instance_support=False, # type: bool + add_run_number=True # type: bool ): # type: (...) -> Callable """ @@ -2880,6 +3102,8 @@ class PipelineDecorator(PipelineController): in the `parallel` case the function calls return None, to collect all pipeline results call `PipelineDecorator.wait_for_multi_pipelines()`. Default False, no multi instance pipeline support. + :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name. + Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2" """ def decorator_wrap(func): @@ -2907,8 +3131,19 @@ class PipelineDecorator(PipelineController): # run the entire pipeline locally, as python functions if cls._debug_execute_step_function: + a_pipeline = PipelineDecorator( + name=name, + project=project, + version=version, + pool_frequency=pool_frequency, + add_pipeline_tags=add_pipeline_tags, + target_project=target_project, + abort_on_failure=abort_on_failure, + add_run_number=add_run_number, + ) ret_val = func(**pipeline_kwargs) LazyEvalWrapper.trigger_all_remote_references() + a_pipeline._task.close() return ret_val # check if we are in a multi pipeline @@ -2945,6 +3180,7 @@ class PipelineDecorator(PipelineController): add_pipeline_tags=add_pipeline_tags, target_project=target_project, abort_on_failure=abort_on_failure, + add_run_number=add_run_number, ) if PipelineDecorator._debug_execute_step_process: @@ -3044,7 +3280,7 @@ class PipelineDecorator(PipelineController): def run_locally(cls): # type: () -> () """ - Set local mode, run all functions locally as subprocess or serially as functions + Set local mode, run all functions locally as subprocess Run the full pipeline DAG locally, where steps are executed as sub-processes Tasks Notice: running the DAG locally assumes the local code execution (i.e. it will not clone & apply git diff) @@ -3057,7 +3293,7 @@ class PipelineDecorator(PipelineController): def debug_pipeline(cls): # type: () -> () """ - Set debugging mode, run all functions locally as functions + Set debugging mode, run all functions locally as functions (serially) Run the full pipeline DAG locally, where steps are executed as functions Notice: running the DAG locally assumes the local code execution (i.e. 
it will not clone & apply git diff) @@ -3066,6 +3302,108 @@ class PipelineDecorator(PipelineController): cls._debug_execute_step_process = True cls._debug_execute_step_function = True + @classmethod + def get_current_pipeline(cls): + # type: () -> "PipelineDecorator" + """ + Return the currently running pipeline instance + """ + return cls._singleton + + @classmethod + def wait_for_multi_pipelines(cls): + # type () -> List[Any] + """ + Wait until all background multi pipeline execution is completed. + Returns all the pipeline results in call order (first pipeline call at index 0) + + :return: List of return values from executed pipeline, based on call order. + """ + return cls._wait_for_multi_pipelines() + + @classmethod + def _component_launch(cls, node_name, node, kwargs_artifacts, kwargs): + _node_name = node_name + _node = node + # update artifacts kwargs + for k, v in kwargs_artifacts.items(): + if k in kwargs: + kwargs.pop(k, None) + _node.parameters.pop("{}/{}".format(CreateFromFunction.kwargs_section, k), None) + _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = v + if v and '.' in str(v): + parent_id, _ = str(v).split('.', 1) + # find parent and push it into the _node.parents + for n, node in list(cls._singleton._nodes.items()): + if n != _node.name and node.executed and node.executed == parent_id: + if n not in _node.parents: + _node.parents.append(n) + break + for k, v in kwargs.items(): + if v is None or isinstance(v, (bool, int, float, str)): + _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v + elif isinstance(v, (list, tuple)) and all(isinstance(i, (bool, int, float, str)) for i in v): + _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v + else: + # find parents if we have any + arg_parents = [] + if isinstance(v, (list, tuple, dict)): + walk_nested_dict_tuple_list( + v, + callback=lambda x: + not cls._ref_lazy_loader_id_to_node_name.get(id(x)) or + arg_parents.append(cls._ref_lazy_loader_id_to_node_name[id(x)]) + ) + + # we need to create an artifact + artifact_name = 'result_{}_{}'.format(re.sub(r'\W+', '', _node.name), k) + cls._singleton._task.upload_artifact( + name=artifact_name, artifact_object=v, wait_on_upload=True) + _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = \ + "{}.{}".format(cls._singleton._task.id, artifact_name) + # now add all the executed nodes as parents (only the leaves of the DAG, no need for parents) + _node.parents = list( + set((_node.parents or []) + cls._singleton._find_executed_node_leaves() + arg_parents) + - set(list(_node.name))) + + # verify the new step + cls._singleton._verify_node(_node) + # launch the new step + cls._singleton._launch_node(_node) + # check if we generated the pipeline we need to update the new eager step + if PipelineDecorator._eager_execution_instance and _node.job: + # check if we need to add the pipeline tag on the new node + pipeline_tags = [t for t in Task.current_task().get_tags() or [] + if str(t).startswith(cls._node_tag_prefix)] + if pipeline_tags and _node.job and _node.job.task: + pipeline_tags = list(set((_node.job.task.get_tags() or []) + pipeline_tags)) + _node.job.task.set_tags(pipeline_tags) + # force parent task as pipeline + _node.job.task._edit(parent=Task.current_task().parent) + # store the new generated node, so we can later serialize it + pipeline_dag = cls._singleton._serialize() + # check if node is cached + if _node.job.is_cached_task(): + pipeline_dag[_node_name]['is_cached'] = True 
+ # store entire definition on the parent pipeline + from clearml.backend_api.services import tasks + artifact = tasks.Artifact( + key='{}:{}:{}'.format(cls._eager_step_artifact, Task.current_task().id, _node.job.task_id()), + type="json", + mode='output', + type_data=tasks.ArtifactTypeData( + preview=json.dumps({_node_name: pipeline_dag[_node_name]}), + content_type='application/pipeline') + ) + req = tasks.AddOrUpdateArtifactsRequest( + task=Task.current_task().parent, artifacts=[artifact], force=True) + res = Task.current_task().send(req, raise_on_errors=False) + if not res or not res.response or not res.response.updated: + pass + + # update pipeline execution graph + cls._singleton.update_execution_plot() + @classmethod def _multi_pipeline_wrapper( cls, @@ -3146,25 +3484,6 @@ class PipelineDecorator(PipelineController): return internal_decorator - @classmethod - def get_current_pipeline(cls): - # type: () -> "PipelineDecorator" - """ - Return the currently running pipeline instance - """ - return cls._singleton - - @classmethod - def wait_for_multi_pipelines(cls): - # type () -> List[Any] - """ - Wait until all background multi pipeline execution is completed. - Returns all the pipeline results in call order (first pipeline call at index 0) - - :return: List of return values from executed pipeline, based on call order. - """ - return cls._wait_for_multi_pipelines() - @classmethod def _wait_for_multi_pipelines(cls): results = [] diff --git a/clearml/backend_interface/task/populate.py b/clearml/backend_interface/task/populate.py index a4e29b70..6ff92f29 100644 --- a/clearml/backend_interface/task/populate.py +++ b/clearml/backend_interface/task/populate.py @@ -480,7 +480,10 @@ from clearml.automation.controller import PipelineDecorator {function_source} if __name__ == '__main__': - task = Task.init() + task = Task.init( + auto_connect_frameworks={auto_connect_frameworks}, + auto_connect_arg_parser={auto_connect_arg_parser}, + ) kwargs = {function_kwargs} task.connect(kwargs, name='{kwargs_section}') function_input_artifacts = {function_input_artifacts} @@ -510,6 +513,8 @@ if __name__ == '__main__': project_name=None, # type: Optional[str] task_name=None, # type: Optional[str] task_type=None, # type: Optional[str] + auto_connect_frameworks=None, # type: Optional[dict] + auto_connect_arg_parser=None, # type: Optional[dict] repo=None, # type: Optional[str] branch=None, # type: Optional[str] commit=None, # type: Optional[str] @@ -561,6 +566,8 @@ if __name__ == '__main__': :param task_name: Set the name of the remote task. Required if base_task_id is None. :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' + :param auto_connect_frameworks: Control the frameworks auto connect, see `Task.init` auto_connect_frameworks + :param auto_connect_arg_parser: Control the ArgParser auto connect, see `Task.init` auto_connect_arg_parser :param repo: Remote URL for the repository to use, OR path to local copy of the git repository Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo' :param branch: Select specific repository branch/tag (implies the latest commit from the branch) @@ -582,6 +589,9 @@ if __name__ == '__main__': :param _sanitize_helper_functions: Sanitization function for the helper function string. 
:return: Newly created Task object """ + assert (not auto_connect_frameworks or isinstance(auto_connect_frameworks, (bool, dict))) + assert (not auto_connect_arg_parser or isinstance(auto_connect_arg_parser, (bool, dict))) + function_name = str(a_function.__name__) function_source = inspect.getsource(a_function) if _sanitize_function: @@ -635,6 +645,8 @@ if __name__ == '__main__': if inspect_args.annotations[k] in supported_types} task_template = cls.task_template.format( + auto_connect_frameworks=auto_connect_frameworks, + auto_connect_arg_parser=auto_connect_arg_parser, kwargs_section=cls.kwargs_section, input_artifact_section=cls.input_artifact_section, function_source=function_source, diff --git a/clearml/backend_interface/util.py b/clearml/backend_interface/util.py index 30066568..c0945645 100644 --- a/clearml/backend_interface/util.py +++ b/clearml/backend_interface/util.py @@ -54,7 +54,8 @@ def make_message(s, **kwargs): def get_existing_project(session, project_name): """Return either the project ID if it exists, an empty string if it doesn't or None if backend request failed.""" - res = session.send(projects.GetAllRequest(name=exact_match_regex(project_name), only_fields=['id'])) + res = session.send(projects.GetAllRequest( + name=exact_match_regex(project_name), only_fields=['id'], search_hidden=True, _allow_extra_fields_=True)) if not res: return None if res.response and res.response.projects: @@ -62,18 +63,36 @@ def get_existing_project(session, project_name): return "" -def get_or_create_project(session, project_name, description=None): +def get_or_create_project(session, project_name, description=None, system_tags=None, project_id=None): """Return the ID of an existing project, or if it does not exist, make a new one and return that ID instead.""" - project_id = get_existing_project(session, project_name) + project_system_tags = [] + if not project_id: + res = session.send(projects.GetAllRequest( + name=exact_match_regex(project_name), + only_fields=['id', 'system_tags'] if system_tags else ['id'], + search_hidden=True, _allow_extra_fields_=True)) + + if res and res.response and res.response.projects: + project_id = res.response.projects[0].id + if system_tags: + project_system_tags = res.response.projects[0].system_tags + + if project_id and system_tags and (not project_system_tags or + set(project_system_tags) & set(system_tags) != set(system_tags)): + # set system_tags + session.send( + projects.UpdateRequest( + project=project_id, system_tags=list(set((project_system_tags or []) + system_tags)) + ) + ) + if project_id: return project_id - if project_id == "": - # Project was not found, so create a new one - res = session.send(projects.CreateRequest(name=project_name, description=description or '')) - return res.response.id - # This should only happen if backend response was None and so project_id is also None - return None + # Project was not found, so create a new one + res = session.send(projects.CreateRequest( + name=project_name, description=description or '', system_tags=system_tags)) + return res.response.id def get_queue_id(session, queue): diff --git a/clearml/cli/task/__main__.py b/clearml/cli/task/__main__.py index a47c9b15..4969bb9b 100644 --- a/clearml/cli/task/__main__.py +++ b/clearml/cli/task/__main__.py @@ -1,3 +1,4 @@ +import sys from argparse import ArgumentParser from pathlib2 import Path @@ -80,6 +81,10 @@ def cli(): # get the args args = parser.parse_args() + if len(sys.argv) < 2: + parser.print_help() + exit(0) + if args.version: print('Version 
{}'.format(__version__))
         exit(0)
 
diff --git a/clearml/utilities/proxy_object.py b/clearml/utilities/proxy_object.py
index dcdb7e91..c2e3d87b 100644
--- a/clearml/utilities/proxy_object.py
+++ b/clearml/utilities/proxy_object.py
@@ -211,6 +211,25 @@ def naive_nested_from_flat_dictionary(flat_dict, sep='/'):
     }
 
 
+def walk_nested_dict_tuple_list(dict_list_tuple, callback):
+    nested = (dict, tuple, list)
+    if not isinstance(dict_list_tuple, nested):
+        return callback(dict_list_tuple)
+
+    if isinstance(dict_list_tuple, dict):
+        ret = {}
+        for k, v in dict_list_tuple.items():
+            ret[k] = walk_nested_dict_tuple_list(v, callback=callback) if isinstance(v, nested) else callback(v)
+    else:
+        ret = []
+        for v in dict_list_tuple:
+            ret.append(walk_nested_dict_tuple_list(v, callback=callback) if isinstance(v, nested) else callback(v))
+        if isinstance(dict_list_tuple, tuple):
+            ret = tuple(ret)
+
+    return ret
+
+
 class WrapperBase(type):
     # This metaclass is heavily inspired by the Object Proxying python recipe
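A short sketch (not part of the patch) of how `walk_nested_dict_tuple_list` behaves; the controller changes above apply the same pattern to swap `LazyEvalWrapper` objects for their remote references inside nested args/kwargs. The sample data is an assumption:

    from clearml.utilities.proxy_object import walk_nested_dict_tuple_list

    data = {"a": 1, "b": (2, [3, 4])}
    doubled = walk_nested_dict_tuple_list(data, callback=lambda x: x * 2)
    # nesting (dict/tuple/list) is preserved, only the leaf values are transformed
    assert doubled == {"a": 2, "b": (4, [6, 8])}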