diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index 5d7ecb13..c50d51af 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -42,26 +42,27 @@ class PipelineController(object):
The pipeline process (task) itself can be executed manually or by the clearml-agent services queue.
Notice: The pipeline controller lives as long as the pipeline itself is being executed.
"""
- _tag = 'pipeline'
- _project_system_tags = ['pipeline', 'hidden']
- _node_tag_prefix = 'pipe:'
+
+ _tag = "pipeline"
+ _project_system_tags = ["pipeline", "hidden"]
+ _node_tag_prefix = "pipe:"
_step_pattern = r"\${[^}]*}"
- _config_section = 'Pipeline'
- _state_artifact_name = 'pipeline_state'
- _args_section = 'Args'
- _pipeline_section = 'pipeline'
- _pipeline_step_ref = 'pipeline'
- _runtime_property_hash = '_pipeline_hash'
+ _config_section = "Pipeline"
+ _state_artifact_name = "pipeline_state"
+ _args_section = "Args"
+ _pipeline_section = "pipeline"
+ _pipeline_step_ref = "pipeline"
+ _runtime_property_hash = "_pipeline_hash"
_relaunch_status_message = "Relaunching pipeline step..."
- _reserved_pipeline_names = (_pipeline_step_ref, )
+ _reserved_pipeline_names = (_pipeline_step_ref,)
_task_project_lookup = {}
_clearml_job_class = ClearmlJob
- _update_execution_plot_interval = 5.*60
- _update_progress_interval = 10.
- _monitor_node_interval = 5.*60
+ _update_execution_plot_interval = 5.0 * 60
+ _update_progress_interval = 10.0
+ _monitor_node_interval = 5.0 * 60
_pipeline_as_sub_project_cached = None
- _report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
- _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
+ _report_plot_execution_flow = dict(title="Pipeline", series="Execution Flow")
+ _report_plot_execution_details = dict(title="Pipeline Details", series="Execution Details")
_evaluated_return_values = {} # TID: pipeline_name
_add_to_evaluated_return_values = {} # TID: bool
_retries = {} # Node.name: int
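# ----- editor's illustrative sketch (not part of this patch) -------------------------------------
# Minimal use of the PipelineController described in the class docstring above. The project, task
# and queue names are assumptions for illustration only.
from clearml import PipelineController

pipe = PipelineController(name="demo pipeline", project="examples", version="1.0.0")
pipe.add_step(name="stage_data", base_task_project="examples", base_task_name="data prep")
pipe.start(queue="services")  # the controller task itself runs on the clearml-agent services queue
# --------------------------------------------------------------------------------------------------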
@@ -173,8 +174,11 @@ class PipelineController(object):
"""
new_copy = PipelineController.Node(
name=self.name,
- **dict((k, deepcopy(v)) for k, v in self.__dict__.items()
- if k not in ('name', 'job', 'executed', 'task_factory_func'))
+ **dict(
+ (k, deepcopy(v))
+ for k, v in self.__dict__.items()
+ if k not in ("name", "job", "executed", "task_factory_func")
+ )
)
new_copy.task_factory_func = self.task_factory_func
return new_copy
@@ -199,31 +203,31 @@ class PipelineController(object):
pass
def __init__(
- self,
- name, # type: str
- project, # type: str
- version=None, # type: Optional[str]
- pool_frequency=0.2, # type: float
- add_pipeline_tags=False, # type: bool
- target_project=True, # type: Optional[Union[str, bool]]
- auto_version_bump=None, # type: Optional[bool]
- abort_on_failure=False, # type: bool
- add_run_number=True, # type: bool
- retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
- docker=None, # type: Optional[str]
- docker_args=None, # type: Optional[str]
- docker_bash_setup_script=None, # type: Optional[str]
- packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
- repo=None, # type: Optional[str]
- repo_branch=None, # type: Optional[str]
- repo_commit=None, # type: Optional[str]
- always_create_from_code=True, # type: bool
- artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
- artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
- output_uri=None, # type: Optional[Union[str, bool]]
- skip_global_imports=False, # type: bool
- working_dir=None, # type: Optional[str]
- enable_local_imports=True # type: bool
+ self,
+ name, # type: str
+ project, # type: str
+ version=None, # type: Optional[str]
+ pool_frequency=0.2, # type: float
+ add_pipeline_tags=False, # type: bool
+ target_project=True, # type: Optional[Union[str, bool]]
+ auto_version_bump=None, # type: Optional[bool]
+ abort_on_failure=False, # type: bool
+ add_run_number=True, # type: bool
+ retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
+ docker=None, # type: Optional[str]
+ docker_args=None, # type: Optional[str]
+ docker_bash_setup_script=None, # type: Optional[str]
+ packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
+ repo=None, # type: Optional[str]
+ repo_branch=None, # type: Optional[str]
+ repo_commit=None, # type: Optional[str]
+ always_create_from_code=True, # type: bool
+ artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
+ artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
+ output_uri=None, # type: Optional[Union[str, bool]]
+ skip_global_imports=False, # type: bool
+ working_dir=None, # type: Optional[str]
+ enable_local_imports=True, # type: bool
):
# type: (...) -> None
"""
@@ -334,10 +338,8 @@ class PipelineController(object):
self._always_create_from_code = bool(always_create_from_code)
self._version = str(version).strip() if version else None
if self._version and not Version.is_valid_version_string(self._version):
- raise ValueError(
- "Setting non-semantic pipeline version '{}'".format(self._version)
- )
- self._pool_frequency = pool_frequency * 60.
+ raise ValueError("Setting non-semantic pipeline version '{}'".format(self._version))
+ self._pool_frequency = pool_frequency * 60.0
self._thread = None
self._pipeline_args = dict()
self._pipeline_args_desc = dict()
@@ -374,13 +376,13 @@ class PipelineController(object):
task_name=pipeline_project_args["task_name"],
task_type=Task.TaskTypes.controller,
auto_resource_monitoring=False,
- reuse_last_task_id=False
+ reuse_last_task_id=False,
)
# if user disabled the auto-repo, set it back to False (just in case)
if set_force_local_repo:
# noinspection PyProtectedMember
- self._task._wait_for_repo_detection(timeout=300.)
+ self._task._wait_for_repo_detection(timeout=300.0)
Task.force_store_standalone_script(force=False)
self._create_pipeline_projects(
@@ -406,20 +408,23 @@ class PipelineController(object):
self._monitored_nodes = {} # type: Dict[str, dict]
self._abort_running_steps_on_failure = abort_on_failure
self._def_max_retry_on_failure = retry_on_failure if isinstance(retry_on_failure, int) else 0
- self._retry_on_failure_callback = retry_on_failure if callable(retry_on_failure) \
- else self._default_retry_on_failure_callback
+ self._retry_on_failure_callback = (
+ retry_on_failure if callable(retry_on_failure) else self._default_retry_on_failure_callback
+ )
# add direct link to the pipeline page
if self._pipeline_as_sub_project() and self._task:
if add_run_number and self._task.running_locally():
self._add_pipeline_name_run_number(self._task)
# noinspection PyProtectedMember
- self._task.get_logger().report_text('ClearML pipeline page: {}'.format(
- '{}/pipelines/{}/experiments/{}'.format(
- self._task._get_app_server(),
- self._task.project if self._task.project is not None else '*',
- self._task.id,
- ))
+ self._task.get_logger().report_text(
+ "ClearML pipeline page: {}".format(
+ "{}/pipelines/{}/experiments/{}".format(
+ self._task._get_app_server(),
+ self._task.project if self._task.project is not None else "*",
+ self._task.id,
+ )
+ )
)
@classmethod
@@ -445,34 +450,34 @@ class PipelineController(object):
:param float max_execution_minutes: The maximum time (minutes) for the entire pipeline process. The
default is ``None``, indicating no time limit.
"""
- self._pipeline_time_limit = max_execution_minutes * 60. if max_execution_minutes else None
+ self._pipeline_time_limit = max_execution_minutes * 60.0 if max_execution_minutes else None
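# ----- editor's illustrative sketch (not part of this patch) -------------------------------------
# The limit above is given in minutes and stored internally in seconds. Assuming the enclosing
# setter is set_pipeline_execution_time_limit() as in the upstream source, and `pipe` is the
# controller from the earlier sketch:
pipe.set_pipeline_execution_time_limit(max_execution_minutes=90)  # cap the whole run at 90 minutes
# --------------------------------------------------------------------------------------------------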
def add_step(
- self,
- name, # type: str
- base_task_id=None, # type: Optional[str]
- parents=None, # type: Optional[Sequence[str]]
- parameter_override=None, # type: Optional[Mapping[str, Any]]
- configuration_overrides=None, # type: Optional[Mapping[str, Union[str, Mapping]]]
- task_overrides=None, # type: Optional[Mapping[str, Any]]
- execution_queue=None, # type: Optional[str]
- monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
- monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
- monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
- time_limit=None, # type: Optional[float]
- base_task_project=None, # type: Optional[str]
- base_task_name=None, # type: Optional[str]
- clone_base_task=True, # type: bool
- continue_on_fail=False, # type: bool
- pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
- post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
- cache_executed_step=False, # type: bool
- base_task_factory=None, # type: Optional[Callable[[PipelineController.Node], Task]]
- retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
- status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
- recursively_parse_parameters=False, # type: bool
- output_uri=None, # type: Optional[Union[str, bool]]
- continue_behaviour=None # type: Optional[dict]
+ self,
+ name, # type: str
+ base_task_id=None, # type: Optional[str]
+ parents=None, # type: Optional[Sequence[str]]
+ parameter_override=None, # type: Optional[Mapping[str, Any]]
+ configuration_overrides=None, # type: Optional[Mapping[str, Union[str, Mapping]]]
+ task_overrides=None, # type: Optional[Mapping[str, Any]]
+ execution_queue=None, # type: Optional[str]
+ monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
+ monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
+ monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
+ time_limit=None, # type: Optional[float]
+ base_task_project=None, # type: Optional[str]
+ base_task_name=None, # type: Optional[str]
+ clone_base_task=True, # type: bool
+ continue_on_fail=False, # type: bool
+ pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
+ post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
+ cache_executed_step=False, # type: bool
+ base_task_factory=None, # type: Optional[Callable[[PipelineController.Node], Task]]
+ retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
+ status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
+ recursively_parse_parameters=False, # type: bool
+ output_uri=None, # type: Optional[Union[str, bool]]
+ continue_behaviour=None, # type: Optional[dict]
):
# type: (...) -> bool
"""
@@ -648,40 +653,54 @@ class PipelineController(object):
if not base_task_factory and not base_task_id:
if not base_task_project or not base_task_name:
- raise ValueError('Either base_task_id or base_task_project/base_task_name must be provided')
+ raise ValueError("Either base_task_id or base_task_project/base_task_name must be provided")
base_task = Task.get_task(
project_name=base_task_project,
task_name=base_task_name,
allow_archived=True,
task_filter=dict(
- status=[str(Task.TaskStatusEnum.created), str(Task.TaskStatusEnum.queued),
- str(Task.TaskStatusEnum.in_progress), str(Task.TaskStatusEnum.published),
- str(Task.TaskStatusEnum.stopped), str(Task.TaskStatusEnum.completed),
- str(Task.TaskStatusEnum.closed)],
- )
+ status=[
+ str(Task.TaskStatusEnum.created),
+ str(Task.TaskStatusEnum.queued),
+ str(Task.TaskStatusEnum.in_progress),
+ str(Task.TaskStatusEnum.published),
+ str(Task.TaskStatusEnum.stopped),
+ str(Task.TaskStatusEnum.completed),
+ str(Task.TaskStatusEnum.closed),
+ ],
+ ),
)
if not base_task:
- raise ValueError('Could not find base_task_project={} base_task_name={}'.format(
- base_task_project, base_task_name))
+ raise ValueError(
+ "Could not find base_task_project={} base_task_name={}".format(base_task_project, base_task_name)
+ )
if Task.archived_tag in base_task.get_system_tags():
LoggerRoot.get_base_logger().warning(
- 'Found base_task_project={} base_task_name={} but it is archived'.format(
- base_task_project, base_task_name))
+ "Found base_task_project={} base_task_name={} but it is archived".format(
+ base_task_project, base_task_name
+ )
+ )
base_task_id = base_task.id
if configuration_overrides is not None:
# verify we have a dict or a string on all values
- if not isinstance(configuration_overrides, dict) or \
- not all(isinstance(v, (str, dict)) for v in configuration_overrides.values()):
- raise ValueError("configuration_overrides must be a dictionary, with all values "
- "either dicts or strings, got \'{}\' instead".format(configuration_overrides))
+ if not isinstance(configuration_overrides, dict) or not all(
+ isinstance(v, (str, dict)) for v in configuration_overrides.values()
+ ):
+ raise ValueError(
+ "configuration_overrides must be a dictionary, with all values "
+ "either dicts or strings, got '{}' instead".format(configuration_overrides)
+ )
if task_overrides:
- task_overrides = flatten_dictionary(task_overrides, sep='.')
+ task_overrides = flatten_dictionary(task_overrides, sep=".")
self._nodes[name] = self.Node(
- name=name, base_task_id=base_task_id, parents=parents or [],
- queue=execution_queue, timeout=time_limit,
+ name=name,
+ base_task_id=base_task_id,
+ parents=parents or [],
+ queue=execution_queue,
+ timeout=time_limit,
parameters=parameter_override or {},
recursively_parse_parameters=recursively_parse_parameters,
configurations=configuration_overrides,
@@ -694,12 +713,18 @@ class PipelineController(object):
monitor_artifacts=monitor_artifacts or [],
monitor_models=monitor_models or [],
output_uri=self._output_uri if output_uri is None else output_uri,
- continue_behaviour=continue_behaviour
+ continue_behaviour=continue_behaviour,
)
self._retries[name] = 0
- self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
- (functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure)
- if isinstance(retry_on_failure, int) else self._retry_on_failure_callback)
+ self._retries_callbacks[name] = (
+ retry_on_failure
+ if callable(retry_on_failure)
+ else (
+ functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure)
+ if isinstance(retry_on_failure, int)
+ else self._retry_on_failure_callback
+ )
+ )
if status_change_callback:
self._status_change_callbacks[name] = status_change_callback
@@ -709,41 +734,41 @@ class PipelineController(object):
return True
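# ----- editor's illustrative sketch (not part of this patch) -------------------------------------
# Wiring a second step with add_step(). The "${step.<field>}" reference syntax matches the
# _step_pattern handling in this class; step, parameter and queue names are assumptions.
pipe.add_step(
    name="train",
    parents=["stage_data"],
    base_task_project="examples",
    base_task_name="train model",
    parameter_override={"Args/dataset_task_id": "${stage_data.id}"},
    execution_queue="default",
    retry_on_failure=3,  # or a callable: (pipeline, node, retries) -> bool
)
# --------------------------------------------------------------------------------------------------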
def add_function_step(
- self,
- name, # type: str
- function, # type: Callable
- function_kwargs=None, # type: Optional[Dict[str, Any]]
- function_return=None, # type: Optional[List[str]]
- project_name=None, # type: Optional[str]
- task_name=None, # type: Optional[str]
- task_type=None, # type: Optional[str]
- auto_connect_frameworks=None, # type: Optional[dict]
- auto_connect_arg_parser=None, # type: Optional[dict]
- packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
- repo=None, # type: Optional[str]
- repo_branch=None, # type: Optional[str]
- repo_commit=None, # type: Optional[str]
- helper_functions=None, # type: Optional[Sequence[Callable]]
- docker=None, # type: Optional[str]
- docker_args=None, # type: Optional[str]
- docker_bash_setup_script=None, # type: Optional[str]
- parents=None, # type: Optional[Sequence[str]]
- execution_queue=None, # type: Optional[str]
- monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
- monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
- monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
- time_limit=None, # type: Optional[float]
- continue_on_fail=False, # type: bool
- pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
- post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
- cache_executed_step=False, # type: bool
- retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
- status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
- tags=None, # type: Optional[Union[str, Sequence[str]]]
- output_uri=None, # type: Optional[Union[str, bool]]
- draft=False, # type: Optional[bool]
- working_dir=None, # type: Optional[str]
- continue_behaviour=None # type: Optional[dict]
+ self,
+ name, # type: str
+ function, # type: Callable
+ function_kwargs=None, # type: Optional[Dict[str, Any]]
+ function_return=None, # type: Optional[List[str]]
+ project_name=None, # type: Optional[str]
+ task_name=None, # type: Optional[str]
+ task_type=None, # type: Optional[str]
+ auto_connect_frameworks=None, # type: Optional[dict]
+ auto_connect_arg_parser=None, # type: Optional[dict]
+ packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
+ repo=None, # type: Optional[str]
+ repo_branch=None, # type: Optional[str]
+ repo_commit=None, # type: Optional[str]
+ helper_functions=None, # type: Optional[Sequence[Callable]]
+ docker=None, # type: Optional[str]
+ docker_args=None, # type: Optional[str]
+ docker_bash_setup_script=None, # type: Optional[str]
+ parents=None, # type: Optional[Sequence[str]]
+ execution_queue=None, # type: Optional[str]
+ monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
+ monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
+ monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
+ time_limit=None, # type: Optional[float]
+ continue_on_fail=False, # type: bool
+ pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
+ post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
+ cache_executed_step=False, # type: bool
+ retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
+ status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
+ tags=None, # type: Optional[Union[str, Sequence[str]]]
+ output_uri=None, # type: Optional[Union[str, bool]]
+ draft=False, # type: Optional[bool]
+ working_dir=None, # type: Optional[str]
+ continue_behaviour=None, # type: Optional[dict]
):
# type: (...) -> bool
"""
@@ -938,7 +963,7 @@ class PipelineController(object):
function_kwargs = function_kwargs or {}
default_kwargs = inspect.getfullargspec(function)
if default_kwargs and default_kwargs.args and default_kwargs.defaults:
- for key, val in zip(default_kwargs.args[-len(default_kwargs.defaults):], default_kwargs.defaults):
+ for key, val in zip(default_kwargs.args[-len(default_kwargs.defaults) :], default_kwargs.defaults):
function_kwargs.setdefault(key, val)
return self._add_function_step(
@@ -975,15 +1000,15 @@ class PipelineController(object):
output_uri=output_uri,
draft=draft,
working_dir=working_dir,
- continue_behaviour=continue_behaviour
+ continue_behaviour=continue_behaviour,
)
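# ----- editor's illustrative sketch (not part of this patch) -------------------------------------
# Creating a step directly from a Python function, per the add_function_step() signature above.
# The function body, artifact reference and return names are assumptions.
def preprocess(dataset_url, test_ratio=0.2):
    # executed remotely as its own Task; returned values are stored as the step's artifacts
    return dataset_url, test_ratio

pipe.add_function_step(
    name="preprocess",
    function=preprocess,
    function_kwargs={"dataset_url": "${stage_data.artifacts.dataset.url}"},
    function_return=["processed_url", "split_ratio"],
    execution_queue="default",
    cache_executed_step=True,
)
# --------------------------------------------------------------------------------------------------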
def start(
- self,
- queue='services',
- step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
- step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
- wait=True,
+ self,
+ queue="services",
+ step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
+ step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
+ wait=True,
):
# type: (...) -> bool
"""
@@ -1029,8 +1054,8 @@ class PipelineController(object):
"""
if not self._task:
raise ValueError(
- "Could not find main Task, "
- "PipelineController must be created with `always_create_task=True`")
+ "Could not find main Task, " "PipelineController must be created with `always_create_task=True`"
+ )
# serialize state only if we are running locally
if Task.running_locally() or not self._task.is_main_task():
@@ -1045,7 +1070,7 @@ class PipelineController(object):
self._start(
step_task_created_callback=step_task_created_callback,
step_task_completed_callback=step_task_completed_callback,
- wait=wait
+ wait=wait,
)
return True
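# ----- editor's illustrative sketch (not part of this patch) -------------------------------------
# The next hunk belongs to the local-execution entry point (start_locally() in the upstream
# source, an assumption here since its def line is outside this excerpt). Running the steps
# locally swaps in LocalClearmlJob and a "mock" default queue, which is useful for debugging:
pipe.start_locally(run_pipeline_steps_locally=True)
# --------------------------------------------------------------------------------------------------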
@@ -1066,12 +1091,12 @@ class PipelineController(object):
"""
if not self._task:
raise ValueError(
- "Could not find main Task, "
- "PipelineController must be created with `always_create_task=True`")
+ "Could not find main Task, " "PipelineController must be created with `always_create_task=True`"
+ )
if run_pipeline_steps_locally:
self._clearml_job_class = LocalClearmlJob
- self._default_execution_queue = self._default_execution_queue or 'mock'
+ self._default_execution_queue = self._default_execution_queue or "mock"
# serialize state only if we are running locally
if Task.running_locally() or not self._task.is_main_task():
@@ -1187,7 +1212,7 @@ class PipelineController(object):
auto_pickle=None, # type: Optional[bool]
preview=None, # type: Any
wait_on_upload=False, # type: bool
- serialization_function=None # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
+ serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
):
# type: (...) -> bool
"""
@@ -1254,7 +1279,7 @@ class PipelineController(object):
auto_pickle=auto_pickle,
preview=preview,
wait_on_upload=wait_on_upload,
- serialization_function=serialization_function
+ serialization_function=serialization_function,
)
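# ----- editor's illustrative sketch (not part of this patch) -------------------------------------
# Custom artifact (de)serialization, matching the serialization_function forwarded above and the
# artifact_serialization_function / artifact_deserialization_function parameters of __init__.
import json

def to_bytes(obj):
    return json.dumps(obj).encode("utf-8")  # must return bytes/bytearray

def from_bytes(blob):
    return json.loads(blob.decode("utf-8"))

pipe = PipelineController(
    name="demo pipeline",
    project="examples",
    artifact_serialization_function=to_bytes,
    artifact_deserialization_function=from_bytes,
)
# --------------------------------------------------------------------------------------------------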
def stop(self, timeout=None, mark_failed=False, mark_aborted=False):
@@ -1280,12 +1305,12 @@ class PipelineController(object):
self._task.close()
if mark_failed:
- self._task.mark_failed(status_reason='Pipeline aborted and failed', force=True)
+ self._task.mark_failed(status_reason="Pipeline aborted and failed", force=True)
elif mark_aborted:
- self._task.mark_stopped(status_message='Pipeline aborted', force=True)
+ self._task.mark_stopped(status_message="Pipeline aborted", force=True)
elif self._pipeline_task_status_failed:
- print('Setting pipeline controller Task as failed (due to failed steps) !')
- self._task.mark_failed(status_reason='Pipeline step failed', force=True)
+ print("Setting pipeline controller Task as failed (due to failed steps) !")
+ self._task.mark_failed(status_reason="Pipeline step failed", force=True)
def wait(self, timeout=None):
# type: (Optional[float]) -> bool
@@ -1305,7 +1330,7 @@ class PipelineController(object):
return True
if timeout is not None:
- timeout *= 60.
+ timeout *= 60.0
_thread = self._thread
@@ -1366,7 +1391,7 @@ class PipelineController(object):
"""
if self._start_time is None:
return -1.0
- return (time() - self._start_time) / 60.
+ return (time() - self._start_time) / 60.0
def get_pipeline_dag(self):
# type: () -> Mapping[str, PipelineController.Node]
@@ -1450,13 +1475,13 @@ class PipelineController(object):
@classmethod
def _create_pipeline_project_args(cls, name, project):
- task_name = name or project or '{}'.format(datetime.now())
+ task_name = name or project or "{}".format(datetime.now())
if cls._pipeline_as_sub_project():
parent_project = (project + "/" if project else "") + cls._project_section
project_name = "{}/{}".format(parent_project, task_name)
else:
parent_project = None
- project_name = project or 'Pipelines'
+ project_name = project or "Pipelines"
return {"task_name": task_name, "parent_project": parent_project, "project_name": project_name}
@classmethod
@@ -1526,9 +1551,7 @@ class PipelineController(object):
:return: The newly created PipelineController
"""
- pipeline_project_args = cls._create_pipeline_project_args(
- name=task_name, project=project_name
- )
+ pipeline_project_args = cls._create_pipeline_project_args(name=task_name, project=project_name)
pipeline_controller = Task.create(
project_name=pipeline_project_args["project_name"],
task_name=pipeline_project_args["task_name"],
@@ -1545,7 +1568,7 @@ class PipelineController(object):
docker_bash_setup_script=docker_bash_setup_script,
argparse_args=argparse_args,
add_task_init_call=False,
- force_single_script_file=force_single_script_file
+ force_single_script_file=force_single_script_file,
)
cls._create_pipeline_projects(
task=pipeline_controller,
@@ -1567,7 +1590,7 @@ class PipelineController(object):
comment=None, # type: Optional[str]
parent=None, # type: Optional[str]
project=None, # type: Optional[str]
- version=None # type: Optional[str]
+ version=None, # type: Optional[str]
):
# type: (...) -> PipelineController
"""
@@ -1670,7 +1693,7 @@ class PipelineController(object):
pipeline_name=None, # type: Optional[str]
pipeline_version=None, # type: Optional[str]
pipeline_tags=None, # type: Optional[Sequence[str]]
- shallow_search=False # type: bool
+ shallow_search=False, # type: bool
):
# type: (...) -> "PipelineController"
"""
@@ -1722,7 +1745,9 @@ class PipelineController(object):
pipeline_id = pipeline.id
break
if not pipeline_id:
- error_msg = "Could not find dataset with pipeline_project={}, pipeline_name={}".format(pipeline_project, pipeline_name)
+ error_msg = "Could not find dataset with pipeline_project={}, pipeline_name={}".format(
+ pipeline_project, pipeline_name
+ )
if pipeline_version:
error_msg += ", pipeline_version={}".format(pipeline_version)
raise ValueError(error_msg)
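# ----- editor's illustrative sketch (not part of this patch) -------------------------------------
# Looking up an existing pipeline run by project/name, per the lookup parameters above (the
# enclosing classmethod is PipelineController.get() in the upstream source, assumed here):
existing = PipelineController.get(pipeline_project="examples", pipeline_name="demo pipeline")
# --------------------------------------------------------------------------------------------------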
@@ -1795,7 +1820,7 @@ class PipelineController(object):
commit,
helper_functions,
output_uri=None,
- working_dir=None
+ working_dir=None,
):
task_definition = CreateFromFunction.create_task_from_function(
a_function=function,
@@ -1821,15 +1846,15 @@ class PipelineController(object):
artifact_serialization_function=self._artifact_serialization_function,
artifact_deserialization_function=self._artifact_deserialization_function,
skip_global_imports=self._skip_global_imports,
- working_dir=working_dir
+ working_dir=working_dir,
)
return task_definition
def _start(
- self,
- step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
- step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
- wait=True,
+ self,
+ step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
+ step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
+ wait=True,
):
# type: (...) -> bool
"""
@@ -1887,31 +1912,32 @@ class PipelineController(object):
return True
def _prepare_pipeline(
- self,
- step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
- step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
+ self,
+ step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
+ step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
):
# type (...) -> None
params, pipeline_dag = self._serialize_pipeline_task()
# deserialize back pipeline state
- if not params['continue_pipeline']:
+ if not params["continue_pipeline"]:
for k in pipeline_dag:
- pipeline_dag[k]['executed'] = None
- pipeline_dag[k]['job_started'] = None
- pipeline_dag[k]['job_ended'] = None
- self._default_execution_queue = params['default_queue']
- self._add_pipeline_tags = params['add_pipeline_tags']
- self._target_project = params['target_project'] or ''
+ pipeline_dag[k]["executed"] = None
+ pipeline_dag[k]["job_started"] = None
+ pipeline_dag[k]["job_ended"] = None
+ self._default_execution_queue = params["default_queue"]
+ self._add_pipeline_tags = params["add_pipeline_tags"]
+ self._target_project = params["target_project"] or ""
self._deserialize(pipeline_dag)
# if we continue the pipeline, make sure that we re-execute failed tasks
- if params['continue_pipeline']:
+ if params["continue_pipeline"]:
for node in list(self._nodes.values()):
if node.executed is False:
node.executed = None
if not self._verify():
- raise ValueError("Failed verifying pipeline execution graph, "
- "it has either inaccessible nodes, or contains cycles")
+ raise ValueError(
+ "Failed verifying pipeline execution graph, " "it has either inaccessible nodes, or contains cycles"
+ )
self.update_execution_plot()
self._start_time = time()
self._stop_event = Event()
@@ -1926,9 +1952,9 @@ class PipelineController(object):
:return: params, pipeline_dag
"""
params = {
- 'default_queue': self._default_execution_queue,
- 'add_pipeline_tags': self._add_pipeline_tags,
- 'target_project': self._target_project,
+ "default_queue": self._default_execution_queue,
+ "add_pipeline_tags": self._add_pipeline_tags,
+ "target_project": self._target_project,
}
pipeline_dag = self._serialize()
@@ -1939,8 +1965,8 @@ class PipelineController(object):
if self._task.running_locally() or self._task.get_configuration_object(name=self._config_section) is None:
# noinspection PyProtectedMember
self._task._set_configuration(
- name=self._config_section, config_type='dictionary',
- config_text=json.dumps(pipeline_dag, indent=2))
+ name=self._config_section, config_type="dictionary", config_text=json.dumps(pipeline_dag, indent=2)
+ )
args_map_inversed = {}
for section, arg_list in self._args_map.items():
for arg in arg_list:
@@ -1957,7 +1983,7 @@ class PipelineController(object):
__update=True,
)
self._task.connect(params, name=self._pipeline_section)
- params['continue_pipeline'] = False
+ params["continue_pipeline"] = False
# make sure we have a unique version number (auto bump version if needed)
# only needed when manually (from code) creating pipelines
@@ -1967,10 +1993,12 @@ class PipelineController(object):
pipeline_hash = self._get_task_hash()
# noinspection PyProtectedMember
- self._task._set_runtime_properties({
- self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version),
- "version": self._version
- })
+ self._task._set_runtime_properties(
+ {
+ self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version),
+ "version": self._version,
+ }
+ )
self._task.set_user_properties(version=self._version)
else:
self._task.connect_configuration(pipeline_dag, name=self._config_section)
@@ -1982,23 +2010,23 @@ class PipelineController(object):
new_pipeline_args.update(mutable_dict)
connected_args.update(arg_list)
mutable_dict = {k: v for k, v in self._pipeline_args.items() if k not in connected_args}
- self._task.connect(
- mutable_dict, name=self._args_section
- )
+ self._task.connect(mutable_dict, name=self._args_section)
new_pipeline_args.update(mutable_dict)
self._pipeline_args = new_pipeline_args
self._task.connect(params, name=self._pipeline_section)
# noinspection PyProtectedMember
if self._task._get_runtime_properties().get(self._runtime_property_hash):
- params['continue_pipeline'] = True
+ params["continue_pipeline"] = True
else:
# noinspection PyProtectedMember
pipeline_hash = ClearmlJob._create_task_hash(self._task)
# noinspection PyProtectedMember
- self._task._set_runtime_properties({
- self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version),
- })
- params['continue_pipeline'] = False
+ self._task._set_runtime_properties(
+ {
+ self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version),
+ }
+ )
+ params["continue_pipeline"] = False
return params, pipeline_dag
@@ -2014,7 +2042,7 @@ class PipelineController(object):
order_by=["-last_update"],
system_tags=[self._tag],
search_hidden=True,
- _allow_extra_fields_=True
+ _allow_extra_fields_=True,
)
for previous_pipeline_task in previous_pipeline_tasks:
if previous_pipeline_task.runtime.get("version"):
@@ -2024,15 +2052,23 @@ class PipelineController(object):
def _get_task_hash(self):
params_override = dict(**(self._task.get_parameters() or {}))
- params_override.pop('properties/version', None)
+ params_override.pop("properties/version", None)
# dag state without status / states
nodes_items = list(self._nodes.items())
dag = {
name: {
- k: v for k, v in node.__dict__.items()
- if k not in (
- 'job', 'name', 'task_factory_func', 'executed', 'status',
- 'job_started', 'job_ended', 'skip_job'
+ k: v
+ for k, v in node.__dict__.items()
+ if k
+ not in (
+ "job",
+ "name",
+ "task_factory_func",
+ "executed",
+ "status",
+ "job_started",
+ "job_ended",
+ "skip_job",
)
}
for name, node in nodes_items
@@ -2059,12 +2095,13 @@ class PipelineController(object):
:return:
"""
nodes_items = list(self._nodes.items())
- dag = {name: dict((k, v) for k, v in node.__dict__.items()
- if k not in ('job', 'name', 'task_factory_func'))
- for name, node in nodes_items}
+ dag = {
+ name: dict((k, v) for k, v in node.__dict__.items() if k not in ("job", "name", "task_factory_func"))
+ for name, node in nodes_items
+ }
# update state for presentation only
for name, node in nodes_items:
- dag[name]['job_id'] = node.executed or (node.job.task_id() if node.job else None)
+ dag[name]["job_id"] = node.executed or (node.job.task_id() if node.job else None)
return dag
@@ -2081,17 +2118,27 @@ class PipelineController(object):
# if we do not clone the Task, only merge the parts we can override.
for name in list(self._nodes.keys()):
- if not self._nodes[name].clone_task and name in dag_dict and not dag_dict[name].get('clone_task'):
- for k in ('queue', 'parents', 'timeout', 'parameters', 'configurations', 'task_overrides',
- 'executed', 'job_started', 'job_ended'):
+ if not self._nodes[name].clone_task and name in dag_dict and not dag_dict[name].get("clone_task"):
+ for k in (
+ "queue",
+ "parents",
+ "timeout",
+ "parameters",
+ "configurations",
+ "task_overrides",
+ "executed",
+ "job_started",
+ "job_ended",
+ ):
setattr(self._nodes[name], k, dag_dict[name].get(k) or type(getattr(self._nodes[name], k))())
# if we do clone the Task deserialize everything, except the function creating
self._nodes = {
- k: self.Node(name=k, **{kk: vv for kk, vv in v.items() if kk not in ('job_id', )})
- if k not in self._nodes or (v.get('base_task_id') and v.get('clone_task'))
+ k: self.Node(name=k, **{kk: vv for kk, vv in v.items() if kk not in ("job_id",)})
+ if k not in self._nodes or (v.get("base_task_id") and v.get("clone_task"))
else self._nodes[k]
- for k, v in dag_dict.items()}
+ for k, v in dag_dict.items()
+ }
# set the task_factory_func for each cloned node
for node in list(self._nodes.values()):
@@ -2141,8 +2188,10 @@ class PipelineController(object):
raise ValueError("Node '{}', base_task_id is empty".format(node.name))
if not self._default_execution_queue and not node.queue:
- raise ValueError("Node '{}' missing execution queue, "
- "no default queue defined and no specific node queue defined".format(node.name))
+ raise ValueError(
+ "Node '{}' missing execution queue, "
+ "no default queue defined and no specific node queue defined".format(node.name)
+ )
task = node.task_factory_func or Task.get_task(task_id=node.base_task_id)
if not task:
@@ -2162,16 +2211,18 @@ class PipelineController(object):
if ref_step:
parents.add(ref_step)
# verify we have a section name
- if '/' not in k:
+ if "/" not in k:
raise ValueError(
- "Section name is missing in parameter \"{}\", "
+ 'Section name is missing in parameter "{}", '
"parameters should be in the form of "
- "\"`section-name`/parameter\", example: \"Args/param\"".format(v))
+ '"`section-name`/parameter", example: "Args/param"'.format(v)
+ )
if parents and parents != set(node.parents or []):
parents = parents - set(node.parents or [])
- getLogger('clearml.automation.controller').info(
- 'Node "{}" missing parent reference, adding: {}'.format(node.name, parents))
+ getLogger("clearml.automation.controller").info(
+ 'Node "{}" missing parent reference, adding: {}'.format(node.name, parents)
+ )
node.parents = (node.parents or []) + list(parents)
# verify and fix monitoring sections:
@@ -2183,36 +2234,32 @@ class PipelineController(object):
if not all(isinstance(x, (list, tuple)) and x for x in monitors):
raise ValueError("{} should be a list of tuples, found: {}".format(monitor_type, monitors))
# convert single pair into a pair of pairs:
- conformed_monitors = [
- pair if isinstance(pair[0], (list, tuple)) else (pair, pair) for pair in monitors
- ]
+ conformed_monitors = [pair if isinstance(pair[0], (list, tuple)) else (pair, pair) for pair in monitors]
# verify the pair of pairs
- if not all(isinstance(x[0][0], str) and isinstance(x[0][1], str) and
- isinstance(x[1][0], str) and isinstance(x[1][1], str)
- for x in conformed_monitors):
+ if not all(
+ isinstance(x[0][0], str)
+ and isinstance(x[0][1], str)
+ and isinstance(x[1][0], str)
+ and isinstance(x[1][1], str)
+ for x in conformed_monitors
+ ):
raise ValueError("{} should be a list of tuples, found: {}".format(monitor_type, monitors))
else:
# verify a list of tuples
if not all(isinstance(x, (list, tuple, str)) and x for x in monitors):
- raise ValueError(
- "{} should be a list of tuples, found: {}".format(monitor_type, monitors))
+ raise ValueError("{} should be a list of tuples, found: {}".format(monitor_type, monitors))
# convert single str into a pair of pairs:
- conformed_monitors = [
- pair if isinstance(pair, (list, tuple)) else (pair, pair) for pair in monitors
- ]
+ conformed_monitors = [pair if isinstance(pair, (list, tuple)) else (pair, pair) for pair in monitors]
# verify the pair of pairs
- if not all(isinstance(x[0], str) and
- isinstance(x[1], str)
- for x in conformed_monitors):
- raise ValueError(
- "{} should be a list of tuples, found: {}".format(monitor_type, monitors))
+ if not all(isinstance(x[0], str) and isinstance(x[1], str) for x in conformed_monitors):
+ raise ValueError("{} should be a list of tuples, found: {}".format(monitor_type, monitors))
return conformed_monitors
# verify and fix monitoring sections:
- node.monitor_metrics = _verify_monitors(node.monitor_metrics, 'monitor_metrics', nested_pairs=True)
- node.monitor_artifacts = _verify_monitors(node.monitor_artifacts, 'monitor_artifacts')
- node.monitor_models = _verify_monitors(node.monitor_models, 'monitor_models')
+ node.monitor_metrics = _verify_monitors(node.monitor_metrics, "monitor_metrics", nested_pairs=True)
+ node.monitor_artifacts = _verify_monitors(node.monitor_artifacts, "monitor_artifacts")
+ node.monitor_models = _verify_monitors(node.monitor_models, "monitor_models")
return True
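# ----- editor's illustrative sketch (not part of this patch) -------------------------------------
# The monitor formats that _verify_node() conforms above: metrics map a (title, series) pair to a
# target pair, while artifacts/models accept a plain name or a (source, target) pair. Names below
# are assumptions.
pipe.add_step(
    name="evaluate",
    parents=["train"],
    base_task_project="examples",
    base_task_name="evaluate model",
    monitor_metrics=[(("test", "accuracy"), ("pipeline", "best_accuracy"))],
    monitor_artifacts=["confusion_matrix"],
    monitor_models=[("best_model", "pipeline_model")],
)
# --------------------------------------------------------------------------------------------------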
@@ -2238,41 +2285,41 @@ class PipelineController(object):
return not bool(set(self._nodes.keys()) - visited)
def _add_function_step(
- self,
- name, # type: str
- function, # type: Callable
- function_kwargs=None, # type: Optional[Dict[str, Any]]
- function_return=None, # type: Optional[List[str]]
- project_name=None, # type: Optional[str]
- task_name=None, # type: Optional[str]
- task_type=None, # type: Optional[str]
- auto_connect_frameworks=None, # type: Optional[dict]
- auto_connect_arg_parser=None, # type: Optional[dict]
- packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
- repo=None, # type: Optional[str]
- repo_branch=None, # type: Optional[str]
- repo_commit=None, # type: Optional[str]
- helper_functions=None, # type: Optional[Sequence[Callable]]
- docker=None, # type: Optional[str]
- docker_args=None, # type: Optional[str]
- docker_bash_setup_script=None, # type: Optional[str]
- parents=None, # type: Optional[Sequence[str]]
- execution_queue=None, # type: Optional[str]
- monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
- monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
- monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
- time_limit=None, # type: Optional[float]
- continue_on_fail=False, # type: bool
- pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
- post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
- cache_executed_step=False, # type: bool
- retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
- status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
- tags=None, # type: Optional[Union[str, Sequence[str]]]
- output_uri=None, # type: Optional[Union[str, bool]]
- draft=False, # type: Optional[bool]
- working_dir=None, # type: Optional[str]
- continue_behaviour=None # type: Optional[dict]
+ self,
+ name, # type: str
+ function, # type: Callable
+ function_kwargs=None, # type: Optional[Dict[str, Any]]
+ function_return=None, # type: Optional[List[str]]
+ project_name=None, # type: Optional[str]
+ task_name=None, # type: Optional[str]
+ task_type=None, # type: Optional[str]
+ auto_connect_frameworks=None, # type: Optional[dict]
+ auto_connect_arg_parser=None, # type: Optional[dict]
+ packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
+ repo=None, # type: Optional[str]
+ repo_branch=None, # type: Optional[str]
+ repo_commit=None, # type: Optional[str]
+ helper_functions=None, # type: Optional[Sequence[Callable]]
+ docker=None, # type: Optional[str]
+ docker_args=None, # type: Optional[str]
+ docker_bash_setup_script=None, # type: Optional[str]
+ parents=None, # type: Optional[Sequence[str]]
+ execution_queue=None, # type: Optional[str]
+ monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
+ monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
+ monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
+ time_limit=None, # type: Optional[float]
+ continue_on_fail=False, # type: bool
+ pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
+ post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
+ cache_executed_step=False, # type: bool
+ retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
+ status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
+ tags=None, # type: Optional[Union[str, Sequence[str]]]
+ output_uri=None, # type: Optional[Union[str, bool]]
+ draft=False, # type: Optional[bool]
+ working_dir=None, # type: Optional[str]
+ continue_behaviour=None, # type: Optional[dict]
):
# type: (...) -> bool
"""
@@ -2481,7 +2528,7 @@ class PipelineController(object):
continue
if self._step_ref_pattern.match(str(v)):
# check for step artifacts
- step, _, artifact = v[2:-1].partition('.')
+ step, _, artifact = v[2:-1].partition(".")
if step in self._nodes and artifact in self._nodes[step].return_artifacts:
function_input_artifacts[k] = "${{{}.id}}.{}".format(step, artifact)
continue
@@ -2497,8 +2544,10 @@ class PipelineController(object):
parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()}
if function_input_artifacts:
parameters.update(
- {"{}/{}".format(CreateFromFunction.input_artifact_section, k): str(v)
- for k, v in function_input_artifacts.items()}
+ {
+ "{}/{}".format(CreateFromFunction.input_artifact_section, k): str(v)
+ for k, v in function_input_artifacts.items()
+ }
)
job_code_section = name
@@ -2556,8 +2605,7 @@ class PipelineController(object):
# update configuration with the task definitions
# noinspection PyProtectedMember
self._task._set_configuration(
- name=name, config_type='json',
- config_text=json.dumps(task_definition, indent=1)
+ name=name, config_type="json", config_text=json.dumps(task_definition, indent=1)
)
else:
# load task definition from configuration
@@ -2568,8 +2616,8 @@ class PipelineController(object):
def _create_task(_):
a_task = Task.create(
project_name=project_name,
- task_name=task_definition.get('name'),
- task_type=task_definition.get('type'),
+ task_name=task_definition.get("name"),
+ task_type=task_definition.get("type"),
)
# replace reference
a_task.update_task(task_definition)
@@ -2583,8 +2631,11 @@ class PipelineController(object):
return a_task
self._nodes[name] = self.Node(
- name=name, base_task_id=None, parents=parents or [],
- queue=execution_queue, timeout=time_limit,
+ name=name,
+ base_task_id=None,
+ parents=parents or [],
+ queue=execution_queue,
+ timeout=time_limit,
parameters=parameters,
clone_task=False,
cache_executed_step=cache_executed_step,
@@ -2598,12 +2649,18 @@ class PipelineController(object):
explicit_docker_image=docker,
output_uri=output_uri,
draft=draft,
- continue_behaviour=continue_behaviour
+ continue_behaviour=continue_behaviour,
)
self._retries[name] = 0
- self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
- (functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure)
- if isinstance(retry_on_failure, int) else self._retry_on_failure_callback)
+ self._retries_callbacks[name] = (
+ retry_on_failure
+ if callable(retry_on_failure)
+ else (
+ functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure)
+ if isinstance(retry_on_failure, int)
+ else self._retry_on_failure_callback
+ )
+ )
return True
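# ----- editor's illustrative sketch (not part of this patch) -------------------------------------
# A custom retry callback matching the signature registered in self._retries_callbacks above:
# (PipelineController, PipelineController.Node, retry_count) -> bool. Returning True relaunches
# the failed step. The step/task names are assumptions.
def retry_transient_failures(pipeline, node, retries):
    print("step '{}' failed, retry #{}".format(node.name, retries + 1))
    return retries < 5

pipe.add_step(
    name="flaky_step",
    base_task_project="examples",
    base_task_name="flaky task",
    retry_on_failure=retry_transient_failures,
)
# --------------------------------------------------------------------------------------------------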
@@ -2640,10 +2697,10 @@ class PipelineController(object):
node.job_type = None
if node.job or node.executed:
- print('Skipping cached/executed step [{}]'.format(node.name))
+ print("Skipping cached/executed step [{}]".format(node.name))
return False
- print('Launching step [{}]'.format(node.name))
+ print("Launching step [{}]".format(node.name))
updated_hyper_parameters = {}
for k, v in node.parameters.items():
@@ -2681,8 +2738,9 @@ class PipelineController(object):
base_task_id=task_id,
parameter_override=updated_hyper_parameters,
configuration_overrides=node.configurations,
- tags=['{} {}'.format(self._node_tag_prefix, self._task.id)]
- if self._add_pipeline_tags and self._task else None,
+ tags=["{} {}".format(self._node_tag_prefix, self._task.id)]
+ if self._add_pipeline_tags and self._task
+ else None,
parent=self._task.id if self._task else None,
disable_clone_task=disable_clone_task,
task_overrides=task_overrides,
@@ -2704,8 +2762,7 @@ class PipelineController(object):
if skip_node is False:
# skipping node
- getLogger('clearml.automation.controller').warning(
- 'Skipping node {} on callback request'.format(node))
+ getLogger("clearml.automation.controller").warning("Skipping node {} on callback request".format(node))
# delete the job we just created
node.job.delete()
node.skip_job = True
@@ -2742,7 +2799,7 @@ class PipelineController(object):
sankey_node = dict(
label=[],
color=[],
- hovertemplate='%{label}',
+ hovertemplate="%{label}",
# customdata=[],
             # hovertemplate='%{label}<br />Hyper-Parameters:<br />%{customdata}',
)
@@ -2751,7 +2808,7 @@ class PipelineController(object):
target=[],
value=[],
# hovertemplate='%{target.label}',
- hovertemplate='',
+ hovertemplate="",
)
visited = []
node_params = []
@@ -2778,17 +2835,20 @@ class PipelineController(object):
# sankey_node['label'].append(node.name)
# sankey_node['customdata'].append(
             #     '<br />'.join('{}: {}'.format(k, v) for k, v in (node.parameters or {}).items()))
-        sankey_node['label'].append(
-            '{}<br />'.format(node.name) +
-            '<br />'.join('{}: {}'.format(k, v if len(str(v)) < 24 else (str(v)[:24]+' ...'))
-                   for k, v in (node.parameters or {}).items()))
+        sankey_node["label"].append(
+            "{}<br />".format(node.name)
+            + "<br />".join(
+                "{}: {}".format(k, v if len(str(v)) < 24 else (str(v)[:24] + " ..."))
+                for k, v in (node.parameters or {}).items()
+            )
+        )
- sankey_node['color'].append(self._get_node_color(node))
+ sankey_node["color"].append(self._get_node_color(node))
for p in parents:
- sankey_link['source'].append(p)
- sankey_link['target'].append(idx)
- sankey_link['value'].append(1)
+ sankey_link["source"].append(p)
+ sankey_link["target"].append(idx)
+ sankey_link["value"].append(1)
# if nothing changed, we give up
if nodes == next_nodes:
@@ -2798,16 +2858,16 @@ class PipelineController(object):
# make sure we have no independent (unconnected) nodes
single_nodes = []
- for i in [n for n in range(len(visited)) if n not in sankey_link['source'] and n not in sankey_link['target']]:
+ for i in [n for n in range(len(visited)) if n not in sankey_link["source"] and n not in sankey_link["target"]]:
single_nodes.append(i)
# create the sankey graph
dag_flow = dict(
link=sankey_link,
node=sankey_node,
- textfont=dict(color='rgba(0,0,0,0)', size=1),
- type='sankey',
- orientation='h'
+ textfont=dict(color="rgba(0,0,0,0)", size=1),
+ type="sankey",
+ orientation="h",
)
table_values = self._build_table_report(node_params, visited)
@@ -2815,43 +2875,53 @@ class PipelineController(object):
# hack, show single node sankey
if single_nodes:
singles_flow = dict(
- x=list(range(len(single_nodes))), y=[1] * len(single_nodes),
- text=[v for i, v in enumerate(sankey_node['label']) if i in single_nodes],
- mode='markers',
+ x=list(range(len(single_nodes))),
+ y=[1] * len(single_nodes),
+ text=[v for i, v in enumerate(sankey_node["label"]) if i in single_nodes],
+ mode="markers",
hovertemplate="%{text}",
marker=dict(
- color=[v for i, v in enumerate(sankey_node['color']) if i in single_nodes],
+ color=[v for i, v in enumerate(sankey_node["color"]) if i in single_nodes],
size=[40] * len(single_nodes),
),
showlegend=False,
- type='scatter',
+ type="scatter",
)
# only single nodes
- if len(single_nodes) == len(sankey_node['label']):
- fig = dict(data=[singles_flow], layout={
- 'hovermode': 'closest', 'xaxis': {'visible': False}, 'yaxis': {'visible': False}})
+ if len(single_nodes) == len(sankey_node["label"]):
+ fig = dict(
+ data=[singles_flow],
+ layout={"hovermode": "closest", "xaxis": {"visible": False}, "yaxis": {"visible": False}},
+ )
else:
- dag_flow['domain'] = {'x': [0.0, 1.0], 'y': [0.2, 1.0]}
- fig = dict(data=[dag_flow, singles_flow],
- layout={'autosize': True,
- 'hovermode': 'closest',
- 'xaxis': {'anchor': 'y', 'domain': [0.0, 1.0], 'visible': False},
- 'yaxis': {'anchor': 'x', 'domain': [0.0, 0.15], 'visible': False}
- })
+ dag_flow["domain"] = {"x": [0.0, 1.0], "y": [0.2, 1.0]}
+ fig = dict(
+ data=[dag_flow, singles_flow],
+ layout={
+ "autosize": True,
+ "hovermode": "closest",
+ "xaxis": {"anchor": "y", "domain": [0.0, 1.0], "visible": False},
+ "yaxis": {"anchor": "x", "domain": [0.0, 0.15], "visible": False},
+ },
+ )
else:
# create the sankey plot
- fig = dict(data=[dag_flow], layout={'xaxis': {'visible': False}, 'yaxis': {'visible': False}})
+ fig = dict(data=[dag_flow], layout={"xaxis": {"visible": False}, "yaxis": {"visible": False}})
# report DAG
self._task.get_logger().report_plotly(
- title=self._report_plot_execution_flow['title'],
- series=self._report_plot_execution_flow['series'],
- iteration=0, figure=fig)
+ title=self._report_plot_execution_flow["title"],
+ series=self._report_plot_execution_flow["series"],
+ iteration=0,
+ figure=fig,
+ )
# report detailed table
self._task.get_logger().report_table(
- title=self._report_plot_execution_details['title'],
- series=self._report_plot_execution_details['series'],
- iteration=0, table_plot=table_values)
+ title=self._report_plot_execution_details["title"],
+ series=self._report_plot_execution_details["series"],
+ iteration=0,
+ table_plot=table_values,
+ )
def _build_table_report(self, node_params, visited):
# type: (List, List) -> List[List]
@@ -2863,14 +2933,16 @@ class PipelineController(object):
:return: Table as a List of a List of strings (cell)
"""
- task_link_template = self._task.get_output_log_web_page() \
- .replace('/{}/'.format(self._task.project), '/{project}/') \
- .replace('/{}/'.format(self._task.id), '/{task}/')
+ task_link_template = (
+ self._task.get_output_log_web_page()
+ .replace("/{}/".format(self._task.project), "/{project}/")
+ .replace("/{}/".format(self._task.id), "/{task}/")
+ )
table_values = [["Pipeline Step", "Task ID", "Task Name", "Status", "Parameters"]]
for name, param in zip(visited, node_params):
- param_str = str(param) if param else ''
+ param_str = str(param) if param else ""
if len(param_str) > 3:
# remove {} from string
param_str = param_str[1:-1]
@@ -2878,14 +2950,17 @@ class PipelineController(object):
step_name = name
if self._nodes[name].base_task_id:
                 step_name += '\n[ <a href="{}"> {} </a> ]'.format(
- task_link_template.format(project='*', task=self._nodes[name].base_task_id), 'base task')
+ task_link_template.format(project="*", task=self._nodes[name].base_task_id), "base task"
+ )
table_values.append(
- [step_name,
- self.__create_task_link(self._nodes[name], task_link_template),
- self._nodes[name].job.task.name if self._nodes[name].job else '',
- str(self._nodes[name].status or ""),
- param_str]
+ [
+ step_name,
+ self.__create_task_link(self._nodes[name], task_link_template),
+ self._nodes[name].job.task.name if self._nodes[name].job else "",
+ str(self._nodes[name].status or ""),
+ param_str,
+ ]
)
return table_values
@@ -3010,9 +3085,11 @@ class PipelineController(object):
# type: () -> ()
pipeline_dag = self._serialize()
self._task.upload_artifact(
- name=self._state_artifact_name, artifact_object='',
+ name=self._state_artifact_name,
+ artifact_object="",
metadata=dict(pipeline=hash_dict(pipeline_dag)),
- preview=json.dumps(pipeline_dag, indent=1))
+ preview=json.dumps(pipeline_dag, indent=1),
+ )
def _force_task_configuration_update(self):
# type: () -> ()
@@ -3020,9 +3097,12 @@ class PipelineController(object):
if self._task:
# noinspection PyProtectedMember
self._task._set_configuration(
- name=self._config_section, config_type='dictionary',
+ name=self._config_section,
+ config_type="dictionary",
description="pipeline state: {}".format(hash_dict(pipeline_dag)),
- config_text=json.dumps(pipeline_dag, indent=2), force=True)
+ config_text=json.dumps(pipeline_dag, indent=2),
+ force=True,
+ )
def _update_progress(self):
# type: () -> ()
@@ -3111,11 +3191,11 @@ class PipelineController(object):
# nothing changed, we can sleep
if not completed_jobs and self._running_nodes:
# force updating the pipeline state (plot) at least every 5 min.
- if force_execution_plot_update or time()-last_plot_report > self._update_execution_plot_interval:
+ if force_execution_plot_update or time() - last_plot_report > self._update_execution_plot_interval:
last_plot_report = time()
last_monitor_report = time()
self.update_execution_plot()
- elif time()-last_monitor_report > self._monitor_node_interval:
+ elif time() - last_monitor_report > self._monitor_node_interval:
last_monitor_report = time()
self._scan_monitored_nodes()
continue
@@ -3133,8 +3213,11 @@ class PipelineController(object):
# check if we need to stop the pipeline, and abort all running steps
if nodes_failed_stop_pipeline:
- print('Aborting pipeline and stopping all running steps, node {} failed'.format(
- nodes_failed_stop_pipeline))
+ print(
+ "Aborting pipeline and stopping all running steps, node {} failed".format(
+ nodes_failed_stop_pipeline
+ )
+ )
break
# Pull the next jobs in the pipeline, based on the completed list
@@ -3148,26 +3231,30 @@ class PipelineController(object):
next_nodes.append(node.name)
# update the execution graph
- print('Launching the next {} steps'.format(len(next_nodes)))
- node_launch_success = launch_thread_pool.map(
- self._launch_node, [self._nodes[name] for name in next_nodes])
+ print("Launching the next {} steps".format(len(next_nodes)))
+ node_launch_success = launch_thread_pool.map(self._launch_node, [self._nodes[name] for name in next_nodes])
for name, success in zip(next_nodes, node_launch_success):
if success and not self._nodes[name].skip_job:
if self._nodes[name].job and self._nodes[name].job.task_parameter_override is not None:
self._nodes[name].job.task_parameter_override.update(self._nodes[name].parameters or {})
- print('Launching step: {}'.format(name))
- print('Parameters:\n{}'.format(
- self._nodes[name].job.task_parameter_override if self._nodes[name].job
- else self._nodes[name].parameters))
- print('Configurations:\n{}'.format(self._nodes[name].configurations))
- print('Overrides:\n{}'.format(self._nodes[name].task_overrides))
+ print("Launching step: {}".format(name))
+ print(
+ "Parameters:\n{}".format(
+ self._nodes[name].job.task_parameter_override
+ if self._nodes[name].job
+ else self._nodes[name].parameters
+ )
+ )
+ print("Configurations:\n{}".format(self._nodes[name].configurations))
+ print("Overrides:\n{}".format(self._nodes[name].task_overrides))
launched_nodes.add(name)
# check if node is cached do not wait for event but run the loop again
if self._nodes[name].executed:
pooling_counter = 0
else:
- getLogger('clearml.automation.controller').warning(
- 'Skipping launching step \'{}\': {}'.format(name, self._nodes[name]))
+ getLogger("clearml.automation.controller").warning(
+ "Skipping launching step '{}': {}".format(name, self._nodes[name])
+ )
# update current state (in configuration, so that we could later continue an aborted pipeline)
# visualize pipeline state (plot)
@@ -3246,9 +3333,9 @@ class PipelineController(object):
def _verify_node_name(self, name):
# type: (str) -> None
if name in self._nodes:
- raise ValueError('Node named \'{}\' already exists in the pipeline dag'.format(name))
+ raise ValueError("Node named '{}' already exists in the pipeline dag".format(name))
if name in self._reserved_pipeline_names:
- raise ValueError('Node named \'{}\' is a reserved keyword, use a different name'.format(name))
+ raise ValueError("Node named '{}' is a reserved keyword, use a different name".format(name))
def _scan_monitored_nodes(self):
# type: () -> None
@@ -3272,7 +3359,7 @@ class PipelineController(object):
self._monitored_nodes[node.name] = {}
# if we are done with this node, skip it
- if self._monitored_nodes[node.name].get('completed'):
+ if self._monitored_nodes[node.name].get("completed"):
return
if node.job and node.job.task:
@@ -3284,14 +3371,14 @@ class PipelineController(object):
# update the metrics
if node.monitor_metrics:
- metrics_state = self._monitored_nodes[node.name].get('metrics', {})
+ metrics_state = self._monitored_nodes[node.name].get("metrics", {})
logger = self._task.get_logger()
- scalars = task.get_reported_scalars(x_axis='iter')
+ scalars = task.get_reported_scalars(x_axis="iter")
for (s_title, s_series), (t_title, t_series) in node.monitor_metrics:
values = scalars.get(s_title, {}).get(s_series)
- if values and values.get('x') is not None and values.get('y') is not None:
- x = values['x'][-1]
- y = values['y'][-1]
+ if values and values.get("x") is not None and values.get("y") is not None:
+ x = values["x"][-1]
+ y = values["y"][-1]
last_y = metrics_state.get(s_title, {}).get(s_series)
if last_y is None or y > last_y:
logger.report_scalar(title=t_title, series=t_series, value=y, iteration=int(x))
@@ -3300,7 +3387,7 @@ class PipelineController(object):
metrics_state[s_title] = {}
metrics_state[s_title][s_series] = last_y
- self._monitored_nodes[node.name]['metrics'] = metrics_state
+ self._monitored_nodes[node.name]["metrics"] = metrics_state
if node.monitor_artifacts:
task.reload()
@@ -3350,7 +3437,7 @@ class PipelineController(object):
# update the state (so that we do not scan the node twice)
if node.job.is_stopped(aborted_nonresponsive_as_running=True):
- self._monitored_nodes[node.name]['completed'] = True
+ self._monitored_nodes[node.name]["completed"] = True
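A rough sketch of the metric forwarding performed by the monitoring hunks above; the scalar titles, series and values are invented, and the state update is simplified relative to the elided lines:

scalars = {"loss": {"val": {"x": [0, 1, 2], "y": [0.9, 0.5, 0.4]}}}  # invented reported scalars
metrics_state = {}

for (s_title, s_series), (t_title, t_series) in [(("loss", "val"), ("step loss", "val"))]:
    values = scalars.get(s_title, {}).get(s_series)
    if values and values.get("x") is not None and values.get("y") is not None:
        x, y = values["x"][-1], values["y"][-1]
        last_y = metrics_state.get(s_title, {}).get(s_series)
        # re-report on the pipeline task only when the latest value exceeds the stored one
        if last_y is None or y > last_y:
            print("report_scalar", t_title, t_series, y, int(x))
            metrics_state.setdefault(s_title, {})[s_series] = y  # simplified state update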
def _get_target_project(self, return_project_id=False):
# type: (bool) -> str
@@ -3362,19 +3449,19 @@ class PipelineController(object):
:return: project id/name (None if not valid)
"""
if not self._target_project:
- return ''
+ return ""
- if str(self._target_project).lower().strip() == 'true':
+ if str(self._target_project).lower().strip() == "true":
if not self._task:
- return ''
+ return ""
return self._task.project if return_project_id else self._task.get_project_name()
if not return_project_id:
return self._target_project
return get_or_create_project(
- session=self._task.session if self._task else Task.default_session,
- project_name=self._target_project)
+ session=self._task.session if self._task else Task.default_session, project_name=self._target_project
+ )
@classmethod
def _add_pipeline_name_run_number(cls, task):
@@ -3393,11 +3480,12 @@ class PipelineController(object):
prev_pipelines_ids = task.query_tasks(
task_name=r"^{}(| #\d+)$".format(task_name),
task_filter=dict(
- project=[task.project], system_tags=[cls._tag],
- order_by=['-created'],
+ project=[task.project],
+ system_tags=[cls._tag],
+ order_by=["-created"],
page_size=page_size,
fetch_only_first_page=True,
- )
+ ),
)
max_value = len(prev_pipelines_ids) if prev_pipelines_ids else 0
# we hit the limit
@@ -3408,21 +3496,22 @@ class PipelineController(object):
# we assume we are the latest so let's take a few (last 10) and check the max number
last_task_name = task.query_tasks(
task_filter=dict(task_ids=prev_pipelines_ids[:10], project=[task.project]),
- additional_return_fields=['name'],
+ additional_return_fields=["name"],
) # type: List[Dict]
# let's parse the names
pattern = re.compile(r" #(?P<num>\d+)$")
- task_parts = [pattern.split(t.get('name') or "", 1) for t in last_task_name]
+ task_parts = [pattern.split(t.get("name") or "", 1) for t in last_task_name]
# find the highest number
for parts in task_parts:
if len(parts) >= 2:
try:
- max_value = max(max_value, int(parts[1])+1)
+ max_value = max(max_value, int(parts[1]) + 1)
except (TypeError, ValueError):
pass
except Exception as ex:
- getLogger('clearml.automation.controller').warning(
- 'Pipeline auto run increment failed (skipping): {}'.format(ex))
+ getLogger("clearml.automation.controller").warning(
+ "Pipeline auto run increment failed (skipping): {}".format(ex)
+ )
max_value = 0
if max_value > 1:
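For reference, a minimal sketch of the run-number parsing in the hunk above; the regex group name and the task names are illustrative only:

import re

pattern = re.compile(r" #(?P<num>\d+)$")  # same shape as the pattern used above

for task_name in ["My Pipeline", "My Pipeline #7"]:
    parts = pattern.split(task_name, 1)
    # "My Pipeline"    -> ["My Pipeline"]            (no run-number suffix)
    # "My Pipeline #7" -> ["My Pipeline", "7", ""]   (captured digits end up in parts[1])
    print(parts)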
@@ -3459,7 +3548,7 @@ class PipelineController(object):
:param str step_ref_string: For example ``"${step1.parameters.Args/param}"``
:return: If step reference is used, return the pipeline step name, otherwise return None
"""
- parts = step_ref_string[2:-1].split('.')
+ parts = step_ref_string[2:-1].split(".")
v = step_ref_string
if len(parts) < 2:
raise ValueError("Node '{}', parameter '{}' is invalid".format(node.name, v))
@@ -3474,30 +3563,33 @@ class PipelineController(object):
if prev_step not in self._nodes:
raise ValueError("Node '{}', parameter '{}', step name '{}' is invalid".format(node.name, v, prev_step))
- if input_type not in ('artifacts', 'parameters', 'models', 'id'):
- raise ValueError(
- "Node {}, parameter '{}', input type '{}' is invalid".format(node.name, v, input_type))
+ if input_type not in ("artifacts", "parameters", "models", "id"):
+ raise ValueError("Node {}, parameter '{}', input type '{}' is invalid".format(node.name, v, input_type))
- if input_type != 'id' and len(parts) < 3:
+ if input_type != "id" and len(parts) < 3:
raise ValueError("Node '{}', parameter '{}' is invalid".format(node.name, v))
- if input_type == 'models':
+ if input_type == "models":
try:
model_type = parts[2].lower()
except Exception:
raise ValueError(
"Node '{}', parameter '{}', input type '{}', model_type is missing {}".format(
- node.name, v, input_type, parts))
- if model_type not in ('input', 'output'):
+ node.name, v, input_type, parts
+ )
+ )
+ if model_type not in ("input", "output"):
raise ValueError(
"Node '{}', parameter '{}', input type '{}', "
- "model_type is invalid (input/output) found {}".format(
- node.name, v, input_type, model_type))
+ "model_type is invalid (input/output) found {}".format(node.name, v, input_type, model_type)
+ )
if len(parts) < 4:
raise ValueError(
"Node '{}', parameter '{}', input type '{}', model index is missing".format(
- node.name, v, input_type))
+ node.name, v, input_type
+ )
+ )
# check casting
try:
@@ -3505,17 +3597,23 @@ class PipelineController(object):
except Exception:
raise ValueError(
"Node '{}', parameter '{}', input type '{}', model index is missing {}".format(
- node.name, v, input_type, parts))
+ node.name, v, input_type, parts
+ )
+ )
if len(parts) < 5:
raise ValueError(
"Node '{}', parameter '{}', input type '{}', model property is missing".format(
- node.name, v, input_type))
+ node.name, v, input_type
+ )
+ )
if not hasattr(BaseModel, parts[4]):
raise ValueError(
"Node '{}', parameter '{}', input type '{}', model property is invalid {}".format(
- node.name, v, input_type, parts[4]))
+ node.name, v, input_type, parts[4]
+ )
+ )
return prev_step
def __parse_step_reference(self, step_ref_string):
@@ -3524,7 +3622,7 @@ class PipelineController(object):
:param step_ref_string: reference string of the form "${step_name.type.value}"
:return: str with value
"""
- parts = step_ref_string[2:-1].split('.')
+ parts = step_ref_string[2:-1].split(".")
if len(parts) < 2:
raise ValueError("Could not parse reference '{}'".format(step_ref_string))
prev_step = parts[0]
@@ -3533,72 +3631,101 @@ class PipelineController(object):
# check if we reference the pipeline arguments themselves
if prev_step == self._pipeline_step_ref:
if parts[1] not in self._pipeline_args:
- raise ValueError("Could not parse reference '{}', "
- "pipeline argument '{}' could not be found".format(step_ref_string, parts[1]))
+ raise ValueError(
+ "Could not parse reference '{}', "
+ "pipeline argument '{}' could not be found".format(step_ref_string, parts[1])
+ )
return self._pipeline_args[parts[1]]
if prev_step not in self._nodes or (
- not self._nodes[prev_step].job and
- not self._nodes[prev_step].executed and
- not self._nodes[prev_step].base_task_id
+ not self._nodes[prev_step].job
+ and not self._nodes[prev_step].executed
+ and not self._nodes[prev_step].base_task_id
):
- raise ValueError("Could not parse reference '{}', step '{}' could not be found".format(
- step_ref_string, prev_step))
+ raise ValueError(
+ "Could not parse reference '{}', step '{}' could not be found".format(step_ref_string, prev_step)
+ )
if input_type not in (
- 'artifacts', 'parameters', 'models', 'id',
- 'script', 'execution', 'container', 'output',
- 'comment', 'models', 'tags', 'system_tags', 'project'):
+ "artifacts",
+ "parameters",
+ "models",
+ "id",
+ "script",
+ "execution",
+ "container",
+ "output",
+ "comment",
+ "models",
+ "tags",
+ "system_tags",
+ "project",
+ ):
raise ValueError("Could not parse reference '{}', type '{}' not valid".format(step_ref_string, input_type))
- if input_type != 'id' and len(parts) < 3:
+ if input_type != "id" and len(parts) < 3:
raise ValueError("Could not parse reference '{}', missing fields in '{}'".format(step_ref_string, parts))
- task = self._nodes[prev_step].job.task if self._nodes[prev_step].job \
+ task = (
+ self._nodes[prev_step].job.task
+ if self._nodes[prev_step].job
else Task.get_task(task_id=self._nodes[prev_step].executed or self._nodes[prev_step].base_task_id)
+ )
task.reload()
- if input_type == 'artifacts':
+ if input_type == "artifacts":
# fix \. to use . in artifacts
- artifact_path = ('.'.join(parts[2:])).replace('\\.', '\\_dot_\\')
- artifact_path = artifact_path.split('.')
+ artifact_path = (".".join(parts[2:])).replace("\\.", "\\_dot_\\")
+ artifact_path = artifact_path.split(".")
obj = task.artifacts
for p in artifact_path:
- p = p.replace('\\_dot_\\', '.')
+ p = p.replace("\\_dot_\\", ".")
if isinstance(obj, dict):
obj = obj.get(p)
elif hasattr(obj, p):
obj = getattr(obj, p)
else:
- raise ValueError("Could not locate artifact {} on previous step {}".format(
- '.'.join(parts[1:]), prev_step))
+ raise ValueError(
+ "Could not locate artifact {} on previous step {}".format(".".join(parts[1:]), prev_step)
+ )
return str(obj)
- elif input_type == 'parameters':
+ elif input_type == "parameters":
step_params = task.get_parameters()
- param_name = '.'.join(parts[2:])
+ param_name = ".".join(parts[2:])
if param_name not in step_params:
- raise ValueError("Could not locate parameter {} on previous step {}".format(
- '.'.join(parts[1:]), prev_step))
+ raise ValueError(
+ "Could not locate parameter {} on previous step {}".format(".".join(parts[1:]), prev_step)
+ )
return step_params.get(param_name)
- elif input_type == 'models':
+ elif input_type == "models":
model_type = parts[2].lower()
- if model_type not in ('input', 'output'):
- raise ValueError("Could not locate model {} on previous step {}".format(
- '.'.join(parts[1:]), prev_step))
+ if model_type not in ("input", "output"):
+ raise ValueError("Could not locate model {} on previous step {}".format(".".join(parts[1:]), prev_step))
try:
model_idx = int(parts[3])
model = task.models[model_type][model_idx]
except Exception:
- raise ValueError("Could not locate model {} on previous step {}, index {} is invalid".format(
- '.'.join(parts[1:]), prev_step, parts[3]))
+ raise ValueError(
+ "Could not locate model {} on previous step {}, index {} is invalid".format(
+ ".".join(parts[1:]), prev_step, parts[3]
+ )
+ )
return str(getattr(model, parts[4]))
- elif input_type == 'id':
+ elif input_type == "id":
return task.id
elif input_type in (
- 'script', 'execution', 'container', 'output',
- 'comment', 'models', 'tags', 'system_tags', 'project'):
+ "script",
+ "execution",
+ "container",
+ "output",
+ "comment",
+ "models",
+ "tags",
+ "system_tags",
+ "project",
+ ):
# noinspection PyProtectedMember
- return task._get_task_property('.'.join(parts[1:]))
+ return task._get_task_property(".".join(parts[1:]))
return None
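As a rough illustration of the ``${...}`` reference grammar resolved above (the step and parameter names here are invented):

ref = "${step1.parameters.Args/param}"
parts = ref[2:-1].split(".")            # ["step1", "parameters", "Args/param"]
prev_step, input_type = parts[0], parts[1]

# Resolution by input_type, mirroring the branches above:
#   "artifacts"  -> walk task.artifacts along the remaining dotted path
#   "parameters" -> task.get_parameters()[".".join(parts[2:])]
#   "models"     -> task.models[parts[2]][int(parts[3])], then getattr(model, parts[4])
#   "id"         -> task.id
assert (prev_step, input_type) == ("step1", "parameters")
assert ".".join(parts[2:]) == "Args/param"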
@@ -3606,7 +3733,7 @@ class PipelineController(object):
def __create_task_link(cls, a_node, task_link_template):
# type: (PipelineController.Node, str) -> str
if not a_node:
- return ''
+ return ""
# create the detailed parameter table
task_id = project_id = None
if a_node.job:
@@ -3621,11 +3748,11 @@ class PipelineController(object):
try:
project_id = Task.get_task(task_id=task_id).project
except Exception:
- project_id = '*'
+ project_id = "*"
cls._task_project_lookup[task_id] = project_id
if not task_id:
- return ''
+ return ""
return '<a href="{}"> {} </a>'.format(task_link_template.format(project=project_id, task=task_id), task_id)
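A small sketch of the resulting link string; the template URL, project id and task id below are placeholders:

task_link_template = "https://app.clear.ml/projects/{project}/experiments/{task}"  # placeholder template
project_id, task_id = "project-id", "task-id"

link = '<a href="{}"> {} </a>'.format(task_link_template.format(project=project_id, task=task_id), task_id)
print(link)
# <a href="https://app.clear.ml/projects/project-id/experiments/task-id"> task-id </a>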
@@ -3638,10 +3765,9 @@ class PipelineController(object):
artifact_object=artifact_object,
wait_on_upload=True,
extension_name=(
- ".pkl" if isinstance(artifact_object, dict) and not self._artifact_serialization_function
- else None
+ ".pkl" if isinstance(artifact_object, dict) and not self._artifact_serialization_function else None
),
- serialization_function=self._artifact_serialization_function
+ serialization_function=self._artifact_serialization_function,
)
@@ -3649,7 +3775,7 @@ class PipelineDecorator(PipelineController):
_added_decorator = [] # type: List[dict]
_ref_lazy_loader_id_to_node_name = {} # type: dict
_singleton = None # type: Optional[PipelineDecorator]
- _eager_step_artifact = 'eager_step'
+ _eager_step_artifact = "eager_step"
_eager_execution_instance = False
_debug_execute_step_process = False
_debug_execute_step_function = False
@@ -3659,29 +3785,29 @@ class PipelineDecorator(PipelineController):
_atexit_registered = False
def __init__(
- self,
- name, # type: str
- project, # type: str
- version=None, # type: Optional[str]
- pool_frequency=0.2, # type: float
- add_pipeline_tags=False, # type: bool
- target_project=None, # type: Optional[str]
- abort_on_failure=False, # type: bool
- add_run_number=True, # type: bool
- retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
- docker=None, # type: Optional[str]
- docker_args=None, # type: Optional[str]
- docker_bash_setup_script=None, # type: Optional[str]
- packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
- repo=None, # type: Optional[str]
- repo_branch=None, # type: Optional[str]
- repo_commit=None, # type: Optional[str]
- artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
- artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
- output_uri=None, # type: Optional[Union[str, bool]]
- skip_global_imports=False, # type: bool
- working_dir=None, # type: Optional[str]
- enable_local_imports=True # type: bool
+ self,
+ name, # type: str
+ project, # type: str
+ version=None, # type: Optional[str]
+ pool_frequency=0.2, # type: float
+ add_pipeline_tags=False, # type: bool
+ target_project=None, # type: Optional[str]
+ abort_on_failure=False, # type: bool
+ add_run_number=True, # type: bool
+ retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
+ docker=None, # type: Optional[str]
+ docker_args=None, # type: Optional[str]
+ docker_bash_setup_script=None, # type: Optional[str]
+ packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
+ repo=None, # type: Optional[str]
+ repo_branch=None, # type: Optional[str]
+ repo_commit=None, # type: Optional[str]
+ artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
+ artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
+ output_uri=None, # type: Optional[Union[str, bool]]
+ skip_global_imports=False, # type: bool
+ working_dir=None, # type: Optional[str]
+ enable_local_imports=True, # type: bool
):
# type: (...) -> ()
"""
@@ -3798,15 +3924,14 @@ class PipelineDecorator(PipelineController):
output_uri=output_uri,
skip_global_imports=skip_global_imports,
working_dir=working_dir,
- enable_local_imports=enable_local_imports
+ enable_local_imports=enable_local_imports,
)
# if we are in eager execution, make sure parent class knows it
if self._eager_execution_instance:
self._mock_execution = True
if PipelineDecorator._default_execution_queue:
- super(PipelineDecorator, self).set_default_execution_queue(
- PipelineDecorator._default_execution_queue)
+ super(PipelineDecorator, self).set_default_execution_queue(PipelineDecorator._default_execution_queue)
for n in self._added_decorator:
self._add_function_step(**n)
@@ -3891,11 +4016,11 @@ class PipelineDecorator(PipelineController):
# nothing changed, we can sleep
if not completed_jobs and self._running_nodes:
# force updating the pipeline state (plot) at least every 5 min.
- if force_execution_plot_update or time()-last_plot_report > self._update_execution_plot_interval:
+ if force_execution_plot_update or time() - last_plot_report > self._update_execution_plot_interval:
last_plot_report = time()
last_monitor_report = time()
self.update_execution_plot()
- elif time()-last_monitor_report > self._monitor_node_interval:
+ elif time() - last_monitor_report > self._monitor_node_interval:
last_monitor_report = time()
self._scan_monitored_nodes()
continue
@@ -3913,8 +4038,11 @@ class PipelineDecorator(PipelineController):
# check if we need to stop the pipeline, and abort all running steps
if nodes_failed_stop_pipeline:
- print('Aborting pipeline and stopping all running steps, node {} failed'.format(
- nodes_failed_stop_pipeline))
+ print(
+ "Aborting pipeline and stopping all running steps, node {} failed".format(
+ nodes_failed_stop_pipeline
+ )
+ )
break
# update current state (in configuration, so that we could later continue an aborted pipeline)
@@ -3971,14 +4099,14 @@ class PipelineDecorator(PipelineController):
# check if we have a new step on the DAG
eager_artifacts = []
for a in artifacts:
- if a.key and a.key.startswith('{}:'.format(self._eager_step_artifact)):
+ if a.key and a.key.startswith("{}:".format(self._eager_step_artifact)):
# expected value: '"eager_step":"parent-node-task-id":"eager-step-task-id'
eager_artifacts.append(a)
# verify we have the step, if we do not, add it.
delete_artifact_keys = []
for artifact in eager_artifacts:
- _, parent_step_task_id, eager_step_task_id = artifact.key.split(':', 2)
+ _, parent_step_task_id, eager_step_task_id = artifact.key.split(":", 2)
# deserialize node definition
eager_node_def = json.loads(artifact.type_data.preview)
@@ -3998,15 +4126,15 @@ class PipelineDecorator(PipelineController):
# should not happen
continue
- new_step_node_name = '{}_{}'.format(parent_node.name, eager_node_name)
+ new_step_node_name = "{}_{}".format(parent_node.name, eager_node_name)
counter = 1
while new_step_node_name in self._nodes:
- new_step_node_name = '{}_{}'.format(new_step_node_name, counter)
+ new_step_node_name = "{}_{}".format(new_step_node_name, counter)
counter += 1
- eager_node_def['name'] = new_step_node_name
- eager_node_def['parents'] = [parent_node.name]
- is_cached = eager_node_def.pop('is_cached', None)
+ eager_node_def["name"] = new_step_node_name
+ eager_node_def["parents"] = [parent_node.name]
+ is_cached = eager_node_def.pop("is_cached", None)
self._nodes[new_step_node_name] = self.Node(**eager_node_def)
self._nodes[new_step_node_name].job = RunningJob(existing_task=eager_step_task_id)
if is_cached:
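To clarify the bookkeeping above, a sketch of how an eager-step artifact key is split and how the new node name is de-duplicated (task ids and node names are invented):

key = "eager_step:parent-task-id:eager-step-task-id"
_, parent_step_task_id, eager_step_task_id = key.split(":", 2)
assert (parent_step_task_id, eager_step_task_id) == ("parent-task-id", "eager-step-task-id")

existing_nodes = {"train", "train_predict"}
new_step_node_name = "train_predict"          # "{}_{}".format(parent_node.name, eager_node_name)
counter = 1
while new_step_node_name in existing_nodes:
    new_step_node_name = "{}_{}".format(new_step_node_name, counter)
    counter += 1
print(new_step_node_name)  # train_predict_1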
@@ -4041,12 +4169,12 @@ class PipelineDecorator(PipelineController):
commit,
helper_functions,
output_uri=None,
- working_dir=None
+ working_dir=None,
):
def sanitize(function_source):
matched = re.match(r"[\s]*@[\w]*.component[\s\\]*\(", function_source)
if matched:
- function_source = function_source[matched.span()[1]:]
+ function_source = function_source[matched.span()[1] :]
# find the last ")"
open_parenthesis = 0
last_index = -1
@@ -4059,7 +4187,7 @@ class PipelineDecorator(PipelineController):
elif c == "(":
open_parenthesis += 1
if last_index >= 0:
- function_source = function_source[last_index + 1:].lstrip()
+ function_source = function_source[last_index + 1 :].lstrip()
return function_source
task_definition = CreateFromFunction.create_task_from_function(
@@ -4087,7 +4215,7 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=self._artifact_serialization_function,
artifact_deserialization_function=self._artifact_deserialization_function,
skip_global_imports=self._skip_global_imports,
- working_dir=working_dir
+ working_dir=working_dir,
)
return task_definition
@@ -4105,15 +4233,16 @@ class PipelineDecorator(PipelineController):
:param task_hash: Task representation dict
:return: Adjusted Task representation dict
"""
- if task_hash.get('hyper_params'):
+ if task_hash.get("hyper_params"):
updated_params = {}
- for k, v in task_hash['hyper_params'].items():
- if k.startswith("{}/".format(CreateFromFunction.input_artifact_section)) and \
- str(v).startswith("{}.".format(self._task.id)):
+ for k, v in task_hash["hyper_params"].items():
+ if k.startswith("{}/".format(CreateFromFunction.input_artifact_section)) and str(v).startswith(
+ "{}.".format(self._task.id)
+ ):
task_id, artifact_name = str(v).split(".", 1)
if artifact_name in self._task.artifacts:
updated_params[k] = self._task.artifacts[artifact_name].hash
- task_hash['hyper_params'].update(updated_params)
+ task_hash["hyper_params"].update(updated_params)
return task_hash
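A minimal sketch of the hashing adjustment above, assuming the input-artifact section is named "kwargs_artifacts" and using made-up ids and hashes:

input_artifact_section = "kwargs_artifacts"        # assumed value of CreateFromFunction.input_artifact_section
pipeline_task_id = "pipeline-task-id"
artifact_hashes = {"step_one_result": "d41d8cd9"}  # artifact name -> content hash

task_hash = {"hyper_params": {input_artifact_section + "/data": pipeline_task_id + ".step_one_result"}}

updated_params = {}
for k, v in task_hash["hyper_params"].items():
    if k.startswith(input_artifact_section + "/") and str(v).startswith(pipeline_task_id + "."):
        _, artifact_name = str(v).split(".", 1)
        if artifact_name in artifact_hashes:
            updated_params[k] = artifact_hashes[artifact_name]
task_hash["hyper_params"].update(updated_params)
print(task_hash)  # {'hyper_params': {'kwargs_artifacts/data': 'd41d8cd9'}}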
@@ -4139,37 +4268,38 @@ class PipelineDecorator(PipelineController):
@classmethod
def component(
- cls,
- _func=None, *,
- return_values=('return_object', ), # type: Union[str, Sequence[str]]
- name=None, # type: Optional[str]
- cache=False, # type: bool
- packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
- parents=None, # type: Optional[List[str]]
- execution_queue=None, # type: Optional[str]
- continue_on_fail=False, # type: bool
- docker=None, # type: Optional[str]
- docker_args=None, # type: Optional[str]
- docker_bash_setup_script=None, # type: Optional[str]
- task_type=None, # type: Optional[str]
- auto_connect_frameworks=None, # type: Optional[dict]
- auto_connect_arg_parser=None, # type: Optional[dict]
- repo=None, # type: Optional[str]
- repo_branch=None, # type: Optional[str]
- repo_commit=None, # type: Optional[str]
- helper_functions=None, # type: Optional[Sequence[Callable]]
- monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
- monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
- monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
- retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
- pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
- post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
- status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
- tags=None, # type: Optional[Union[str, Sequence[str]]]
- output_uri=None, # type: Optional[Union[str, bool]]
- draft=False, # type: Optional[bool]
- working_dir=None, # type: Optional[str]
- continue_behaviour=None # type: Optional[dict]
+ cls,
+ _func=None,
+ *,
+ return_values=("return_object",), # type: Union[str, Sequence[str]]
+ name=None, # type: Optional[str]
+ cache=False, # type: bool
+ packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
+ parents=None, # type: Optional[List[str]]
+ execution_queue=None, # type: Optional[str]
+ continue_on_fail=False, # type: bool
+ docker=None, # type: Optional[str]
+ docker_args=None, # type: Optional[str]
+ docker_bash_setup_script=None, # type: Optional[str]
+ task_type=None, # type: Optional[str]
+ auto_connect_frameworks=None, # type: Optional[dict]
+ auto_connect_arg_parser=None, # type: Optional[dict]
+ repo=None, # type: Optional[str]
+ repo_branch=None, # type: Optional[str]
+ repo_commit=None, # type: Optional[str]
+ helper_functions=None, # type: Optional[Sequence[Callable]]
+ monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
+ monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
+ monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
+ retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
+ pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
+ post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
+ status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
+ tags=None, # type: Optional[Union[str, Sequence[str]]]
+ output_uri=None, # type: Optional[Union[str, bool]]
+ draft=False, # type: Optional[bool]
+ working_dir=None, # type: Optional[str]
+ continue_behaviour=None # type: Optional[dict]
):
# type: (...) -> Callable
"""
@@ -4330,6 +4460,7 @@ class PipelineDecorator(PipelineController):
:return: function wrapper
"""
+
def decorator_wrap(func):
if continue_on_fail:
warnings.warn("`continue_on_fail` is deprecated. Use `continue_behaviour` instead", DeprecationWarning)
@@ -4343,7 +4474,7 @@ class PipelineDecorator(PipelineController):
# add default argument values
if inspect_func.args:
default_values = list(inspect_func.defaults or [])
- default_values = ([None] * (len(inspect_func.args)-len(default_values))) + default_values
+ default_values = ([None] * (len(inspect_func.args) - len(default_values))) + default_values
function_kwargs = {k: v for k, v in zip(inspect_func.args, default_values)}
else:
function_kwargs = dict()
@@ -4378,7 +4509,7 @@ class PipelineDecorator(PipelineController):
output_uri=output_uri,
draft=draft,
working_dir=working_dir,
- continue_behaviour=continue_behaviour
+ continue_behaviour=continue_behaviour,
)
if cls._singleton:
@@ -4390,9 +4521,11 @@ class PipelineDecorator(PipelineController):
def wrapper(*args, **kwargs):
if cls._debug_execute_step_function:
args = walk_nested_dict_tuple_list(
- args, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x)
+ args, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x
+ )
kwargs = walk_nested_dict_tuple_list(
- kwargs, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x)
+ kwargs, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x
+ )
func_return = []
@@ -4405,14 +4538,18 @@ class PipelineDecorator(PipelineController):
if len(function_return) == 1:
ret_val = LazyEvalWrapper(
callback=functools.partial(result_wrapper, func_return, None),
- remote_reference=functools.partial(result_wrapper, func_return, None))
+ remote_reference=functools.partial(result_wrapper, func_return, None),
+ )
cls._ref_lazy_loader_id_to_node_name[id(ret_val)] = _name
return ret_val
else:
- return_w = [LazyEvalWrapper(
- callback=functools.partial(result_wrapper, func_return, i),
- remote_reference=functools.partial(result_wrapper, func_return, i))
- for i, _ in enumerate(function_return)]
+ return_w = [
+ LazyEvalWrapper(
+ callback=functools.partial(result_wrapper, func_return, i),
+ remote_reference=functools.partial(result_wrapper, func_return, i),
+ )
+ for i, _ in enumerate(function_return)
+ ]
for i in return_w:
cls._ref_lazy_loader_id_to_node_name[id(i)] = _name
return return_w
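For context, a hedged usage sketch of the decorator whose wrapper is assembled above; the function body is made up, and each name in ``return_values`` corresponds to one lazily evaluated wrapper:

from clearml import PipelineDecorator

@PipelineDecorator.component(return_values=["train_set", "test_set"], cache=False)
def split_data(ratio=0.8):
    data = list(range(10))
    cut = int(len(data) * ratio)
    return data[:cut], data[cut:]

Inside a function decorated with ``PipelineDecorator.pipeline``, calling ``split_data()`` returns the lazy wrappers built above rather than the concrete values.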
@@ -4430,8 +4567,7 @@ class PipelineDecorator(PipelineController):
kwargs_artifacts.update(
{
k: walk_nested_dict_tuple_list(
- v,
- lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x
+ v, lambda x: x._remoteref() if isinstance(x, LazyEvalWrapper) else x
)
for k, v in kwargs.items()
if isinstance(v, LazyEvalWrapper)
@@ -4448,22 +4584,22 @@ class PipelineDecorator(PipelineController):
PipelineDecorator._eager_execution_instance = True
a_pipeline = PipelineDecorator(
name=name,
- project='DevOps', # it will not actually be used
- version='0.0.0',
+ project="DevOps", # it will not actually be used
+ version="0.0.0",
pool_frequency=111,
add_pipeline_tags=False,
target_project=None,
)
- target_queue = \
- PipelineDecorator._default_execution_queue or \
- Task.current_task().data.execution.queue
+ target_queue = (
+ PipelineDecorator._default_execution_queue or Task.current_task().data.execution.queue
+ )
if target_queue:
PipelineDecorator.set_default_execution_queue(target_queue)
else:
# if we are not running from a queue, we are probably in debug mode
a_pipeline._clearml_job_class = LocalClearmlJob
- a_pipeline._default_execution_queue = 'mock'
+ a_pipeline._default_execution_queue = "mock"
# restore tags, the pipeline might add a few
Task.current_task().set_tags(original_tags[0])
@@ -4487,9 +4623,9 @@ class PipelineDecorator(PipelineController):
# Note that for the first iteration (when `_node.name == _node_name`)
# we always increment the name, as the name is always in `_launched_step_names`
while _node.name in cls._singleton._launched_step_names or (
- _node.name in cls._singleton._nodes and
- cls._singleton._nodes[_node.name].job_code_section !=
- cls._singleton._nodes[_node_name].job_code_section
+ _node.name in cls._singleton._nodes
+ and cls._singleton._nodes[_node.name].job_code_section
+ != cls._singleton._nodes[_node_name].job_code_section
):
_node.name = "{}_{}".format(_node_name, counter)
counter += 1
@@ -4497,9 +4633,13 @@ class PipelineDecorator(PipelineController):
if cls._singleton._pre_step_callbacks.get(_node_name):
cls._singleton._pre_step_callbacks[_node.name] = cls._singleton._pre_step_callbacks[_node_name]
if cls._singleton._post_step_callbacks.get(_node_name):
- cls._singleton._post_step_callbacks[_node.name] = cls._singleton._post_step_callbacks[_node_name]
+ cls._singleton._post_step_callbacks[_node.name] = cls._singleton._post_step_callbacks[
+ _node_name
+ ]
if cls._singleton._status_change_callbacks.get(_node_name):
- cls._singleton._status_change_callbacks[_node.name] = cls._singleton._status_change_callbacks[_node_name]
+ cls._singleton._status_change_callbacks[_node.name] = cls._singleton._status_change_callbacks[
+ _node_name
+ ]
_node_name = _node.name
if _node.name not in cls._singleton._nodes:
cls._singleton._nodes[_node.name] = _node
@@ -4508,14 +4648,23 @@ class PipelineDecorator(PipelineController):
cls._singleton._launched_step_names.add(_node_name)
_node = cls._singleton._nodes[_node_name]
cls._retries[_node_name] = 0
- cls._retries_callbacks[_node_name] = retry_on_failure if callable(retry_on_failure) else \
- (functools.partial(cls._singleton._default_retry_on_failure_callback, max_retries=retry_on_failure)
- if isinstance(retry_on_failure, int) else cls._singleton._retry_on_failure_callback)
+ cls._retries_callbacks[_node_name] = (
+ retry_on_failure
+ if callable(retry_on_failure)
+ else (
+ functools.partial(
+ cls._singleton._default_retry_on_failure_callback, max_retries=retry_on_failure
+ )
+ if isinstance(retry_on_failure, int)
+ else cls._singleton._retry_on_failure_callback
+ )
+ )
# The actual launch is a bit slow, we run it in the background
launch_thread = Thread(
target=cls._component_launch,
- args=(_node_name, _node, kwargs_artifacts, kwargs, current_thread().ident))
+ args=(_node_name, _node, kwargs_artifacts, kwargs, current_thread().ident),
+ )
def results_reference(return_name):
# wait until launch is completed
@@ -4533,7 +4682,8 @@ class PipelineDecorator(PipelineController):
if _node.job.is_failed() and not _node.continue_on_fail:
raise ValueError(
- 'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
+ 'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())
+ )
_node.executed = _node.job.task_id()
return "{}.{}".format(_node.job.task_id(), return_name)
@@ -4551,8 +4701,9 @@ class PipelineDecorator(PipelineController):
return None
cls._wait_for_node(_node)
- if (_node.job.is_failed() and not _node.continue_on_fail) or \
- (_node.job.is_aborted() and not _node.continue_on_abort):
+ if (_node.job.is_failed() and not _node.continue_on_fail) or (
+ _node.job.is_aborted() and not _node.continue_on_abort
+ ):
raise ValueError(
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())
)
@@ -4574,9 +4725,13 @@ class PipelineDecorator(PipelineController):
)
return task.get_parameters(cast=True).get(CreateFromFunction.return_section + "/" + return_name)
- return_w = [LazyEvalWrapper(
- callback=functools.partial(result_wrapper, n),
- remote_reference=functools.partial(results_reference, n)) for n in function_return]
+ return_w = [
+ LazyEvalWrapper(
+ callback=functools.partial(result_wrapper, n),
+ remote_reference=functools.partial(results_reference, n),
+ )
+ for n in function_return
+ ]
for i in return_w:
cls._ref_lazy_loader_id_to_node_name[id(i)] = _node_name
@@ -4591,36 +4746,37 @@ class PipelineDecorator(PipelineController):
@classmethod
def pipeline(
- cls,
- _func=None, *, # noqa
- name, # type: str
- project, # type: str
- version=None, # type: Optional[str]
- return_value=None, # type: Optional[str]
- default_queue=None, # type: Optional[str]
- pool_frequency=0.2, # type: float
- add_pipeline_tags=False, # type: bool
- target_project=None, # type: Optional[str]
- abort_on_failure=False, # type: bool
- pipeline_execution_queue='services', # type: Optional[str]
- multi_instance_support=False, # type: bool
- add_run_number=True, # type: bool
- args_map=None, # type: dict[str, List[str]]
- start_controller_locally=False, # type: bool
- retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
- docker=None, # type: Optional[str]
- docker_args=None, # type: Optional[str]
- docker_bash_setup_script=None, # type: Optional[str]
- packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
- repo=None, # type: Optional[str]
- repo_branch=None, # type: Optional[str]
- repo_commit=None, # type: Optional[str]
- artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
- artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
- output_uri=None, # type: Optional[Union[str, bool]]
- skip_global_imports=False, # type: bool
- working_dir=None, # type: Optional[str]
- enable_local_imports=True # type: bool
+ cls,
+ _func=None,
+ *, # noqa
+ name, # type: str
+ project, # type: str
+ version=None, # type: Optional[str]
+ return_value=None, # type: Optional[str]
+ default_queue=None, # type: Optional[str]
+ pool_frequency=0.2, # type: float
+ add_pipeline_tags=False, # type: bool
+ target_project=None, # type: Optional[str]
+ abort_on_failure=False, # type: bool
+ pipeline_execution_queue="services", # type: Optional[str]
+ multi_instance_support=False, # type: bool
+ add_run_number=True, # type: bool
+ args_map=None, # type: dict[str, List[str]]
+ start_controller_locally=False, # type: bool
+ retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
+ docker=None, # type: Optional[str]
+ docker_args=None, # type: Optional[str]
+ docker_bash_setup_script=None, # type: Optional[str]
+ packages=None, # type: Optional[Union[bool, str, Sequence[str]]]
+ repo=None, # type: Optional[str]
+ repo_branch=None, # type: Optional[str]
+ repo_commit=None, # type: Optional[str]
+ artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
+ artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
+ output_uri=None, # type: Optional[Union[str, bool]]
+ skip_global_imports=False, # type: bool
+ working_dir=None, # type: Optional[str]
+ enable_local_imports=True # type: bool
):
# type: (...) -> Callable
"""
@@ -4745,8 +4901,8 @@ class PipelineDecorator(PipelineController):
If False, the directory won't be appended to PYTHONPATH. Default is True.
Ignored while running remotely.
"""
- def decorator_wrap(func):
+ def decorator_wrap(func):
def internal_decorator(*args, **kwargs):
pipeline_kwargs = dict(**(kwargs or {}))
pipeline_kwargs_types = dict()
@@ -4766,8 +4922,7 @@ class PipelineDecorator(PipelineController):
pipeline_kwargs = default_kwargs
if inspect_func.annotations:
- pipeline_kwargs_types = {
- str(k): inspect_func.annotations[k] for k in inspect_func.annotations}
+ pipeline_kwargs_types = {str(k): inspect_func.annotations[k] for k in inspect_func.annotations}
# run the entire pipeline locally, as python functions
if cls._debug_execute_step_function:
@@ -4793,7 +4948,7 @@ class PipelineDecorator(PipelineController):
output_uri=output_uri,
skip_global_imports=skip_global_imports,
working_dir=working_dir,
- enable_local_imports=enable_local_imports
+ enable_local_imports=enable_local_imports,
)
ret_val = func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references()
@@ -4812,7 +4967,8 @@ class PipelineDecorator(PipelineController):
try:
# noinspection PyProtectedMember
multi_pipeline_call_counter = int(
- t._get_runtime_properties().get('multi_pipeline_counter', None))
+ t._get_runtime_properties().get("multi_pipeline_counter", None)
+ )
# NOTICE! if this is not our call we LEAVE immediately
# check if this is our call to start, if not we will wait for the next one
@@ -4848,34 +5004,36 @@ class PipelineDecorator(PipelineController):
output_uri=output_uri,
skip_global_imports=skip_global_imports,
working_dir=working_dir,
- enable_local_imports=enable_local_imports
+ enable_local_imports=enable_local_imports,
)
a_pipeline._args_map = args_map or {}
if PipelineDecorator._debug_execute_step_process:
a_pipeline._clearml_job_class = LocalClearmlJob
- a_pipeline._default_execution_queue = 'mock'
+ a_pipeline._default_execution_queue = "mock"
a_pipeline._clearml_job_class.register_hashing_callback(a_pipeline._adjust_task_hashing)
# add pipeline arguments
for k in pipeline_kwargs:
a_pipeline.add_parameter(
- name=k,
- default=pipeline_kwargs.get(k),
- param_type=pipeline_kwargs_types.get(k)
+ name=k, default=pipeline_kwargs.get(k), param_type=pipeline_kwargs_types.get(k)
)
# sync multi-pipeline call counter (so we know which one to skip)
if Task.running_locally() and multi_instance_support and cls._multi_pipeline_call_counter >= 0:
# noinspection PyProtectedMember
a_pipeline._task._set_runtime_properties(
- dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter)))
+ dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter))
+ )
# run the actual pipeline
- if not start_controller_locally and \
- not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue:
+ if (
+ not start_controller_locally
+ and not PipelineDecorator._debug_execute_step_process
+ and pipeline_execution_queue
+ ):
# rerun the pipeline on a remote machine
a_pipeline._task.execute_remotely(queue_name=pipeline_execution_queue)
# when we get here it means we are running remotely
@@ -4931,7 +5089,8 @@ class PipelineDecorator(PipelineController):
if multi_instance_support:
return cls._multi_pipeline_wrapper(
- func=internal_decorator, parallel=bool(multi_instance_support == 'parallel'))
+ func=internal_decorator, parallel=bool(multi_instance_support == "parallel")
+ )
return internal_decorator
@@ -5003,8 +5162,8 @@ class PipelineDecorator(PipelineController):
kwargs.pop(k, None)
_node.parameters.pop("{}/{}".format(CreateFromFunction.kwargs_section, k), None)
_node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = v
- if v and '.' in str(v):
- parent_id, _ = str(v).split('.', 1)
+ if v and "." in str(v):
+ parent_id, _ = str(v).split(".", 1)
# find parent and push it into the _node.parents
for n, node in sorted(list(cls._singleton._nodes.items()), reverse=True):
if n != _node.name and node.executed and node.executed == parent_id:
@@ -5013,16 +5172,18 @@ class PipelineDecorator(PipelineController):
break
leaves = cls._singleton._find_executed_node_leaves()
- _node.parents = (_node.parents or []) + [
- x for x in cls._evaluated_return_values.get(tid, []) if x in leaves
- ]
+ _node.parents = (_node.parents or []) + [x for x in cls._evaluated_return_values.get(tid, []) if x in leaves]
if not cls._singleton._abort_running_steps_on_failure:
for parent in _node.parents:
parent = cls._singleton._nodes[parent]
- if parent.status == "failed" and parent.skip_children_on_fail or \
- parent.status == "aborted" and parent.skip_children_on_abort or \
- parent.status == "skipped":
+ if (
+ parent.status == "failed"
+ and parent.skip_children_on_fail
+ or parent.status == "aborted"
+ and parent.skip_children_on_abort
+ or parent.status == "skipped"
+ ):
_node.skip_job = True
return
@@ -5031,10 +5192,11 @@ class PipelineDecorator(PipelineController):
_node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v
else:
# we need to create an artifact
- artifact_name = 'result_{}_{}'.format(re.sub(r'\W+', '', _node.name), k)
+ artifact_name = "result_{}_{}".format(re.sub(r"\W+", "", _node.name), k)
cls._singleton._upload_pipeline_artifact(artifact_name=artifact_name, artifact_object=v)
- _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = \
- "{}.{}".format(cls._singleton._task.id, artifact_name)
+ _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = "{}.{}".format(
+ cls._singleton._task.id, artifact_name
+ )
# verify the new step
cls._singleton._verify_node(_node)
@@ -5043,8 +5205,7 @@ class PipelineDecorator(PipelineController):
# check if we generated the pipeline we need to update the new eager step
if PipelineDecorator._eager_execution_instance and _node.job:
# check if we need to add the pipeline tag on the new node
- pipeline_tags = [t for t in Task.current_task().get_tags() or []
- if str(t).startswith(cls._node_tag_prefix)]
+ pipeline_tags = [t for t in Task.current_task().get_tags() or [] if str(t).startswith(cls._node_tag_prefix)]
if pipeline_tags and _node.job and _node.job.task:
pipeline_tags = list(set((_node.job.task.get_tags() or []) + pipeline_tags))
_node.job.task.set_tags(pipeline_tags)
@@ -5054,19 +5215,19 @@ class PipelineDecorator(PipelineController):
pipeline_dag = cls._singleton._serialize()
# check if node is cached
if _node.job.is_cached_task():
- pipeline_dag[_node_name]['is_cached'] = True
+ pipeline_dag[_node_name]["is_cached"] = True
# store entire definition on the parent pipeline
from clearml.backend_api.services import tasks
+
artifact = tasks.Artifact(
- key='{}:{}:{}'.format(cls._eager_step_artifact, Task.current_task().id, _node.job.task_id()),
+ key="{}:{}:{}".format(cls._eager_step_artifact, Task.current_task().id, _node.job.task_id()),
type="json",
- mode='output',
+ mode="output",
type_data=tasks.ArtifactTypeData(
- preview=json.dumps({_node_name: pipeline_dag[_node_name]}),
- content_type='application/pipeline')
+ preview=json.dumps({_node_name: pipeline_dag[_node_name]}), content_type="application/pipeline"
+ ),
)
- req = tasks.AddOrUpdateArtifactsRequest(
- task=Task.current_task().parent, artifacts=[artifact], force=True)
+ req = tasks.AddOrUpdateArtifactsRequest(task=Task.current_task().parent, artifacts=[artifact], force=True)
res = Task.current_task().send(req, raise_on_errors=False)
if not res or not res.response or not res.response.updated:
pass
@@ -5076,9 +5237,9 @@ class PipelineDecorator(PipelineController):
@classmethod
def _multi_pipeline_wrapper(
- cls,
- func=None, # type: Callable
- parallel=False, # type: bool
+ cls,
+ func=None, # type: Callable
+ parallel=False, # type: bool
):
# type: (...) -> Callable
"""
@@ -5112,10 +5273,10 @@ class PipelineDecorator(PipelineController):
return func(*args, **kwargs)
def sanitized_env(a_queue, *a_args, **a_kwargs):
- os.environ.pop('CLEARML_PROC_MASTER_ID', None)
- os.environ.pop('TRAINS_PROC_MASTER_ID', None)
- os.environ.pop('CLEARML_TASK_ID', None)
- os.environ.pop('TRAINS_TASK_ID', None)
+ os.environ.pop("CLEARML_PROC_MASTER_ID", None)
+ os.environ.pop("TRAINS_PROC_MASTER_ID", None)
+ os.environ.pop("CLEARML_TASK_ID", None)
+ os.environ.pop("TRAINS_TASK_ID", None)
if Task.current_task():
# noinspection PyProtectedMember
Task.current_task()._reset_current_task_obj()
@@ -5127,7 +5288,7 @@ class PipelineDecorator(PipelineController):
queue = Queue()
- p = Process(target=sanitized_env, args=(queue, ) + args, kwargs=kwargs)
+ p = Process(target=sanitized_env, args=(queue,) + args, kwargs=kwargs)
# make sure we wait for the subprocess.
p.daemon = False
p.start()
@@ -5159,7 +5320,7 @@ class PipelineDecorator(PipelineController):
results = []
if not cls._multi_pipeline_instances:
return results
- print('Waiting for background pipelines to finish')
+ print("Waiting for background pipelines to finish")
for p, queue in cls._multi_pipeline_instances:
try:
p.join()
@@ -5180,6 +5341,7 @@ class PipelineDecorator(PipelineController):
if not Task.current_task():
return
from clearml.backend_api.services import events
+
res = Task.current_task().send(
events.GetTaskPlotsRequest(task=pipeline_task_id, iters=1),
raise_on_errors=False,
@@ -5189,25 +5351,32 @@ class PipelineDecorator(PipelineController):
execution_details = None
for p in res.response.plots:
try:
- if p['metric'] == cls._report_plot_execution_flow['title'] and \
- p['variant'] == cls._report_plot_execution_flow['series']:
- execution_flow = json.loads(p['plot_str'])
+ if (
+ p["metric"] == cls._report_plot_execution_flow["title"]
+ and p["variant"] == cls._report_plot_execution_flow["series"]
+ ):
+ execution_flow = json.loads(p["plot_str"])
- elif p['metric'] == cls._report_plot_execution_details['title'] and \
- p['variant'] == cls._report_plot_execution_details['series']:
- execution_details = json.loads(p['plot_str'])
- execution_details['layout']['name'] += ' - ' + str(pipeline_task_id)
+ elif (
+ p["metric"] == cls._report_plot_execution_details["title"]
+ and p["variant"] == cls._report_plot_execution_details["series"]
+ ):
+ execution_details = json.loads(p["plot_str"])
+ execution_details["layout"]["name"] += " - " + str(pipeline_task_id)
except Exception as ex:
- getLogger('clearml.automation.controller').warning(
- 'Multi-pipeline plot update failed: {}'.format(ex))
+ getLogger("clearml.automation.controller").warning("Multi-pipeline plot update failed: {}".format(ex))
if execution_flow:
Task.current_task().get_logger().report_plotly(
- title=cls._report_plot_execution_flow['title'],
- series='{} - {}'.format(cls._report_plot_execution_flow['series'], pipeline_task_id),
- iteration=0, figure=execution_flow)
+ title=cls._report_plot_execution_flow["title"],
+ series="{} - {}".format(cls._report_plot_execution_flow["series"], pipeline_task_id),
+ iteration=0,
+ figure=execution_flow,
+ )
if execution_details:
Task.current_task().get_logger().report_plotly(
- title=cls._report_plot_execution_details['title'],
- series='{} - {}'.format(cls._report_plot_execution_details['series'], pipeline_task_id),
- iteration=0, figure=execution_details)
+ title=cls._report_plot_execution_details["title"],
+ series="{} - {}".format(cls._report_plot_execution_details["series"], pipeline_task_id),
+ iteration=0,
+ figure=execution_details,
+ )