From 0482e8eeb5b75df4c9d58b989821f2a583235156 Mon Sep 17 00:00:00 2001 From: clearml <> Date: Sun, 23 Feb 2025 14:25:40 +0200 Subject: [PATCH] Black formatting --- clearml/automation/controller.py | 1449 +++++++++++++++++------------- 1 file changed, 809 insertions(+), 640 deletions(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 5d7ecb13..c50d51af 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -42,26 +42,27 @@ class PipelineController(object): The pipeline process (task) itself can be executed manually or by the clearml-agent services queue. Notice: The pipeline controller lives as long as the pipeline itself is being executed. """ - _tag = 'pipeline' - _project_system_tags = ['pipeline', 'hidden'] - _node_tag_prefix = 'pipe:' + + _tag = "pipeline" + _project_system_tags = ["pipeline", "hidden"] + _node_tag_prefix = "pipe:" _step_pattern = r"\${[^}]*}" - _config_section = 'Pipeline' - _state_artifact_name = 'pipeline_state' - _args_section = 'Args' - _pipeline_section = 'pipeline' - _pipeline_step_ref = 'pipeline' - _runtime_property_hash = '_pipeline_hash' + _config_section = "Pipeline" + _state_artifact_name = "pipeline_state" + _args_section = "Args" + _pipeline_section = "pipeline" + _pipeline_step_ref = "pipeline" + _runtime_property_hash = "_pipeline_hash" _relaunch_status_message = "Relaunching pipeline step..." - _reserved_pipeline_names = (_pipeline_step_ref, ) + _reserved_pipeline_names = (_pipeline_step_ref,) _task_project_lookup = {} _clearml_job_class = ClearmlJob - _update_execution_plot_interval = 5.*60 - _update_progress_interval = 10. - _monitor_node_interval = 5.*60 + _update_execution_plot_interval = 5.0 * 60 + _update_progress_interval = 10.0 + _monitor_node_interval = 5.0 * 60 _pipeline_as_sub_project_cached = None - _report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow') - _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details') + _report_plot_execution_flow = dict(title="Pipeline", series="Execution Flow") + _report_plot_execution_details = dict(title="Pipeline Details", series="Execution Details") _evaluated_return_values = {} # TID: pipeline_name _add_to_evaluated_return_values = {} # TID: bool _retries = {} # Node.name: int @@ -173,8 +174,11 @@ class PipelineController(object): """ new_copy = PipelineController.Node( name=self.name, - **dict((k, deepcopy(v)) for k, v in self.__dict__.items() - if k not in ('name', 'job', 'executed', 'task_factory_func')) + **dict( + (k, deepcopy(v)) + for k, v in self.__dict__.items() + if k not in ("name", "job", "executed", "task_factory_func") + ) ) new_copy.task_factory_func = self.task_factory_func return new_copy @@ -199,31 +203,31 @@ class PipelineController(object): pass def __init__( - self, - name, # type: str - project, # type: str - version=None, # type: Optional[str] - pool_frequency=0.2, # type: float - add_pipeline_tags=False, # type: bool - target_project=True, # type: Optional[Union[str, bool]] - auto_version_bump=None, # type: Optional[bool] - abort_on_failure=False, # type: bool - add_run_number=True, # type: bool - retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa - docker=None, # type: Optional[str] - docker_args=None, # type: Optional[str] - docker_bash_setup_script=None, # type: Optional[str] - packages=None, # type: Optional[Union[bool, str, Sequence[str]]] - repo=None, # type: Optional[str] - 
repo_branch=None, # type: Optional[str] - repo_commit=None, # type: Optional[str] - always_create_from_code=True, # type: bool - artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] - artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] - output_uri=None, # type: Optional[Union[str, bool]] - skip_global_imports=False, # type: bool - working_dir=None, # type: Optional[str] - enable_local_imports=True # type: bool + self, + name, # type: str + project, # type: str + version=None, # type: Optional[str] + pool_frequency=0.2, # type: float + add_pipeline_tags=False, # type: bool + target_project=True, # type: Optional[Union[str, bool]] + auto_version_bump=None, # type: Optional[bool] + abort_on_failure=False, # type: bool + add_run_number=True, # type: bool + retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + packages=None, # type: Optional[Union[bool, str, Sequence[str]]] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None, # type: Optional[str] + always_create_from_code=True, # type: bool + artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] + artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] + output_uri=None, # type: Optional[Union[str, bool]] + skip_global_imports=False, # type: bool + working_dir=None, # type: Optional[str] + enable_local_imports=True, # type: bool ): # type: (...) -> None """ @@ -334,10 +338,8 @@ class PipelineController(object): self._always_create_from_code = bool(always_create_from_code) self._version = str(version).strip() if version else None if self._version and not Version.is_valid_version_string(self._version): - raise ValueError( - "Setting non-semantic pipeline version '{}'".format(self._version) - ) - self._pool_frequency = pool_frequency * 60. + raise ValueError("Setting non-semantic pipeline version '{}'".format(self._version)) + self._pool_frequency = pool_frequency * 60.0 self._thread = None self._pipeline_args = dict() self._pipeline_args_desc = dict() @@ -374,13 +376,13 @@ class PipelineController(object): task_name=pipeline_project_args["task_name"], task_type=Task.TaskTypes.controller, auto_resource_monitoring=False, - reuse_last_task_id=False + reuse_last_task_id=False, ) # if user disabled the auto-repo, set it back to False (just in case) if set_force_local_repo: # noinspection PyProtectedMember - self._task._wait_for_repo_detection(timeout=300.) 
+ self._task._wait_for_repo_detection(timeout=300.0) Task.force_store_standalone_script(force=False) self._create_pipeline_projects( @@ -406,20 +408,23 @@ class PipelineController(object): self._monitored_nodes = {} # type: Dict[str, dict] self._abort_running_steps_on_failure = abort_on_failure self._def_max_retry_on_failure = retry_on_failure if isinstance(retry_on_failure, int) else 0 - self._retry_on_failure_callback = retry_on_failure if callable(retry_on_failure) \ - else self._default_retry_on_failure_callback + self._retry_on_failure_callback = ( + retry_on_failure if callable(retry_on_failure) else self._default_retry_on_failure_callback + ) # add direct link to the pipeline page if self._pipeline_as_sub_project() and self._task: if add_run_number and self._task.running_locally(): self._add_pipeline_name_run_number(self._task) # noinspection PyProtectedMember - self._task.get_logger().report_text('ClearML pipeline page: {}'.format( - '{}/pipelines/{}/experiments/{}'.format( - self._task._get_app_server(), - self._task.project if self._task.project is not None else '*', - self._task.id, - )) + self._task.get_logger().report_text( + "ClearML pipeline page: {}".format( + "{}/pipelines/{}/experiments/{}".format( + self._task._get_app_server(), + self._task.project if self._task.project is not None else "*", + self._task.id, + ) + ) ) @classmethod @@ -445,34 +450,34 @@ class PipelineController(object): :param float max_execution_minutes: The maximum time (minutes) for the entire pipeline process. The default is ``None``, indicating no time limit. """ - self._pipeline_time_limit = max_execution_minutes * 60. if max_execution_minutes else None + self._pipeline_time_limit = max_execution_minutes * 60.0 if max_execution_minutes else None def add_step( - self, - name, # type: str - base_task_id=None, # type: Optional[str] - parents=None, # type: Optional[Sequence[str]] - parameter_override=None, # type: Optional[Mapping[str, Any]] - configuration_overrides=None, # type: Optional[Mapping[str, Union[str, Mapping]]] - task_overrides=None, # type: Optional[Mapping[str, Any]] - execution_queue=None, # type: Optional[str] - monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] - monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] - monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]] - time_limit=None, # type: Optional[float] - base_task_project=None, # type: Optional[str] - base_task_name=None, # type: Optional[str] - clone_base_task=True, # type: bool - continue_on_fail=False, # type: bool - pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa - post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa - cache_executed_step=False, # type: bool - base_task_factory=None, # type: Optional[Callable[[PipelineController.Node], Task]] - retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa - status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa - recursively_parse_parameters=False, # type: bool - output_uri=None, # type: Optional[Union[str, bool]] - continue_behaviour=None # type: Optional[dict] + self, + name, # type: str + base_task_id=None, # type: Optional[str] + parents=None, # type: Optional[Sequence[str]] + parameter_override=None, # type: 
Optional[Mapping[str, Any]] + configuration_overrides=None, # type: Optional[Mapping[str, Union[str, Mapping]]] + task_overrides=None, # type: Optional[Mapping[str, Any]] + execution_queue=None, # type: Optional[str] + monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] + monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] + monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]] + time_limit=None, # type: Optional[float] + base_task_project=None, # type: Optional[str] + base_task_name=None, # type: Optional[str] + clone_base_task=True, # type: bool + continue_on_fail=False, # type: bool + pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa + post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + cache_executed_step=False, # type: bool + base_task_factory=None, # type: Optional[Callable[[PipelineController.Node], Task]] + retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa + recursively_parse_parameters=False, # type: bool + output_uri=None, # type: Optional[Union[str, bool]] + continue_behaviour=None, # type: Optional[dict] ): # type: (...) -> bool """ @@ -648,40 +653,54 @@ class PipelineController(object): if not base_task_factory and not base_task_id: if not base_task_project or not base_task_name: - raise ValueError('Either base_task_id or base_task_project/base_task_name must be provided') + raise ValueError("Either base_task_id or base_task_project/base_task_name must be provided") base_task = Task.get_task( project_name=base_task_project, task_name=base_task_name, allow_archived=True, task_filter=dict( - status=[str(Task.TaskStatusEnum.created), str(Task.TaskStatusEnum.queued), - str(Task.TaskStatusEnum.in_progress), str(Task.TaskStatusEnum.published), - str(Task.TaskStatusEnum.stopped), str(Task.TaskStatusEnum.completed), - str(Task.TaskStatusEnum.closed)], - ) + status=[ + str(Task.TaskStatusEnum.created), + str(Task.TaskStatusEnum.queued), + str(Task.TaskStatusEnum.in_progress), + str(Task.TaskStatusEnum.published), + str(Task.TaskStatusEnum.stopped), + str(Task.TaskStatusEnum.completed), + str(Task.TaskStatusEnum.closed), + ], + ), ) if not base_task: - raise ValueError('Could not find base_task_project={} base_task_name={}'.format( - base_task_project, base_task_name)) + raise ValueError( + "Could not find base_task_project={} base_task_name={}".format(base_task_project, base_task_name) + ) if Task.archived_tag in base_task.get_system_tags(): LoggerRoot.get_base_logger().warning( - 'Found base_task_project={} base_task_name={} but it is archived'.format( - base_task_project, base_task_name)) + "Found base_task_project={} base_task_name={} but it is archived".format( + base_task_project, base_task_name + ) + ) base_task_id = base_task.id if configuration_overrides is not None: # verify we have a dict or a string on all values - if not isinstance(configuration_overrides, dict) or \ - not all(isinstance(v, (str, dict)) for v in configuration_overrides.values()): - raise ValueError("configuration_overrides must be a dictionary, with all values " - "either dicts or strings, got \'{}\' instead".format(configuration_overrides)) + if not isinstance(configuration_overrides, 
dict) or not all( + isinstance(v, (str, dict)) for v in configuration_overrides.values() + ): + raise ValueError( + "configuration_overrides must be a dictionary, with all values " + "either dicts or strings, got '{}' instead".format(configuration_overrides) + ) if task_overrides: - task_overrides = flatten_dictionary(task_overrides, sep='.') + task_overrides = flatten_dictionary(task_overrides, sep=".") self._nodes[name] = self.Node( - name=name, base_task_id=base_task_id, parents=parents or [], - queue=execution_queue, timeout=time_limit, + name=name, + base_task_id=base_task_id, + parents=parents or [], + queue=execution_queue, + timeout=time_limit, parameters=parameter_override or {}, recursively_parse_parameters=recursively_parse_parameters, configurations=configuration_overrides, @@ -694,12 +713,18 @@ class PipelineController(object): monitor_artifacts=monitor_artifacts or [], monitor_models=monitor_models or [], output_uri=self._output_uri if output_uri is None else output_uri, - continue_behaviour=continue_behaviour + continue_behaviour=continue_behaviour, ) self._retries[name] = 0 - self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \ - (functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure) - if isinstance(retry_on_failure, int) else self._retry_on_failure_callback) + self._retries_callbacks[name] = ( + retry_on_failure + if callable(retry_on_failure) + else ( + functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure) + if isinstance(retry_on_failure, int) + else self._retry_on_failure_callback + ) + ) if status_change_callback: self._status_change_callbacks[name] = status_change_callback @@ -709,41 +734,41 @@ class PipelineController(object): return True def add_function_step( - self, - name, # type: str - function, # type: Callable - function_kwargs=None, # type: Optional[Dict[str, Any]] - function_return=None, # type: Optional[List[str]] - project_name=None, # type: Optional[str] - task_name=None, # type: Optional[str] - task_type=None, # type: Optional[str] - auto_connect_frameworks=None, # type: Optional[dict] - auto_connect_arg_parser=None, # type: Optional[dict] - packages=None, # type: Optional[Union[bool, str, Sequence[str]]] - repo=None, # type: Optional[str] - repo_branch=None, # type: Optional[str] - repo_commit=None, # type: Optional[str] - helper_functions=None, # type: Optional[Sequence[Callable]] - docker=None, # type: Optional[str] - docker_args=None, # type: Optional[str] - docker_bash_setup_script=None, # type: Optional[str] - parents=None, # type: Optional[Sequence[str]] - execution_queue=None, # type: Optional[str] - monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] - monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] - monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]] - time_limit=None, # type: Optional[float] - continue_on_fail=False, # type: bool - pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa - post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa - cache_executed_step=False, # type: bool - retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa - status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] 
# noqa - tags=None, # type: Optional[Union[str, Sequence[str]]] - output_uri=None, # type: Optional[Union[str, bool]] - draft=False, # type: Optional[bool] - working_dir=None, # type: Optional[str] - continue_behaviour=None # type: Optional[dict] + self, + name, # type: str + function, # type: Callable + function_kwargs=None, # type: Optional[Dict[str, Any]] + function_return=None, # type: Optional[List[str]] + project_name=None, # type: Optional[str] + task_name=None, # type: Optional[str] + task_type=None, # type: Optional[str] + auto_connect_frameworks=None, # type: Optional[dict] + auto_connect_arg_parser=None, # type: Optional[dict] + packages=None, # type: Optional[Union[bool, str, Sequence[str]]] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None, # type: Optional[str] + helper_functions=None, # type: Optional[Sequence[Callable]] + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + parents=None, # type: Optional[Sequence[str]] + execution_queue=None, # type: Optional[str] + monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] + monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] + monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]] + time_limit=None, # type: Optional[float] + continue_on_fail=False, # type: bool + pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa + post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + cache_executed_step=False, # type: bool + retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa + tags=None, # type: Optional[Union[str, Sequence[str]]] + output_uri=None, # type: Optional[Union[str, bool]] + draft=False, # type: Optional[bool] + working_dir=None, # type: Optional[str] + continue_behaviour=None, # type: Optional[dict] ): # type: (...) 
-> bool """ @@ -938,7 +963,7 @@ class PipelineController(object): function_kwargs = function_kwargs or {} default_kwargs = inspect.getfullargspec(function) if default_kwargs and default_kwargs.args and default_kwargs.defaults: - for key, val in zip(default_kwargs.args[-len(default_kwargs.defaults):], default_kwargs.defaults): + for key, val in zip(default_kwargs.args[-len(default_kwargs.defaults) :], default_kwargs.defaults): function_kwargs.setdefault(key, val) return self._add_function_step( @@ -975,15 +1000,15 @@ class PipelineController(object): output_uri=output_uri, draft=draft, working_dir=working_dir, - continue_behaviour=continue_behaviour + continue_behaviour=continue_behaviour, ) def start( - self, - queue='services', - step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa - step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa - wait=True, + self, + queue="services", + step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa + step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + wait=True, ): # type: (...) -> bool """ @@ -1029,8 +1054,8 @@ class PipelineController(object): """ if not self._task: raise ValueError( - "Could not find main Task, " - "PipelineController must be created with `always_create_task=True`") + "Could not find main Task, " "PipelineController must be created with `always_create_task=True`" + ) # serialize state only if we are running locally if Task.running_locally() or not self._task.is_main_task(): @@ -1045,7 +1070,7 @@ class PipelineController(object): self._start( step_task_created_callback=step_task_created_callback, step_task_completed_callback=step_task_completed_callback, - wait=wait + wait=wait, ) return True @@ -1066,12 +1091,12 @@ class PipelineController(object): """ if not self._task: raise ValueError( - "Could not find main Task, " - "PipelineController must be created with `always_create_task=True`") + "Could not find main Task, " "PipelineController must be created with `always_create_task=True`" + ) if run_pipeline_steps_locally: self._clearml_job_class = LocalClearmlJob - self._default_execution_queue = self._default_execution_queue or 'mock' + self._default_execution_queue = self._default_execution_queue or "mock" # serialize state only if we are running locally if Task.running_locally() or not self._task.is_main_task(): @@ -1187,7 +1212,7 @@ class PipelineController(object): auto_pickle=None, # type: Optional[bool] preview=None, # type: Any wait_on_upload=False, # type: bool - serialization_function=None # type: Optional[Callable[[Any], Union[bytes, bytearray]]] + serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] ): # type: (...) 
-> bool """ @@ -1254,7 +1279,7 @@ class PipelineController(object): auto_pickle=auto_pickle, preview=preview, wait_on_upload=wait_on_upload, - serialization_function=serialization_function + serialization_function=serialization_function, ) def stop(self, timeout=None, mark_failed=False, mark_aborted=False): @@ -1280,12 +1305,12 @@ class PipelineController(object): self._task.close() if mark_failed: - self._task.mark_failed(status_reason='Pipeline aborted and failed', force=True) + self._task.mark_failed(status_reason="Pipeline aborted and failed", force=True) elif mark_aborted: - self._task.mark_stopped(status_message='Pipeline aborted', force=True) + self._task.mark_stopped(status_message="Pipeline aborted", force=True) elif self._pipeline_task_status_failed: - print('Setting pipeline controller Task as failed (due to failed steps) !') - self._task.mark_failed(status_reason='Pipeline step failed', force=True) + print("Setting pipeline controller Task as failed (due to failed steps) !") + self._task.mark_failed(status_reason="Pipeline step failed", force=True) def wait(self, timeout=None): # type: (Optional[float]) -> bool @@ -1305,7 +1330,7 @@ class PipelineController(object): return True if timeout is not None: - timeout *= 60. + timeout *= 60.0 _thread = self._thread @@ -1366,7 +1391,7 @@ class PipelineController(object): """ if self._start_time is None: return -1.0 - return (time() - self._start_time) / 60. + return (time() - self._start_time) / 60.0 def get_pipeline_dag(self): # type: () -> Mapping[str, PipelineController.Node] @@ -1450,13 +1475,13 @@ class PipelineController(object): @classmethod def _create_pipeline_project_args(cls, name, project): - task_name = name or project or '{}'.format(datetime.now()) + task_name = name or project or "{}".format(datetime.now()) if cls._pipeline_as_sub_project(): parent_project = (project + "/" if project else "") + cls._project_section project_name = "{}/{}".format(parent_project, task_name) else: parent_project = None - project_name = project or 'Pipelines' + project_name = project or "Pipelines" return {"task_name": task_name, "parent_project": parent_project, "project_name": project_name} @classmethod @@ -1526,9 +1551,7 @@ class PipelineController(object): :return: The newly created PipelineController """ - pipeline_project_args = cls._create_pipeline_project_args( - name=task_name, project=project_name - ) + pipeline_project_args = cls._create_pipeline_project_args(name=task_name, project=project_name) pipeline_controller = Task.create( project_name=pipeline_project_args["project_name"], task_name=pipeline_project_args["task_name"], @@ -1545,7 +1568,7 @@ class PipelineController(object): docker_bash_setup_script=docker_bash_setup_script, argparse_args=argparse_args, add_task_init_call=False, - force_single_script_file=force_single_script_file + force_single_script_file=force_single_script_file, ) cls._create_pipeline_projects( task=pipeline_controller, @@ -1567,7 +1590,7 @@ class PipelineController(object): comment=None, # type: Optional[str] parent=None, # type: Optional[str] project=None, # type: Optional[str] - version=None # type: Optional[str] + version=None, # type: Optional[str] ): # type: (...) -> PipelineController """ @@ -1670,7 +1693,7 @@ class PipelineController(object): pipeline_name=None, # type: Optional[str] pipeline_version=None, # type: Optional[str] pipeline_tags=None, # type: Optional[Sequence[str]] - shallow_search=False # type: bool + shallow_search=False, # type: bool ): # type: (...) 
-> "PipelineController" """ @@ -1722,7 +1745,9 @@ class PipelineController(object): pipeline_id = pipeline.id break if not pipeline_id: - error_msg = "Could not find dataset with pipeline_project={}, pipeline_name={}".format(pipeline_project, pipeline_name) + error_msg = "Could not find dataset with pipeline_project={}, pipeline_name={}".format( + pipeline_project, pipeline_name + ) if pipeline_version: error_msg += ", pipeline_version={}".format(pipeline_version) raise ValueError(error_msg) @@ -1795,7 +1820,7 @@ class PipelineController(object): commit, helper_functions, output_uri=None, - working_dir=None + working_dir=None, ): task_definition = CreateFromFunction.create_task_from_function( a_function=function, @@ -1821,15 +1846,15 @@ class PipelineController(object): artifact_serialization_function=self._artifact_serialization_function, artifact_deserialization_function=self._artifact_deserialization_function, skip_global_imports=self._skip_global_imports, - working_dir=working_dir + working_dir=working_dir, ) return task_definition def _start( - self, - step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa - step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa - wait=True, + self, + step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa + step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + wait=True, ): # type: (...) -> bool """ @@ -1887,31 +1912,32 @@ class PipelineController(object): return True def _prepare_pipeline( - self, - step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa - step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + self, + step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa + step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa ): # type (...) 
-> None params, pipeline_dag = self._serialize_pipeline_task() # deserialize back pipeline state - if not params['continue_pipeline']: + if not params["continue_pipeline"]: for k in pipeline_dag: - pipeline_dag[k]['executed'] = None - pipeline_dag[k]['job_started'] = None - pipeline_dag[k]['job_ended'] = None - self._default_execution_queue = params['default_queue'] - self._add_pipeline_tags = params['add_pipeline_tags'] - self._target_project = params['target_project'] or '' + pipeline_dag[k]["executed"] = None + pipeline_dag[k]["job_started"] = None + pipeline_dag[k]["job_ended"] = None + self._default_execution_queue = params["default_queue"] + self._add_pipeline_tags = params["add_pipeline_tags"] + self._target_project = params["target_project"] or "" self._deserialize(pipeline_dag) # if we continue the pipeline, make sure that we re-execute failed tasks - if params['continue_pipeline']: + if params["continue_pipeline"]: for node in list(self._nodes.values()): if node.executed is False: node.executed = None if not self._verify(): - raise ValueError("Failed verifying pipeline execution graph, " - "it has either inaccessible nodes, or contains cycles") + raise ValueError( + "Failed verifying pipeline execution graph, " "it has either inaccessible nodes, or contains cycles" + ) self.update_execution_plot() self._start_time = time() self._stop_event = Event() @@ -1926,9 +1952,9 @@ class PipelineController(object): :return: params, pipeline_dag """ params = { - 'default_queue': self._default_execution_queue, - 'add_pipeline_tags': self._add_pipeline_tags, - 'target_project': self._target_project, + "default_queue": self._default_execution_queue, + "add_pipeline_tags": self._add_pipeline_tags, + "target_project": self._target_project, } pipeline_dag = self._serialize() @@ -1939,8 +1965,8 @@ class PipelineController(object): if self._task.running_locally() or self._task.get_configuration_object(name=self._config_section) is None: # noinspection PyProtectedMember self._task._set_configuration( - name=self._config_section, config_type='dictionary', - config_text=json.dumps(pipeline_dag, indent=2)) + name=self._config_section, config_type="dictionary", config_text=json.dumps(pipeline_dag, indent=2) + ) args_map_inversed = {} for section, arg_list in self._args_map.items(): for arg in arg_list: @@ -1957,7 +1983,7 @@ class PipelineController(object): __update=True, ) self._task.connect(params, name=self._pipeline_section) - params['continue_pipeline'] = False + params["continue_pipeline"] = False # make sure we have a unique version number (auto bump version if needed) # only needed when manually (from code) creating pipelines @@ -1967,10 +1993,12 @@ class PipelineController(object): pipeline_hash = self._get_task_hash() # noinspection PyProtectedMember - self._task._set_runtime_properties({ - self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version), - "version": self._version - }) + self._task._set_runtime_properties( + { + self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version), + "version": self._version, + } + ) self._task.set_user_properties(version=self._version) else: self._task.connect_configuration(pipeline_dag, name=self._config_section) @@ -1982,23 +2010,23 @@ class PipelineController(object): new_pipeline_args.update(mutable_dict) connected_args.update(arg_list) mutable_dict = {k: v for k, v in self._pipeline_args.items() if k not in connected_args} - self._task.connect( - mutable_dict, name=self._args_section - ) + self._task.connect(mutable_dict, 
name=self._args_section) new_pipeline_args.update(mutable_dict) self._pipeline_args = new_pipeline_args self._task.connect(params, name=self._pipeline_section) # noinspection PyProtectedMember if self._task._get_runtime_properties().get(self._runtime_property_hash): - params['continue_pipeline'] = True + params["continue_pipeline"] = True else: # noinspection PyProtectedMember pipeline_hash = ClearmlJob._create_task_hash(self._task) # noinspection PyProtectedMember - self._task._set_runtime_properties({ - self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version), - }) - params['continue_pipeline'] = False + self._task._set_runtime_properties( + { + self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version), + } + ) + params["continue_pipeline"] = False return params, pipeline_dag @@ -2014,7 +2042,7 @@ class PipelineController(object): order_by=["-last_update"], system_tags=[self._tag], search_hidden=True, - _allow_extra_fields_=True + _allow_extra_fields_=True, ) for previous_pipeline_task in previous_pipeline_tasks: if previous_pipeline_task.runtime.get("version"): @@ -2024,15 +2052,23 @@ class PipelineController(object): def _get_task_hash(self): params_override = dict(**(self._task.get_parameters() or {})) - params_override.pop('properties/version', None) + params_override.pop("properties/version", None) # dag state without status / states nodes_items = list(self._nodes.items()) dag = { name: { - k: v for k, v in node.__dict__.items() - if k not in ( - 'job', 'name', 'task_factory_func', 'executed', 'status', - 'job_started', 'job_ended', 'skip_job' + k: v + for k, v in node.__dict__.items() + if k + not in ( + "job", + "name", + "task_factory_func", + "executed", + "status", + "job_started", + "job_ended", + "skip_job", ) } for name, node in nodes_items @@ -2059,12 +2095,13 @@ class PipelineController(object): :return: """ nodes_items = list(self._nodes.items()) - dag = {name: dict((k, v) for k, v in node.__dict__.items() - if k not in ('job', 'name', 'task_factory_func')) - for name, node in nodes_items} + dag = { + name: dict((k, v) for k, v in node.__dict__.items() if k not in ("job", "name", "task_factory_func")) + for name, node in nodes_items + } # update state for presentation only for name, node in nodes_items: - dag[name]['job_id'] = node.executed or (node.job.task_id() if node.job else None) + dag[name]["job_id"] = node.executed or (node.job.task_id() if node.job else None) return dag @@ -2081,17 +2118,27 @@ class PipelineController(object): # if we do not clone the Task, only merge the parts we can override. 
for name in list(self._nodes.keys()): - if not self._nodes[name].clone_task and name in dag_dict and not dag_dict[name].get('clone_task'): - for k in ('queue', 'parents', 'timeout', 'parameters', 'configurations', 'task_overrides', - 'executed', 'job_started', 'job_ended'): + if not self._nodes[name].clone_task and name in dag_dict and not dag_dict[name].get("clone_task"): + for k in ( + "queue", + "parents", + "timeout", + "parameters", + "configurations", + "task_overrides", + "executed", + "job_started", + "job_ended", + ): setattr(self._nodes[name], k, dag_dict[name].get(k) or type(getattr(self._nodes[name], k))()) # if we do clone the Task deserialize everything, except the function creating self._nodes = { - k: self.Node(name=k, **{kk: vv for kk, vv in v.items() if kk not in ('job_id', )}) - if k not in self._nodes or (v.get('base_task_id') and v.get('clone_task')) + k: self.Node(name=k, **{kk: vv for kk, vv in v.items() if kk not in ("job_id",)}) + if k not in self._nodes or (v.get("base_task_id") and v.get("clone_task")) else self._nodes[k] - for k, v in dag_dict.items()} + for k, v in dag_dict.items() + } # set the task_factory_func for each cloned node for node in list(self._nodes.values()): @@ -2141,8 +2188,10 @@ class PipelineController(object): raise ValueError("Node '{}', base_task_id is empty".format(node.name)) if not self._default_execution_queue and not node.queue: - raise ValueError("Node '{}' missing execution queue, " - "no default queue defined and no specific node queue defined".format(node.name)) + raise ValueError( + "Node '{}' missing execution queue, " + "no default queue defined and no specific node queue defined".format(node.name) + ) task = node.task_factory_func or Task.get_task(task_id=node.base_task_id) if not task: @@ -2162,16 +2211,18 @@ class PipelineController(object): if ref_step: parents.add(ref_step) # verify we have a section name - if '/' not in k: + if "/" not in k: raise ValueError( - "Section name is missing in parameter \"{}\", " + 'Section name is missing in parameter "{}", ' "parameters should be in the form of " - "\"`section-name`/parameter\", example: \"Args/param\"".format(v)) + '"`section-name`/parameter", example: "Args/param"'.format(v) + ) if parents and parents != set(node.parents or []): parents = parents - set(node.parents or []) - getLogger('clearml.automation.controller').info( - 'Node "{}" missing parent reference, adding: {}'.format(node.name, parents)) + getLogger("clearml.automation.controller").info( + 'Node "{}" missing parent reference, adding: {}'.format(node.name, parents) + ) node.parents = (node.parents or []) + list(parents) # verify and fix monitoring sections: @@ -2183,36 +2234,32 @@ class PipelineController(object): if not all(isinstance(x, (list, tuple)) and x for x in monitors): raise ValueError("{} should be a list of tuples, found: {}".format(monitor_type, monitors)) # convert single pair into a pair of pairs: - conformed_monitors = [ - pair if isinstance(pair[0], (list, tuple)) else (pair, pair) for pair in monitors - ] + conformed_monitors = [pair if isinstance(pair[0], (list, tuple)) else (pair, pair) for pair in monitors] # verify the pair of pairs - if not all(isinstance(x[0][0], str) and isinstance(x[0][1], str) and - isinstance(x[1][0], str) and isinstance(x[1][1], str) - for x in conformed_monitors): + if not all( + isinstance(x[0][0], str) + and isinstance(x[0][1], str) + and isinstance(x[1][0], str) + and isinstance(x[1][1], str) + for x in conformed_monitors + ): raise ValueError("{} should be a list 
of tuples, found: {}".format(monitor_type, monitors)) else: # verify a list of tuples if not all(isinstance(x, (list, tuple, str)) and x for x in monitors): - raise ValueError( - "{} should be a list of tuples, found: {}".format(monitor_type, monitors)) + raise ValueError("{} should be a list of tuples, found: {}".format(monitor_type, monitors)) # convert single str into a pair of pairs: - conformed_monitors = [ - pair if isinstance(pair, (list, tuple)) else (pair, pair) for pair in monitors - ] + conformed_monitors = [pair if isinstance(pair, (list, tuple)) else (pair, pair) for pair in monitors] # verify the pair of pairs - if not all(isinstance(x[0], str) and - isinstance(x[1], str) - for x in conformed_monitors): - raise ValueError( - "{} should be a list of tuples, found: {}".format(monitor_type, monitors)) + if not all(isinstance(x[0], str) and isinstance(x[1], str) for x in conformed_monitors): + raise ValueError("{} should be a list of tuples, found: {}".format(monitor_type, monitors)) return conformed_monitors # verify and fix monitoring sections: - node.monitor_metrics = _verify_monitors(node.monitor_metrics, 'monitor_metrics', nested_pairs=True) - node.monitor_artifacts = _verify_monitors(node.monitor_artifacts, 'monitor_artifacts') - node.monitor_models = _verify_monitors(node.monitor_models, 'monitor_models') + node.monitor_metrics = _verify_monitors(node.monitor_metrics, "monitor_metrics", nested_pairs=True) + node.monitor_artifacts = _verify_monitors(node.monitor_artifacts, "monitor_artifacts") + node.monitor_models = _verify_monitors(node.monitor_models, "monitor_models") return True @@ -2238,41 +2285,41 @@ class PipelineController(object): return not bool(set(self._nodes.keys()) - visited) def _add_function_step( - self, - name, # type: str - function, # type: Callable - function_kwargs=None, # type: Optional[Dict[str, Any]] - function_return=None, # type: Optional[List[str]] - project_name=None, # type: Optional[str] - task_name=None, # type: Optional[str] - task_type=None, # type: Optional[str] - auto_connect_frameworks=None, # type: Optional[dict] - auto_connect_arg_parser=None, # type: Optional[dict] - packages=None, # type: Optional[Union[bool, str, Sequence[str]]] - repo=None, # type: Optional[str] - repo_branch=None, # type: Optional[str] - repo_commit=None, # type: Optional[str] - helper_functions=None, # type: Optional[Sequence[Callable]] - docker=None, # type: Optional[str] - docker_args=None, # type: Optional[str] - docker_bash_setup_script=None, # type: Optional[str] - parents=None, # type: Optional[Sequence[str]] - execution_queue=None, # type: Optional[str] - monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] - monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] - monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]] - time_limit=None, # type: Optional[float] - continue_on_fail=False, # type: bool - pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa - post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa - cache_executed_step=False, # type: bool - retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa - status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa - tags=None, # type: Optional[Union[str, 
Sequence[str]]] - output_uri=None, # type: Optional[Union[str, bool]] - draft=False, # type: Optional[bool] - working_dir=None, # type: Optional[str] - continue_behaviour=None # type: Optional[dict] + self, + name, # type: str + function, # type: Callable + function_kwargs=None, # type: Optional[Dict[str, Any]] + function_return=None, # type: Optional[List[str]] + project_name=None, # type: Optional[str] + task_name=None, # type: Optional[str] + task_type=None, # type: Optional[str] + auto_connect_frameworks=None, # type: Optional[dict] + auto_connect_arg_parser=None, # type: Optional[dict] + packages=None, # type: Optional[Union[bool, str, Sequence[str]]] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None, # type: Optional[str] + helper_functions=None, # type: Optional[Sequence[Callable]] + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + parents=None, # type: Optional[Sequence[str]] + execution_queue=None, # type: Optional[str] + monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] + monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] + monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]] + time_limit=None, # type: Optional[float] + continue_on_fail=False, # type: bool + pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa + post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + cache_executed_step=False, # type: bool + retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa + tags=None, # type: Optional[Union[str, Sequence[str]]] + output_uri=None, # type: Optional[Union[str, bool]] + draft=False, # type: Optional[bool] + working_dir=None, # type: Optional[str] + continue_behaviour=None, # type: Optional[dict] ): # type: (...) 
-> bool """ @@ -2481,7 +2528,7 @@ class PipelineController(object): continue if self._step_ref_pattern.match(str(v)): # check for step artifacts - step, _, artifact = v[2:-1].partition('.') + step, _, artifact = v[2:-1].partition(".") if step in self._nodes and artifact in self._nodes[step].return_artifacts: function_input_artifacts[k] = "${{{}.id}}.{}".format(step, artifact) continue @@ -2497,8 +2544,10 @@ class PipelineController(object): parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()} if function_input_artifacts: parameters.update( - {"{}/{}".format(CreateFromFunction.input_artifact_section, k): str(v) - for k, v in function_input_artifacts.items()} + { + "{}/{}".format(CreateFromFunction.input_artifact_section, k): str(v) + for k, v in function_input_artifacts.items() + } ) job_code_section = name @@ -2556,8 +2605,7 @@ class PipelineController(object): # update configuration with the task definitions # noinspection PyProtectedMember self._task._set_configuration( - name=name, config_type='json', - config_text=json.dumps(task_definition, indent=1) + name=name, config_type="json", config_text=json.dumps(task_definition, indent=1) ) else: # load task definition from configuration @@ -2568,8 +2616,8 @@ class PipelineController(object): def _create_task(_): a_task = Task.create( project_name=project_name, - task_name=task_definition.get('name'), - task_type=task_definition.get('type'), + task_name=task_definition.get("name"), + task_type=task_definition.get("type"), ) # replace reference a_task.update_task(task_definition) @@ -2583,8 +2631,11 @@ class PipelineController(object): return a_task self._nodes[name] = self.Node( - name=name, base_task_id=None, parents=parents or [], - queue=execution_queue, timeout=time_limit, + name=name, + base_task_id=None, + parents=parents or [], + queue=execution_queue, + timeout=time_limit, parameters=parameters, clone_task=False, cache_executed_step=cache_executed_step, @@ -2598,12 +2649,18 @@ class PipelineController(object): explicit_docker_image=docker, output_uri=output_uri, draft=draft, - continue_behaviour=continue_behaviour + continue_behaviour=continue_behaviour, ) self._retries[name] = 0 - self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \ - (functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure) - if isinstance(retry_on_failure, int) else self._retry_on_failure_callback) + self._retries_callbacks[name] = ( + retry_on_failure + if callable(retry_on_failure) + else ( + functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure) + if isinstance(retry_on_failure, int) + else self._retry_on_failure_callback + ) + ) return True @@ -2640,10 +2697,10 @@ class PipelineController(object): node.job_type = None if node.job or node.executed: - print('Skipping cached/executed step [{}]'.format(node.name)) + print("Skipping cached/executed step [{}]".format(node.name)) return False - print('Launching step [{}]'.format(node.name)) + print("Launching step [{}]".format(node.name)) updated_hyper_parameters = {} for k, v in node.parameters.items(): @@ -2681,8 +2738,9 @@ class PipelineController(object): base_task_id=task_id, parameter_override=updated_hyper_parameters, configuration_overrides=node.configurations, - tags=['{} {}'.format(self._node_tag_prefix, self._task.id)] - if self._add_pipeline_tags and self._task else None, + tags=["{} {}".format(self._node_tag_prefix, self._task.id)] + if 
self._add_pipeline_tags and self._task
+            else None,
             parent=self._task.id if self._task else None,
             disable_clone_task=disable_clone_task,
             task_overrides=task_overrides,
@@ -2704,8 +2762,7 @@ class PipelineController(object):
 
         if skip_node is False:
             # skipping node
-            getLogger('clearml.automation.controller').warning(
-                'Skipping node {} on callback request'.format(node))
+            getLogger("clearml.automation.controller").warning("Skipping node {} on callback request".format(node))
             # delete the job we just created
             node.job.delete()
             node.skip_job = True
@@ -2742,7 +2799,7 @@ class PipelineController(object):
         sankey_node = dict(
             label=[],
             color=[],
-            hovertemplate='%{label}',
+            hovertemplate="%{label}",
             # customdata=[],
             # hovertemplate='%{label}<br />Hyper-Parameters:<br />%{customdata}',
         )
@@ -2751,7 +2808,7 @@ class PipelineController(object):
             target=[],
             value=[],
             # hovertemplate='%{target.label}',
-            hovertemplate='',
+            hovertemplate="",
         )
         visited = []
         node_params = []
@@ -2778,17 +2835,20 @@ class PipelineController(object):
                 # sankey_node['label'].append(node.name)
                 # sankey_node['customdata'].append(
                 #     '<br />'.join('{}: {}'.format(k, v) for k, v in (node.parameters or {}).items()))
-                sankey_node['label'].append(
-                    '{}<br />'.format(node.name) +
-                    '<br />'.join('{}: {}'.format(k, v if len(str(v)) < 24 else (str(v)[:24]+' ...'))
-                                  for k, v in (node.parameters or {}).items()))
+                sankey_node["label"].append(
+                    "{}<br />".format(node.name)
+                    + "<br />".join(
+                        "{}: {}".format(k, v if len(str(v)) < 24 else (str(v)[:24] + " ..."))
+                        for k, v in (node.parameters or {}).items()
+                    )
+                )
 
-                sankey_node['color'].append(self._get_node_color(node))
+                sankey_node["color"].append(self._get_node_color(node))
 
                 for p in parents:
-                    sankey_link['source'].append(p)
-                    sankey_link['target'].append(idx)
-                    sankey_link['value'].append(1)
+                    sankey_link["source"].append(p)
+                    sankey_link["target"].append(idx)
+                    sankey_link["value"].append(1)
 
             # if nothing changed, we give up
             if nodes == next_nodes:
@@ -2798,16 +2858,16 @@ class PipelineController(object):
 
         # make sure we have no independent (unconnected) nodes
         single_nodes = []
-        for i in [n for n in range(len(visited)) if n not in sankey_link['source'] and n not in sankey_link['target']]:
+        for i in [n for n in range(len(visited)) if n not in sankey_link["source"] and n not in sankey_link["target"]]:
             single_nodes.append(i)
 
         # create the sankey graph
         dag_flow = dict(
             link=sankey_link,
             node=sankey_node,
-            textfont=dict(color='rgba(0,0,0,0)', size=1),
-            type='sankey',
-            orientation='h'
+            textfont=dict(color="rgba(0,0,0,0)", size=1),
+            type="sankey",
+            orientation="h",
         )
 
         table_values = self._build_table_report(node_params, visited)
@@ -2815,43 +2875,53 @@ class PipelineController(object):
         # hack, show single node sankey
        if single_nodes:
             singles_flow = dict(
-                x=list(range(len(single_nodes))), y=[1] * len(single_nodes),
-                text=[v for i, v in enumerate(sankey_node['label']) if i in single_nodes],
-                mode='markers',
+                x=list(range(len(single_nodes))),
+                y=[1] * len(single_nodes),
+                text=[v for i, v in enumerate(sankey_node["label"]) if i in single_nodes],
+                mode="markers",
                 hovertemplate="%{text}",
                 marker=dict(
-                    color=[v for i, v in enumerate(sankey_node['color']) if i in single_nodes],
+                    color=[v for i, v in enumerate(sankey_node["color"]) if i in single_nodes],
                     size=[40] * len(single_nodes),
                 ),
                 showlegend=False,
-                type='scatter',
+                type="scatter",
             )
             # only single nodes
-            if len(single_nodes) == len(sankey_node['label']):
-                fig = dict(data=[singles_flow], layout={
-                    'hovermode': 'closest', 'xaxis': {'visible': False}, 'yaxis': {'visible': False}})
+            if len(single_nodes) == len(sankey_node["label"]):
+                fig = dict(
+                    data=[singles_flow],
+                    layout={"hovermode": "closest", "xaxis": {"visible": False}, "yaxis": {"visible": False}},
+                )
             else:
-                dag_flow['domain'] = {'x': [0.0, 1.0], 'y': [0.2, 1.0]}
-                fig = dict(data=[dag_flow, singles_flow],
-                           layout={'autosize': True,
-                                   'hovermode': 'closest',
-                                   'xaxis': {'anchor': 'y', 'domain': [0.0, 1.0], 'visible': False},
-                                   'yaxis': {'anchor': 'x', 'domain': [0.0, 0.15], 'visible': False}
-                                   })
+                dag_flow["domain"] = {"x": [0.0, 1.0], "y": [0.2, 1.0]}
+                fig = dict(
+                    data=[dag_flow, singles_flow],
+                    layout={
+                        "autosize": True,
+                        "hovermode": "closest",
+                        "xaxis": {"anchor": "y", "domain": [0.0, 1.0], "visible": False},
+                        "yaxis": {"anchor": "x", "domain": [0.0, 0.15], "visible": False},
+                    },
+                )
         else:
             # create the sankey plot
-            fig = dict(data=[dag_flow], layout={'xaxis': {'visible': False}, 'yaxis': {'visible': False}})
+            fig = dict(data=[dag_flow], layout={"xaxis": {"visible": False}, "yaxis": {"visible": False}})
 
         # report DAG
         self._task.get_logger().report_plotly(
-            title=self._report_plot_execution_flow['title'],
-            series=self._report_plot_execution_flow['series'],
-            iteration=0, figure=fig)
+            title=self._report_plot_execution_flow["title"],
+            series=self._report_plot_execution_flow["series"],
+            iteration=0,
+            figure=fig,
+        )
 
         # report detailed table
self._task.get_logger().report_table( - title=self._report_plot_execution_details['title'], - series=self._report_plot_execution_details['series'], - iteration=0, table_plot=table_values) + title=self._report_plot_execution_details["title"], + series=self._report_plot_execution_details["series"], + iteration=0, + table_plot=table_values, + ) def _build_table_report(self, node_params, visited): # type: (List, List) -> List[List] @@ -2863,14 +2933,16 @@ class PipelineController(object): :return: Table as a List of a List of strings (cell) """ - task_link_template = self._task.get_output_log_web_page() \ - .replace('/{}/'.format(self._task.project), '/{project}/') \ - .replace('/{}/'.format(self._task.id), '/{task}/') + task_link_template = ( + self._task.get_output_log_web_page() + .replace("/{}/".format(self._task.project), "/{project}/") + .replace("/{}/".format(self._task.id), "/{task}/") + ) table_values = [["Pipeline Step", "Task ID", "Task Name", "Status", "Parameters"]] for name, param in zip(visited, node_params): - param_str = str(param) if param else '' + param_str = str(param) if param else "" if len(param_str) > 3: # remove {} from string param_str = param_str[1:-1] @@ -2878,14 +2950,17 @@ class PipelineController(object): step_name = name if self._nodes[name].base_task_id: step_name += '\n[ {} ]'.format( - task_link_template.format(project='*', task=self._nodes[name].base_task_id), 'base task') + task_link_template.format(project="*", task=self._nodes[name].base_task_id), "base task" + ) table_values.append( - [step_name, - self.__create_task_link(self._nodes[name], task_link_template), - self._nodes[name].job.task.name if self._nodes[name].job else '', - str(self._nodes[name].status or ""), - param_str] + [ + step_name, + self.__create_task_link(self._nodes[name], task_link_template), + self._nodes[name].job.task.name if self._nodes[name].job else "", + str(self._nodes[name].status or ""), + param_str, + ] ) return table_values @@ -3010,9 +3085,11 @@ class PipelineController(object): # type: () -> () pipeline_dag = self._serialize() self._task.upload_artifact( - name=self._state_artifact_name, artifact_object='', + name=self._state_artifact_name, + artifact_object="", metadata=dict(pipeline=hash_dict(pipeline_dag)), - preview=json.dumps(pipeline_dag, indent=1)) + preview=json.dumps(pipeline_dag, indent=1), + ) def _force_task_configuration_update(self): # type: () -> () @@ -3020,9 +3097,12 @@ class PipelineController(object): if self._task: # noinspection PyProtectedMember self._task._set_configuration( - name=self._config_section, config_type='dictionary', + name=self._config_section, + config_type="dictionary", description="pipeline state: {}".format(hash_dict(pipeline_dag)), - config_text=json.dumps(pipeline_dag, indent=2), force=True) + config_text=json.dumps(pipeline_dag, indent=2), + force=True, + ) def _update_progress(self): # type: () -> () @@ -3111,11 +3191,11 @@ class PipelineController(object): # nothing changed, we can sleep if not completed_jobs and self._running_nodes: # force updating the pipeline state (plot) at least every 5 min. 
- if force_execution_plot_update or time()-last_plot_report > self._update_execution_plot_interval: + if force_execution_plot_update or time() - last_plot_report > self._update_execution_plot_interval: last_plot_report = time() last_monitor_report = time() self.update_execution_plot() - elif time()-last_monitor_report > self._monitor_node_interval: + elif time() - last_monitor_report > self._monitor_node_interval: last_monitor_report = time() self._scan_monitored_nodes() continue @@ -3133,8 +3213,11 @@ class PipelineController(object): # check if we need to stop the pipeline, and abort all running steps if nodes_failed_stop_pipeline: - print('Aborting pipeline and stopping all running steps, node {} failed'.format( - nodes_failed_stop_pipeline)) + print( + "Aborting pipeline and stopping all running steps, node {} failed".format( + nodes_failed_stop_pipeline + ) + ) break # Pull the next jobs in the pipeline, based on the completed list @@ -3148,26 +3231,30 @@ class PipelineController(object): next_nodes.append(node.name) # update the execution graph - print('Launching the next {} steps'.format(len(next_nodes))) - node_launch_success = launch_thread_pool.map( - self._launch_node, [self._nodes[name] for name in next_nodes]) + print("Launching the next {} steps".format(len(next_nodes))) + node_launch_success = launch_thread_pool.map(self._launch_node, [self._nodes[name] for name in next_nodes]) for name, success in zip(next_nodes, node_launch_success): if success and not self._nodes[name].skip_job: if self._nodes[name].job and self._nodes[name].job.task_parameter_override is not None: self._nodes[name].job.task_parameter_override.update(self._nodes[name].parameters or {}) - print('Launching step: {}'.format(name)) - print('Parameters:\n{}'.format( - self._nodes[name].job.task_parameter_override if self._nodes[name].job - else self._nodes[name].parameters)) - print('Configurations:\n{}'.format(self._nodes[name].configurations)) - print('Overrides:\n{}'.format(self._nodes[name].task_overrides)) + print("Launching step: {}".format(name)) + print( + "Parameters:\n{}".format( + self._nodes[name].job.task_parameter_override + if self._nodes[name].job + else self._nodes[name].parameters + ) + ) + print("Configurations:\n{}".format(self._nodes[name].configurations)) + print("Overrides:\n{}".format(self._nodes[name].task_overrides)) launched_nodes.add(name) # check if node is cached do not wait for event but run the loop again if self._nodes[name].executed: pooling_counter = 0 else: - getLogger('clearml.automation.controller').warning( - 'Skipping launching step \'{}\': {}'.format(name, self._nodes[name])) + getLogger("clearml.automation.controller").warning( + "Skipping launching step '{}': {}".format(name, self._nodes[name]) + ) # update current state (in configuration, so that we could later continue an aborted pipeline) # visualize pipeline state (plot) @@ -3246,9 +3333,9 @@ class PipelineController(object): def _verify_node_name(self, name): # type: (str) -> None if name in self._nodes: - raise ValueError('Node named \'{}\' already exists in the pipeline dag'.format(name)) + raise ValueError("Node named '{}' already exists in the pipeline dag".format(name)) if name in self._reserved_pipeline_names: - raise ValueError('Node named \'{}\' is a reserved keyword, use a different name'.format(name)) + raise ValueError("Node named '{}' is a reserved keyword, use a different name".format(name)) def _scan_monitored_nodes(self): # type: () -> None @@ -3272,7 +3359,7 @@ class PipelineController(object): 
self._monitored_nodes[node.name] = {} # if we are done with this node, skip it - if self._monitored_nodes[node.name].get('completed'): + if self._monitored_nodes[node.name].get("completed"): return if node.job and node.job.task: @@ -3284,14 +3371,14 @@ class PipelineController(object): # update the metrics if node.monitor_metrics: - metrics_state = self._monitored_nodes[node.name].get('metrics', {}) + metrics_state = self._monitored_nodes[node.name].get("metrics", {}) logger = self._task.get_logger() - scalars = task.get_reported_scalars(x_axis='iter') + scalars = task.get_reported_scalars(x_axis="iter") for (s_title, s_series), (t_title, t_series) in node.monitor_metrics: values = scalars.get(s_title, {}).get(s_series) - if values and values.get('x') is not None and values.get('y') is not None: - x = values['x'][-1] - y = values['y'][-1] + if values and values.get("x") is not None and values.get("y") is not None: + x = values["x"][-1] + y = values["y"][-1] last_y = metrics_state.get(s_title, {}).get(s_series) if last_y is None or y > last_y: logger.report_scalar(title=t_title, series=t_series, value=y, iteration=int(x)) @@ -3300,7 +3387,7 @@ class PipelineController(object): metrics_state[s_title] = {} metrics_state[s_title][s_series] = last_y - self._monitored_nodes[node.name]['metrics'] = metrics_state + self._monitored_nodes[node.name]["metrics"] = metrics_state if node.monitor_artifacts: task.reload() @@ -3350,7 +3437,7 @@ class PipelineController(object): # update the state (so that we do not scan the node twice) if node.job.is_stopped(aborted_nonresponsive_as_running=True): - self._monitored_nodes[node.name]['completed'] = True + self._monitored_nodes[node.name]["completed"] = True def _get_target_project(self, return_project_id=False): # type: (bool) -> str @@ -3362,19 +3449,19 @@ class PipelineController(object): :return: project id/name (None if not valid) """ if not self._target_project: - return '' + return "" - if str(self._target_project).lower().strip() == 'true': + if str(self._target_project).lower().strip() == "true": if not self._task: - return '' + return "" return self._task.project if return_project_id else self._task.get_project_name() if not return_project_id: return self._target_project return get_or_create_project( - session=self._task.session if self._task else Task.default_session, - project_name=self._target_project) + session=self._task.session if self._task else Task.default_session, project_name=self._target_project + ) @classmethod def _add_pipeline_name_run_number(cls, task): @@ -3393,11 +3480,12 @@ class PipelineController(object): prev_pipelines_ids = task.query_tasks( task_name=r"^{}(| #\d+)$".format(task_name), task_filter=dict( - project=[task.project], system_tags=[cls._tag], - order_by=['-created'], + project=[task.project], + system_tags=[cls._tag], + order_by=["-created"], page_size=page_size, fetch_only_first_page=True, - ) + ), ) max_value = len(prev_pipelines_ids) if prev_pipelines_ids else 0 # we hit the limit @@ -3408,21 +3496,22 @@ class PipelineController(object): # we assume we are the latest so let's take a few (last 10) and check the max number last_task_name = task.query_tasks( task_filter=dict(task_ids=prev_pipelines_ids[:10], project=[task.project]), - additional_return_fields=['name'], + additional_return_fields=["name"], ) # type: List[Dict] # let's parse the names pattern = re.compile(r" #(?P\d+)$") - task_parts = [pattern.split(t.get('name') or "", 1) for t in last_task_name] + task_parts = [pattern.split(t.get("name") or "", 1) 
for t in last_task_name] # find the highest number for parts in task_parts: if len(parts) >= 2: try: - max_value = max(max_value, int(parts[1])+1) + max_value = max(max_value, int(parts[1]) + 1) except (TypeError, ValueError): pass except Exception as ex: - getLogger('clearml.automation.controller').warning( - 'Pipeline auto run increment failed (skipping): {}'.format(ex)) + getLogger("clearml.automation.controller").warning( + "Pipeline auto run increment failed (skipping): {}".format(ex) + ) max_value = 0 if max_value > 1: @@ -3459,7 +3548,7 @@ class PipelineController(object): :param str step_ref_string: For example ``"${step1.parameters.Args/param}"`` :return: If step reference is used, return the pipeline step name, otherwise return None """ - parts = step_ref_string[2:-1].split('.') + parts = step_ref_string[2:-1].split(".") v = step_ref_string if len(parts) < 2: raise ValueError("Node '{}', parameter '{}' is invalid".format(node.name, v)) @@ -3474,30 +3563,33 @@ class PipelineController(object): if prev_step not in self._nodes: raise ValueError("Node '{}', parameter '{}', step name '{}' is invalid".format(node.name, v, prev_step)) - if input_type not in ('artifacts', 'parameters', 'models', 'id'): - raise ValueError( - "Node {}, parameter '{}', input type '{}' is invalid".format(node.name, v, input_type)) + if input_type not in ("artifacts", "parameters", "models", "id"): + raise ValueError("Node {}, parameter '{}', input type '{}' is invalid".format(node.name, v, input_type)) - if input_type != 'id' and len(parts) < 3: + if input_type != "id" and len(parts) < 3: raise ValueError("Node '{}', parameter '{}' is invalid".format(node.name, v)) - if input_type == 'models': + if input_type == "models": try: model_type = parts[2].lower() except Exception: raise ValueError( "Node '{}', parameter '{}', input type '{}', model_type is missing {}".format( - node.name, v, input_type, parts)) - if model_type not in ('input', 'output'): + node.name, v, input_type, parts + ) + ) + if model_type not in ("input", "output"): raise ValueError( "Node '{}', parameter '{}', input type '{}', " - "model_type is invalid (input/output) found {}".format( - node.name, v, input_type, model_type)) + "model_type is invalid (input/output) found {}".format(node.name, v, input_type, model_type) + ) if len(parts) < 4: raise ValueError( "Node '{}', parameter '{}', input type '{}', model index is missing".format( - node.name, v, input_type)) + node.name, v, input_type + ) + ) # check casting try: @@ -3505,17 +3597,23 @@ class PipelineController(object): except Exception: raise ValueError( "Node '{}', parameter '{}', input type '{}', model index is missing {}".format( - node.name, v, input_type, parts)) + node.name, v, input_type, parts + ) + ) if len(parts) < 5: raise ValueError( "Node '{}', parameter '{}', input type '{}', model property is missing".format( - node.name, v, input_type)) + node.name, v, input_type + ) + ) if not hasattr(BaseModel, parts[4]): raise ValueError( "Node '{}', parameter '{}', input type '{}', model property is invalid {}".format( - node.name, v, input_type, parts[4])) + node.name, v, input_type, parts[4] + ) + ) return prev_step def __parse_step_reference(self, step_ref_string): @@ -3524,7 +3622,7 @@ class PipelineController(object): :param step_ref_string: reference string of the form ${step_name.type.value}" :return: str with value """ - parts = step_ref_string[2:-1].split('.') + parts = step_ref_string[2:-1].split(".") if len(parts) < 2: raise ValueError("Could not parse reference 
'{}'".format(step_ref_string)) prev_step = parts[0] @@ -3533,72 +3631,101 @@ class PipelineController(object): # check if we reference the pipeline arguments themselves if prev_step == self._pipeline_step_ref: if parts[1] not in self._pipeline_args: - raise ValueError("Could not parse reference '{}', " - "pipeline argument '{}' could not be found".format(step_ref_string, parts[1])) + raise ValueError( + "Could not parse reference '{}', " + "pipeline argument '{}' could not be found".format(step_ref_string, parts[1]) + ) return self._pipeline_args[parts[1]] if prev_step not in self._nodes or ( - not self._nodes[prev_step].job and - not self._nodes[prev_step].executed and - not self._nodes[prev_step].base_task_id + not self._nodes[prev_step].job + and not self._nodes[prev_step].executed + and not self._nodes[prev_step].base_task_id ): - raise ValueError("Could not parse reference '{}', step '{}' could not be found".format( - step_ref_string, prev_step)) + raise ValueError( + "Could not parse reference '{}', step '{}' could not be found".format(step_ref_string, prev_step) + ) if input_type not in ( - 'artifacts', 'parameters', 'models', 'id', - 'script', 'execution', 'container', 'output', - 'comment', 'models', 'tags', 'system_tags', 'project'): + "artifacts", + "parameters", + "models", + "id", + "script", + "execution", + "container", + "output", + "comment", + "models", + "tags", + "system_tags", + "project", + ): raise ValueError("Could not parse reference '{}', type '{}' not valid".format(step_ref_string, input_type)) - if input_type != 'id' and len(parts) < 3: + if input_type != "id" and len(parts) < 3: raise ValueError("Could not parse reference '{}', missing fields in '{}'".format(step_ref_string, parts)) - task = self._nodes[prev_step].job.task if self._nodes[prev_step].job \ + task = ( + self._nodes[prev_step].job.task + if self._nodes[prev_step].job else Task.get_task(task_id=self._nodes[prev_step].executed or self._nodes[prev_step].base_task_id) + ) task.reload() - if input_type == 'artifacts': + if input_type == "artifacts": # fix \. to use . 
in artifacts - artifact_path = ('.'.join(parts[2:])).replace('\\.', '\\_dot_\\') - artifact_path = artifact_path.split('.') + artifact_path = (".".join(parts[2:])).replace("\\.", "\\_dot_\\") + artifact_path = artifact_path.split(".") obj = task.artifacts for p in artifact_path: - p = p.replace('\\_dot_\\', '.') + p = p.replace("\\_dot_\\", ".") if isinstance(obj, dict): obj = obj.get(p) elif hasattr(obj, p): obj = getattr(obj, p) else: - raise ValueError("Could not locate artifact {} on previous step {}".format( - '.'.join(parts[1:]), prev_step)) + raise ValueError( + "Could not locate artifact {} on previous step {}".format(".".join(parts[1:]), prev_step) + ) return str(obj) - elif input_type == 'parameters': + elif input_type == "parameters": step_params = task.get_parameters() - param_name = '.'.join(parts[2:]) + param_name = ".".join(parts[2:]) if param_name not in step_params: - raise ValueError("Could not locate parameter {} on previous step {}".format( - '.'.join(parts[1:]), prev_step)) + raise ValueError( + "Could not locate parameter {} on previous step {}".format(".".join(parts[1:]), prev_step) + ) return step_params.get(param_name) - elif input_type == 'models': + elif input_type == "models": model_type = parts[2].lower() - if model_type not in ('input', 'output'): - raise ValueError("Could not locate model {} on previous step {}".format( - '.'.join(parts[1:]), prev_step)) + if model_type not in ("input", "output"): + raise ValueError("Could not locate model {} on previous step {}".format(".".join(parts[1:]), prev_step)) try: model_idx = int(parts[3]) model = task.models[model_type][model_idx] except Exception: - raise ValueError("Could not locate model {} on previous step {}, index {} is invalid".format( - '.'.join(parts[1:]), prev_step, parts[3])) + raise ValueError( + "Could not locate model {} on previous step {}, index {} is invalid".format( + ".".join(parts[1:]), prev_step, parts[3] + ) + ) return str(getattr(model, parts[4])) - elif input_type == 'id': + elif input_type == "id": return task.id elif input_type in ( - 'script', 'execution', 'container', 'output', - 'comment', 'models', 'tags', 'system_tags', 'project'): + "script", + "execution", + "container", + "output", + "comment", + "models", + "tags", + "system_tags", + "project", + ): # noinspection PyProtectedMember - return task._get_task_property('.'.join(parts[1:])) + return task._get_task_property(".".join(parts[1:])) return None @@ -3606,7 +3733,7 @@ class PipelineController(object): def __create_task_link(cls, a_node, task_link_template): # type: (PipelineController.Node, str) -> str if not a_node: - return '' + return "" # create the detailed parameter table task_id = project_id = None if a_node.job: @@ -3621,11 +3748,11 @@ class PipelineController(object): try: project_id = Task.get_task(task_id=task_id).project except Exception: - project_id = '*' + project_id = "*" cls._task_project_lookup[task_id] = project_id if not task_id: - return '' + return "" return ' {} '.format(task_link_template.format(project=project_id, task=task_id), task_id) @@ -3638,10 +3765,9 @@ class PipelineController(object): artifact_object=artifact_object, wait_on_upload=True, extension_name=( - ".pkl" if isinstance(artifact_object, dict) and not self._artifact_serialization_function - else None + ".pkl" if isinstance(artifact_object, dict) and not self._artifact_serialization_function else None ), - serialization_function=self._artifact_serialization_function + serialization_function=self._artifact_serialization_function, ) @@ 
-3649,7 +3775,7 @@ class PipelineDecorator(PipelineController): _added_decorator = [] # type: List[dict] _ref_lazy_loader_id_to_node_name = {} # type: dict _singleton = None # type: Optional[PipelineDecorator] - _eager_step_artifact = 'eager_step' + _eager_step_artifact = "eager_step" _eager_execution_instance = False _debug_execute_step_process = False _debug_execute_step_function = False @@ -3659,29 +3785,29 @@ class PipelineDecorator(PipelineController): _atexit_registered = False def __init__( - self, - name, # type: str - project, # type: str - version=None, # type: Optional[str] - pool_frequency=0.2, # type: float - add_pipeline_tags=False, # type: bool - target_project=None, # type: Optional[str] - abort_on_failure=False, # type: bool - add_run_number=True, # type: bool - retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa - docker=None, # type: Optional[str] - docker_args=None, # type: Optional[str] - docker_bash_setup_script=None, # type: Optional[str] - packages=None, # type: Optional[Union[bool, str, Sequence[str]]] - repo=None, # type: Optional[str] - repo_branch=None, # type: Optional[str] - repo_commit=None, # type: Optional[str] - artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] - artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] - output_uri=None, # type: Optional[Union[str, bool]] - skip_global_imports=False, # type: bool - working_dir=None, # type: Optional[str] - enable_local_imports=True # type: bool + self, + name, # type: str + project, # type: str + version=None, # type: Optional[str] + pool_frequency=0.2, # type: float + add_pipeline_tags=False, # type: bool + target_project=None, # type: Optional[str] + abort_on_failure=False, # type: bool + add_run_number=True, # type: bool + retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + packages=None, # type: Optional[Union[bool, str, Sequence[str]]] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None, # type: Optional[str] + artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] + artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] + output_uri=None, # type: Optional[Union[str, bool]] + skip_global_imports=False, # type: bool + working_dir=None, # type: Optional[str] + enable_local_imports=True, # type: bool ): # type: (...) 
-> () """ @@ -3798,15 +3924,14 @@ class PipelineDecorator(PipelineController): output_uri=output_uri, skip_global_imports=skip_global_imports, working_dir=working_dir, - enable_local_imports=enable_local_imports + enable_local_imports=enable_local_imports, ) # if we are in eager execution, make sure parent class knows it if self._eager_execution_instance: self._mock_execution = True if PipelineDecorator._default_execution_queue: - super(PipelineDecorator, self).set_default_execution_queue( - PipelineDecorator._default_execution_queue) + super(PipelineDecorator, self).set_default_execution_queue(PipelineDecorator._default_execution_queue) for n in self._added_decorator: self._add_function_step(**n) @@ -3891,11 +4016,11 @@ class PipelineDecorator(PipelineController): # nothing changed, we can sleep if not completed_jobs and self._running_nodes: # force updating the pipeline state (plot) at least every 5 min. - if force_execution_plot_update or time()-last_plot_report > self._update_execution_plot_interval: + if force_execution_plot_update or time() - last_plot_report > self._update_execution_plot_interval: last_plot_report = time() last_monitor_report = time() self.update_execution_plot() - elif time()-last_monitor_report > self._monitor_node_interval: + elif time() - last_monitor_report > self._monitor_node_interval: last_monitor_report = time() self._scan_monitored_nodes() continue @@ -3913,8 +4038,11 @@ class PipelineDecorator(PipelineController): # check if we need to stop the pipeline, and abort all running steps if nodes_failed_stop_pipeline: - print('Aborting pipeline and stopping all running steps, node {} failed'.format( - nodes_failed_stop_pipeline)) + print( + "Aborting pipeline and stopping all running steps, node {} failed".format( + nodes_failed_stop_pipeline + ) + ) break # update current state (in configuration, so that we could later continue an aborted pipeline) @@ -3971,14 +4099,14 @@ class PipelineDecorator(PipelineController): # check if we have a new step on the DAG eager_artifacts = [] for a in artifacts: - if a.key and a.key.startswith('{}:'.format(self._eager_step_artifact)): + if a.key and a.key.startswith("{}:".format(self._eager_step_artifact)): # expected value: '"eager_step":"parent-node-task-id":"eager-step-task-id' eager_artifacts.append(a) # verify we have the step, if we do not, add it. 
delete_artifact_keys = [] for artifact in eager_artifacts: - _, parent_step_task_id, eager_step_task_id = artifact.key.split(':', 2) + _, parent_step_task_id, eager_step_task_id = artifact.key.split(":", 2) # deserialize node definition eager_node_def = json.loads(artifact.type_data.preview) @@ -3998,15 +4126,15 @@ class PipelineDecorator(PipelineController): # should not happen continue - new_step_node_name = '{}_{}'.format(parent_node.name, eager_node_name) + new_step_node_name = "{}_{}".format(parent_node.name, eager_node_name) counter = 1 while new_step_node_name in self._nodes: - new_step_node_name = '{}_{}'.format(new_step_node_name, counter) + new_step_node_name = "{}_{}".format(new_step_node_name, counter) counter += 1 - eager_node_def['name'] = new_step_node_name - eager_node_def['parents'] = [parent_node.name] - is_cached = eager_node_def.pop('is_cached', None) + eager_node_def["name"] = new_step_node_name + eager_node_def["parents"] = [parent_node.name] + is_cached = eager_node_def.pop("is_cached", None) self._nodes[new_step_node_name] = self.Node(**eager_node_def) self._nodes[new_step_node_name].job = RunningJob(existing_task=eager_step_task_id) if is_cached: @@ -4041,12 +4169,12 @@ class PipelineDecorator(PipelineController): commit, helper_functions, output_uri=None, - working_dir=None + working_dir=None, ): def sanitize(function_source): matched = re.match(r"[\s]*@[\w]*.component[\s\\]*\(", function_source) if matched: - function_source = function_source[matched.span()[1]:] + function_source = function_source[matched.span()[1] :] # find the last ")" open_parenthesis = 0 last_index = -1 @@ -4059,7 +4187,7 @@ class PipelineDecorator(PipelineController): elif c == "(": open_parenthesis += 1 if last_index >= 0: - function_source = function_source[last_index + 1:].lstrip() + function_source = function_source[last_index + 1 :].lstrip() return function_source task_definition = CreateFromFunction.create_task_from_function( @@ -4087,7 +4215,7 @@ class PipelineDecorator(PipelineController): artifact_serialization_function=self._artifact_serialization_function, artifact_deserialization_function=self._artifact_deserialization_function, skip_global_imports=self._skip_global_imports, - working_dir=working_dir + working_dir=working_dir, ) return task_definition @@ -4105,15 +4233,16 @@ class PipelineDecorator(PipelineController): :param task_hash: Task representation dict :return: Adjusted Task representation dict """ - if task_hash.get('hyper_params'): + if task_hash.get("hyper_params"): updated_params = {} - for k, v in task_hash['hyper_params'].items(): - if k.startswith("{}/".format(CreateFromFunction.input_artifact_section)) and \ - str(v).startswith("{}.".format(self._task.id)): + for k, v in task_hash["hyper_params"].items(): + if k.startswith("{}/".format(CreateFromFunction.input_artifact_section)) and str(v).startswith( + "{}.".format(self._task.id) + ): task_id, artifact_name = str(v).split(".", 1) if artifact_name in self._task.artifacts: updated_params[k] = self._task.artifacts[artifact_name].hash - task_hash['hyper_params'].update(updated_params) + task_hash["hyper_params"].update(updated_params) return task_hash @@ -4139,37 +4268,38 @@ class PipelineDecorator(PipelineController): @classmethod def component( - cls, - _func=None, *, - return_values=('return_object', ), # type: Union[str, Sequence[str]] - name=None, # type: Optional[str] - cache=False, # type: bool - packages=None, # type: Optional[Union[bool, str, Sequence[str]]] - parents=None, # type: Optional[List[str]] - 
execution_queue=None, # type: Optional[str] - continue_on_fail=False, # type: bool - docker=None, # type: Optional[str] - docker_args=None, # type: Optional[str] - docker_bash_setup_script=None, # type: Optional[str] - task_type=None, # type: Optional[str] - auto_connect_frameworks=None, # type: Optional[dict] - auto_connect_arg_parser=None, # type: Optional[dict] - repo=None, # type: Optional[str] - repo_branch=None, # type: Optional[str] - repo_commit=None, # type: Optional[str] - helper_functions=None, # type: Optional[Sequence[Callable]] - monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] - monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] - monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]] - retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa - pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa - post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa - status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa - tags=None, # type: Optional[Union[str, Sequence[str]]] - output_uri=None, # type: Optional[Union[str, bool]] - draft=False, # type: Optional[bool] - working_dir=None, # type: Optional[str] - continue_behaviour=None # type: Optional[dict] + cls, + _func=None, + *, + return_values=("return_object",), # type: Union[str, Sequence[str]] + name=None, # type: Optional[str] + cache=False, # type: bool + packages=None, # type: Optional[Union[bool, str, Sequence[str]]] + parents=None, # type: Optional[List[str]] + execution_queue=None, # type: Optional[str] + continue_on_fail=False, # type: bool + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + task_type=None, # type: Optional[str] + auto_connect_frameworks=None, # type: Optional[dict] + auto_connect_arg_parser=None, # type: Optional[dict] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None, # type: Optional[str] + helper_functions=None, # type: Optional[Sequence[Callable]] + monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] + monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]] + monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]] + retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa + post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa + tags=None, # type: Optional[Union[str, Sequence[str]]] + output_uri=None, # type: Optional[Union[str, bool]] + draft=False, # type: Optional[bool] + working_dir=None, # type: Optional[str] + continue_behaviour=None # type: Optional[dict] ): # type: (...) 
-> Callable """ @@ -4330,6 +4460,7 @@ class PipelineDecorator(PipelineController): :return: function wrapper """ + def decorator_wrap(func): if continue_on_fail: warnings.warn("`continue_on_fail` is deprecated. Use `continue_behaviour` instead", DeprecationWarning) @@ -4343,7 +4474,7 @@ class PipelineDecorator(PipelineController): # add default argument values if inspect_func.args: default_values = list(inspect_func.defaults or []) - default_values = ([None] * (len(inspect_func.args)-len(default_values))) + default_values + default_values = ([None] * (len(inspect_func.args) - len(default_values))) + default_values function_kwargs = {k: v for k, v in zip(inspect_func.args, default_values)} else: function_kwargs = dict() @@ -4378,7 +4509,7 @@ class PipelineDecorator(PipelineController): output_uri=output_uri, draft=draft, working_dir=working_dir, - continue_behaviour=continue_behaviour + continue_behaviour=continue_behaviour, ) if cls._singleton: @@ -4390,9 +4521,11 @@ class PipelineDecorator(PipelineController): def wrapper(*args, **kwargs): if cls._debug_execute_step_function: args = walk_nested_dict_tuple_list( - args, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x) + args, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x + ) kwargs = walk_nested_dict_tuple_list( - kwargs, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x) + kwargs, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x + ) func_return = [] @@ -4405,14 +4538,18 @@ class PipelineDecorator(PipelineController): if len(function_return) == 1: ret_val = LazyEvalWrapper( callback=functools.partial(result_wrapper, func_return, None), - remote_reference=functools.partial(result_wrapper, func_return, None)) + remote_reference=functools.partial(result_wrapper, func_return, None), + ) cls._ref_lazy_loader_id_to_node_name[id(ret_val)] = _name return ret_val else: - return_w = [LazyEvalWrapper( - callback=functools.partial(result_wrapper, func_return, i), - remote_reference=functools.partial(result_wrapper, func_return, i)) - for i, _ in enumerate(function_return)] + return_w = [ + LazyEvalWrapper( + callback=functools.partial(result_wrapper, func_return, i), + remote_reference=functools.partial(result_wrapper, func_return, i), + ) + for i, _ in enumerate(function_return) + ] for i in return_w: cls._ref_lazy_loader_id_to_node_name[id(i)] = _name return return_w @@ -4430,8 +4567,7 @@ class PipelineDecorator(PipelineController): kwargs_artifacts.update( { k: walk_nested_dict_tuple_list( - v, - lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x + v, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x ) for k, v in kwargs.items() if isinstance(v, LazyEvalWrapper) @@ -4448,22 +4584,22 @@ class PipelineDecorator(PipelineController): PipelineDecorator._eager_execution_instance = True a_pipeline = PipelineDecorator( name=name, - project='DevOps', # it will not actually be used - version='0.0.0', + project="DevOps", # it will not actually be used + version="0.0.0", pool_frequency=111, add_pipeline_tags=False, target_project=None, ) - target_queue = \ - PipelineDecorator._default_execution_queue or \ - Task.current_task().data.execution.queue + target_queue = ( + PipelineDecorator._default_execution_queue or Task.current_task().data.execution.queue + ) if target_queue: PipelineDecorator.set_default_execution_queue(target_queue) else: # if we are not running from a queue, we are probably in debug mode a_pipeline._clearml_job_class = LocalClearmlJob - 
a_pipeline._default_execution_queue = 'mock' + a_pipeline._default_execution_queue = "mock" # restore tags, the pipeline might add a few Task.current_task().set_tags(original_tags[0]) @@ -4487,9 +4623,9 @@ class PipelineDecorator(PipelineController): # Note that for the first iteration (when `_node.name == _node_name`) # we always increment the name, as the name is always in `_launched_step_names` while _node.name in cls._singleton._launched_step_names or ( - _node.name in cls._singleton._nodes and - cls._singleton._nodes[_node.name].job_code_section != - cls._singleton._nodes[_node_name].job_code_section + _node.name in cls._singleton._nodes + and cls._singleton._nodes[_node.name].job_code_section + != cls._singleton._nodes[_node_name].job_code_section ): _node.name = "{}_{}".format(_node_name, counter) counter += 1 @@ -4497,9 +4633,13 @@ class PipelineDecorator(PipelineController): if cls._singleton._pre_step_callbacks.get(_node_name): cls._singleton._pre_step_callbacks[_node.name] = cls._singleton._pre_step_callbacks[_node_name] if cls._singleton._post_step_callbacks.get(_node_name): - cls._singleton._post_step_callbacks[_node.name] = cls._singleton._post_step_callbacks[_node_name] + cls._singleton._post_step_callbacks[_node.name] = cls._singleton._post_step_callbacks[ + _node_name + ] if cls._singleton._status_change_callbacks.get(_node_name): - cls._singleton._status_change_callbacks[_node.name] = cls._singleton._status_change_callbacks[_node_name] + cls._singleton._status_change_callbacks[_node.name] = cls._singleton._status_change_callbacks[ + _node_name + ] _node_name = _node.name if _node.name not in cls._singleton._nodes: cls._singleton._nodes[_node.name] = _node @@ -4508,14 +4648,23 @@ class PipelineDecorator(PipelineController): cls._singleton._launched_step_names.add(_node_name) _node = cls._singleton._nodes[_node_name] cls._retries[_node_name] = 0 - cls._retries_callbacks[_node_name] = retry_on_failure if callable(retry_on_failure) else \ - (functools.partial(cls._singleton._default_retry_on_failure_callback, max_retries=retry_on_failure) - if isinstance(retry_on_failure, int) else cls._singleton._retry_on_failure_callback) + cls._retries_callbacks[_node_name] = ( + retry_on_failure + if callable(retry_on_failure) + else ( + functools.partial( + cls._singleton._default_retry_on_failure_callback, max_retries=retry_on_failure + ) + if isinstance(retry_on_failure, int) + else cls._singleton._retry_on_failure_callback + ) + ) # The actual launch is a bit slow, we run it in the background launch_thread = Thread( target=cls._component_launch, - args=(_node_name, _node, kwargs_artifacts, kwargs, current_thread().ident)) + args=(_node_name, _node, kwargs_artifacts, kwargs, current_thread().ident), + ) def results_reference(return_name): # wait until launch is completed @@ -4533,7 +4682,8 @@ class PipelineDecorator(PipelineController): if _node.job.is_failed() and not _node.continue_on_fail: raise ValueError( - 'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())) + 'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()) + ) _node.executed = _node.job.task_id() return "{}.{}".format(_node.job.task_id(), return_name) @@ -4551,8 +4701,9 @@ class PipelineDecorator(PipelineController): return None cls._wait_for_node(_node) - if (_node.job.is_failed() and not _node.continue_on_fail) or \ - (_node.job.is_aborted() and not _node.continue_on_abort): + if (_node.job.is_failed() and not _node.continue_on_fail) or ( + _node.job.is_aborted() and 
not _node.continue_on_abort + ): raise ValueError( 'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()) ) @@ -4574,9 +4725,13 @@ class PipelineDecorator(PipelineController): ) return task.get_parameters(cast=True).get(CreateFromFunction.return_section + "/" + return_name) - return_w = [LazyEvalWrapper( - callback=functools.partial(result_wrapper, n), - remote_reference=functools.partial(results_reference, n)) for n in function_return] + return_w = [ + LazyEvalWrapper( + callback=functools.partial(result_wrapper, n), + remote_reference=functools.partial(results_reference, n), + ) + for n in function_return + ] for i in return_w: cls._ref_lazy_loader_id_to_node_name[id(i)] = _node_name @@ -4591,36 +4746,37 @@ class PipelineDecorator(PipelineController): @classmethod def pipeline( - cls, - _func=None, *, # noqa - name, # type: str - project, # type: str - version=None, # type: Optional[str] - return_value=None, # type: Optional[str] - default_queue=None, # type: Optional[str] - pool_frequency=0.2, # type: float - add_pipeline_tags=False, # type: bool - target_project=None, # type: Optional[str] - abort_on_failure=False, # type: bool - pipeline_execution_queue='services', # type: Optional[str] - multi_instance_support=False, # type: bool - add_run_number=True, # type: bool - args_map=None, # type: dict[str, List[str]] - start_controller_locally=False, # type: bool - retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa - docker=None, # type: Optional[str] - docker_args=None, # type: Optional[str] - docker_bash_setup_script=None, # type: Optional[str] - packages=None, # type: Optional[Union[bool, str, Sequence[str]]] - repo=None, # type: Optional[str] - repo_branch=None, # type: Optional[str] - repo_commit=None, # type: Optional[str] - artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] - artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] - output_uri=None, # type: Optional[Union[str, bool]] - skip_global_imports=False, # type: bool - working_dir=None, # type: Optional[str] - enable_local_imports=True # type: bool + cls, + _func=None, + *, # noqa + name, # type: str + project, # type: str + version=None, # type: Optional[str] + return_value=None, # type: Optional[str] + default_queue=None, # type: Optional[str] + pool_frequency=0.2, # type: float + add_pipeline_tags=False, # type: bool + target_project=None, # type: Optional[str] + abort_on_failure=False, # type: bool + pipeline_execution_queue="services", # type: Optional[str] + multi_instance_support=False, # type: bool + add_run_number=True, # type: bool + args_map=None, # type: dict[str, List[str]] + start_controller_locally=False, # type: bool + retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + packages=None, # type: Optional[Union[bool, str, Sequence[str]]] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None, # type: Optional[str] + artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] + artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] + output_uri=None, # type: Optional[Union[str, bool]] + skip_global_imports=False, # type: bool + 
working_dir=None, # type: Optional[str] + enable_local_imports=True # type: bool ): # type: (...) -> Callable """ @@ -4745,8 +4901,8 @@ class PipelineDecorator(PipelineController): If False, the directory won't be appended to PYTHONPATH. Default is True. Ignored while running remotely. """ - def decorator_wrap(func): + def decorator_wrap(func): def internal_decorator(*args, **kwargs): pipeline_kwargs = dict(**(kwargs or {})) pipeline_kwargs_types = dict() @@ -4766,8 +4922,7 @@ class PipelineDecorator(PipelineController): pipeline_kwargs = default_kwargs if inspect_func.annotations: - pipeline_kwargs_types = { - str(k): inspect_func.annotations[k] for k in inspect_func.annotations} + pipeline_kwargs_types = {str(k): inspect_func.annotations[k] for k in inspect_func.annotations} # run the entire pipeline locally, as python functions if cls._debug_execute_step_function: @@ -4793,7 +4948,7 @@ class PipelineDecorator(PipelineController): output_uri=output_uri, skip_global_imports=skip_global_imports, working_dir=working_dir, - enable_local_imports=enable_local_imports + enable_local_imports=enable_local_imports, ) ret_val = func(**pipeline_kwargs) LazyEvalWrapper.trigger_all_remote_references() @@ -4812,7 +4967,8 @@ class PipelineDecorator(PipelineController): try: # noinspection PyProtectedMember multi_pipeline_call_counter = int( - t._get_runtime_properties().get('multi_pipeline_counter', None)) + t._get_runtime_properties().get("multi_pipeline_counter", None) + ) # NOTICE! if this is not our call we LEAVE immediately # check if this is our call to start, if not we will wait for the next one @@ -4848,34 +5004,36 @@ class PipelineDecorator(PipelineController): output_uri=output_uri, skip_global_imports=skip_global_imports, working_dir=working_dir, - enable_local_imports=enable_local_imports + enable_local_imports=enable_local_imports, ) a_pipeline._args_map = args_map or {} if PipelineDecorator._debug_execute_step_process: a_pipeline._clearml_job_class = LocalClearmlJob - a_pipeline._default_execution_queue = 'mock' + a_pipeline._default_execution_queue = "mock" a_pipeline._clearml_job_class.register_hashing_callback(a_pipeline._adjust_task_hashing) # add pipeline arguments for k in pipeline_kwargs: a_pipeline.add_parameter( - name=k, - default=pipeline_kwargs.get(k), - param_type=pipeline_kwargs_types.get(k) + name=k, default=pipeline_kwargs.get(k), param_type=pipeline_kwargs_types.get(k) ) # sync multi-pipeline call counter (so we know which one to skip) if Task.running_locally() and multi_instance_support and cls._multi_pipeline_call_counter >= 0: # noinspection PyProtectedMember a_pipeline._task._set_runtime_properties( - dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter))) + dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter)) + ) # run the actual pipeline - if not start_controller_locally and \ - not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue: + if ( + not start_controller_locally + and not PipelineDecorator._debug_execute_step_process + and pipeline_execution_queue + ): # rerun the pipeline on a remote machine a_pipeline._task.execute_remotely(queue_name=pipeline_execution_queue) # when we get here it means we are running remotely @@ -4931,7 +5089,8 @@ class PipelineDecorator(PipelineController): if multi_instance_support: return cls._multi_pipeline_wrapper( - func=internal_decorator, parallel=bool(multi_instance_support == 'parallel')) + func=internal_decorator, parallel=bool(multi_instance_support == "parallel") + ) return 
internal_decorator @@ -5003,8 +5162,8 @@ class PipelineDecorator(PipelineController): kwargs.pop(k, None) _node.parameters.pop("{}/{}".format(CreateFromFunction.kwargs_section, k), None) _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = v - if v and '.' in str(v): - parent_id, _ = str(v).split('.', 1) + if v and "." in str(v): + parent_id, _ = str(v).split(".", 1) # find parent and push it into the _node.parents for n, node in sorted(list(cls._singleton._nodes.items()), reverse=True): if n != _node.name and node.executed and node.executed == parent_id: @@ -5013,16 +5172,18 @@ class PipelineDecorator(PipelineController): break leaves = cls._singleton._find_executed_node_leaves() - _node.parents = (_node.parents or []) + [ - x for x in cls._evaluated_return_values.get(tid, []) if x in leaves - ] + _node.parents = (_node.parents or []) + [x for x in cls._evaluated_return_values.get(tid, []) if x in leaves] if not cls._singleton._abort_running_steps_on_failure: for parent in _node.parents: parent = cls._singleton._nodes[parent] - if parent.status == "failed" and parent.skip_children_on_fail or \ - parent.status == "aborted" and parent.skip_children_on_abort or \ - parent.status == "skipped": + if ( + parent.status == "failed" + and parent.skip_children_on_fail + or parent.status == "aborted" + and parent.skip_children_on_abort + or parent.status == "skipped" + ): _node.skip_job = True return @@ -5031,10 +5192,11 @@ class PipelineDecorator(PipelineController): _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v else: # we need to create an artifact - artifact_name = 'result_{}_{}'.format(re.sub(r'\W+', '', _node.name), k) + artifact_name = "result_{}_{}".format(re.sub(r"\W+", "", _node.name), k) cls._singleton._upload_pipeline_artifact(artifact_name=artifact_name, artifact_object=v) - _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = \ - "{}.{}".format(cls._singleton._task.id, artifact_name) + _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = "{}.{}".format( + cls._singleton._task.id, artifact_name + ) # verify the new step cls._singleton._verify_node(_node) @@ -5043,8 +5205,7 @@ class PipelineDecorator(PipelineController): # check if we generated the pipeline we need to update the new eager step if PipelineDecorator._eager_execution_instance and _node.job: # check if we need to add the pipeline tag on the new node - pipeline_tags = [t for t in Task.current_task().get_tags() or [] - if str(t).startswith(cls._node_tag_prefix)] + pipeline_tags = [t for t in Task.current_task().get_tags() or [] if str(t).startswith(cls._node_tag_prefix)] if pipeline_tags and _node.job and _node.job.task: pipeline_tags = list(set((_node.job.task.get_tags() or []) + pipeline_tags)) _node.job.task.set_tags(pipeline_tags) @@ -5054,19 +5215,19 @@ class PipelineDecorator(PipelineController): pipeline_dag = cls._singleton._serialize() # check if node is cached if _node.job.is_cached_task(): - pipeline_dag[_node_name]['is_cached'] = True + pipeline_dag[_node_name]["is_cached"] = True # store entire definition on the parent pipeline from clearml.backend_api.services import tasks + artifact = tasks.Artifact( - key='{}:{}:{}'.format(cls._eager_step_artifact, Task.current_task().id, _node.job.task_id()), + key="{}:{}:{}".format(cls._eager_step_artifact, Task.current_task().id, _node.job.task_id()), type="json", - mode='output', + mode="output", type_data=tasks.ArtifactTypeData( - preview=json.dumps({_node_name: 
pipeline_dag[_node_name]}), - content_type='application/pipeline') + preview=json.dumps({_node_name: pipeline_dag[_node_name]}), content_type="application/pipeline" + ), ) - req = tasks.AddOrUpdateArtifactsRequest( - task=Task.current_task().parent, artifacts=[artifact], force=True) + req = tasks.AddOrUpdateArtifactsRequest(task=Task.current_task().parent, artifacts=[artifact], force=True) res = Task.current_task().send(req, raise_on_errors=False) if not res or not res.response or not res.response.updated: pass @@ -5076,9 +5237,9 @@ class PipelineDecorator(PipelineController): @classmethod def _multi_pipeline_wrapper( - cls, - func=None, # type: Callable - parallel=False, # type: bool + cls, + func=None, # type: Callable + parallel=False, # type: bool ): # type: (...) -> Callable """ @@ -5112,10 +5273,10 @@ class PipelineDecorator(PipelineController): return func(*args, **kwargs) def sanitized_env(a_queue, *a_args, **a_kwargs): - os.environ.pop('CLEARML_PROC_MASTER_ID', None) - os.environ.pop('TRAINS_PROC_MASTER_ID', None) - os.environ.pop('CLEARML_TASK_ID', None) - os.environ.pop('TRAINS_TASK_ID', None) + os.environ.pop("CLEARML_PROC_MASTER_ID", None) + os.environ.pop("TRAINS_PROC_MASTER_ID", None) + os.environ.pop("CLEARML_TASK_ID", None) + os.environ.pop("TRAINS_TASK_ID", None) if Task.current_task(): # noinspection PyProtectedMember Task.current_task()._reset_current_task_obj() @@ -5127,7 +5288,7 @@ class PipelineDecorator(PipelineController): queue = Queue() - p = Process(target=sanitized_env, args=(queue, ) + args, kwargs=kwargs) + p = Process(target=sanitized_env, args=(queue,) + args, kwargs=kwargs) # make sure we wait for the subprocess. p.daemon = False p.start() @@ -5159,7 +5320,7 @@ class PipelineDecorator(PipelineController): results = [] if not cls._multi_pipeline_instances: return results - print('Waiting for background pipelines to finish') + print("Waiting for background pipelines to finish") for p, queue in cls._multi_pipeline_instances: try: p.join() @@ -5180,6 +5341,7 @@ class PipelineDecorator(PipelineController): if not Task.current_task(): return from clearml.backend_api.services import events + res = Task.current_task().send( events.GetTaskPlotsRequest(task=pipeline_task_id, iters=1), raise_on_errors=False, @@ -5189,25 +5351,32 @@ class PipelineDecorator(PipelineController): execution_details = None for p in res.response.plots: try: - if p['metric'] == cls._report_plot_execution_flow['title'] and \ - p['variant'] == cls._report_plot_execution_flow['series']: - execution_flow = json.loads(p['plot_str']) + if ( + p["metric"] == cls._report_plot_execution_flow["title"] + and p["variant"] == cls._report_plot_execution_flow["series"] + ): + execution_flow = json.loads(p["plot_str"]) - elif p['metric'] == cls._report_plot_execution_details['title'] and \ - p['variant'] == cls._report_plot_execution_details['series']: - execution_details = json.loads(p['plot_str']) - execution_details['layout']['name'] += ' - ' + str(pipeline_task_id) + elif ( + p["metric"] == cls._report_plot_execution_details["title"] + and p["variant"] == cls._report_plot_execution_details["series"] + ): + execution_details = json.loads(p["plot_str"]) + execution_details["layout"]["name"] += " - " + str(pipeline_task_id) except Exception as ex: - getLogger('clearml.automation.controller').warning( - 'Multi-pipeline plot update failed: {}'.format(ex)) + getLogger("clearml.automation.controller").warning("Multi-pipeline plot update failed: {}".format(ex)) if execution_flow: 
Task.current_task().get_logger().report_plotly( - title=cls._report_plot_execution_flow['title'], - series='{} - {}'.format(cls._report_plot_execution_flow['series'], pipeline_task_id), - iteration=0, figure=execution_flow) + title=cls._report_plot_execution_flow["title"], + series="{} - {}".format(cls._report_plot_execution_flow["series"], pipeline_task_id), + iteration=0, + figure=execution_flow, + ) if execution_details: Task.current_task().get_logger().report_plotly( - title=cls._report_plot_execution_details['title'], - series='{} - {}'.format(cls._report_plot_execution_details['series'], pipeline_task_id), - iteration=0, figure=execution_details) + title=cls._report_plot_execution_details["title"], + series="{} - {}".format(cls._report_plot_execution_details["series"], pipeline_task_id), + iteration=0, + figure=execution_details, + )
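
For readers skimming this formatting-only patch, below is a minimal usage sketch of the decorator API whose signatures are reformatted above. It sits after the diff hunks and is illustrative only: it assumes a reachable ClearML server, and the project name, queue name, and step logic ("examples", "default", generate/accumulate) are invented placeholders, not part of the patch.

# --- illustrative sketch, not part of the diff ------------------------------
# Assumes a configured ClearML server; "examples" and "default" are placeholder
# project/queue names. Only parameters visible in the signatures above are used.
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["numbers"], cache=True, execution_queue="default")
def generate(count=10):
    # Each component runs as its own ClearML task; the returned object is stored
    # as an artifact and passed to downstream steps.
    return list(range(count))


@PipelineDecorator.component(return_values=["total"], execution_queue="default")
def accumulate(numbers):
    return sum(numbers)


@PipelineDecorator.pipeline(name="formatting demo", project="examples", version="0.0.1")
def run(count=5):
    numbers = generate(count=count)
    total = accumulate(numbers=numbers)
    # Referencing the lazy result waits for the step that produces it.
    print("total:", total)


if __name__ == "__main__":
    # Run the controller and all steps as local subprocesses instead of enqueuing them.
    PipelineDecorator.run_locally()
    run(count=5)

Without `run_locally()`, the controller task is enqueued according to `pipeline_execution_queue` (defaulting to "services" per the signature above) and each component is launched through its `execution_queue` or the pipeline's `default_queue`.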