diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 9cc7fcc0..23fd615c 100755 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -155,6 +155,8 @@ class PipelineController(object): skip_children_on_abort = attrib(type=bool, default=True) # if True, the children of failed steps are skipped skip_children_on_fail = attrib(type=bool, default=True) + # the stage of the step + stage = attrib(type=str, default=None) def __attrs_post_init__(self) -> None: if self.parents is None: @@ -513,6 +515,7 @@ class PipelineController(object): recursively_parse_parameters: bool = False, output_uri: Optional[Union[str, bool]] = None, continue_behaviour: Optional[dict] = None, + stage: Optional[str] = None ) -> bool: """ Add a step to the pipeline execution DAG. @@ -672,6 +675,7 @@ class PipelineController(object): If False, the children will run even if this step was aborted. Any parameters passed from the failed step to its children will default to None - If the keys are not present in the dictionary, their values will default to True + :param stage: Name of the stage. This parameter enables pipeline step grouping into stages :return: True if successful """ @@ -751,6 +755,7 @@ class PipelineController(object): monitor_models=monitor_models or [], output_uri=self._output_uri if output_uri is None else output_uri, continue_behaviour=continue_behaviour, + stage=stage ) self._retries[name] = 0 self._retries_callbacks[name] = ( @@ -819,6 +824,7 @@ class PipelineController(object): draft: Optional[bool] = False, working_dir: Optional[str] = None, continue_behaviour: Optional[dict] = None, + stage: Optional[str] = None ) -> bool: """ Create a Task from a function, including wrapping the function input arguments @@ -1003,6 +1009,7 @@ class PipelineController(object): If False, the children will run even if this step was aborted. Any parameters passed from the failed step to its children will default to None - If the keys are not present in the dictionary, their values will default to True + :param stage: Name of the stage. This parameter enables pipeline step grouping into stages :return: True if successful """ @@ -1056,6 +1063,7 @@ class PipelineController(object): draft=draft, working_dir=working_dir, continue_behaviour=continue_behaviour, + stage=stage ) def start( @@ -2446,6 +2454,7 @@ class PipelineController(object): draft: Optional[bool] = False, working_dir: Optional[str] = None, continue_behaviour: Optional[dict] = None, + stage: Optional[str] = None ) -> bool: """ Create a Task from a function, including wrapping the function input arguments @@ -2630,6 +2639,7 @@ class PipelineController(object): If False, the children will run even if this step was aborted. Any parameters passed from the failed step to its children will default to None If the keys are not present in the dictionary, their values will default to True + :param stage: Name of the stage. This parameter enables pipeline step grouping into stages :return: True if successful """ @@ -2777,6 +2787,7 @@ class PipelineController(object): output_uri=output_uri, draft=draft, continue_behaviour=continue_behaviour, + stage=stage ) self._retries[name] = 0 self._retries_callbacks[name] = ( @@ -4440,6 +4451,7 @@ class PipelineDecorator(PipelineController): draft: Optional[bool] = False, working_dir: Optional[str] = None, continue_behaviour: Optional[dict] = None, + stage: Optional[str] = None ) -> Callable: """ pipeline component function to be executed remotely @@ -4596,6 +4608,7 @@ class PipelineDecorator(PipelineController): If False, the children will run even if this step was aborted. Any parameters passed from the failed step to its children will default to None - If the keys are not present in the dictionary, their values will default to True + :param stage: Name of the stage. This parameter enables pipeline step grouping into stages :return: function wrapper """ @@ -4652,6 +4665,7 @@ class PipelineDecorator(PipelineController): draft=draft, working_dir=working_dir, continue_behaviour=continue_behaviour, + stage=stage ) if cls._singleton: