Add PipelineController.stop() mark_failed and mark_aborted args

Add pipeline decorator run_locally
Add PipelineController.continue_on_fail property
Add PipelineController.__init__() abort_on_failure arg
This commit is contained in:
allegroai 2021-10-16 23:10:23 +03:00
parent e4ceeb2c11
commit 04797d25d1

@@ -20,7 +20,7 @@ from ..debugging.log import LoggerRoot
from ..model import BaseModel
from ..task import Task
from ..utilities.process.mp import leave_process
from ..utilities.proxy_object import LazyEvalWrapper
from ..utilities.proxy_object import LazyEvalWrapper, flatten_dictionary
class PipelineController(object):
@@ -57,6 +57,7 @@ class PipelineController(object):
clone_task = attrib(type=bool, default=True) # If True clone the base_task_id, then execute the cloned Task
job = attrib(type=ClearmlJob, default=None) # ClearMLJob object
skip_job = attrib(type=bool, default=False) # if True, this step should be skipped
continue_on_fail = attrib(type=bool, default=False) # if True, the pipeline continues even if the step failed
cache_executed_step = attrib(type=bool, default=False) # if True this pipeline step should be cached
return_artifacts = attrib(type=list, default=[]) # List of artifact names returned by the step
monitor_metrics = attrib(type=list, default=[]) # List of metric title/series to monitor
@@ -86,6 +87,7 @@ class PipelineController(object):
add_pipeline_tags=False, # type: bool
target_project=None, # type: Optional[str]
auto_version_bump=True, # type: bool
abort_on_failure=False, # type: bool
):
# type: (...) -> None
"""
@@ -102,6 +104,12 @@ class PipelineController(object):
:param bool auto_version_bump: If True (default), if the same pipeline version already exists
(with any difference from the current one), the current pipeline version will be bumped to a new version.
Version bump examples: 1.0.0 -> 1.0.1, 1.2 -> 1.3, 10 -> 11, etc.
:param bool abort_on_failure: If False (default), failed pipeline steps will not cause the pipeline
to stop immediately; instead, any step that is not connected (directly or indirectly) to the failed step
will still be executed. Nonetheless, the pipeline itself will be marked failed, unless the failed step
was specifically defined with "continue_on_fail=True".
If True, any failed step will cause the pipeline to immediately abort, stop all running steps,
and mark the pipeline as failed.
"""
self._nodes = {}
self._running_nodes = []
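For illustration, a minimal sketch of the new argument in use (the project/queue names here are hypothetical, and the usual name/project/version arguments are assumed):

from clearml import PipelineController

# abort everything as soon as any step fails
pipe = PipelineController(
    name='pipeline demo', project='examples', version='1.0.0',
    abort_on_failure=True,
)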
@@ -147,6 +155,7 @@ class PipelineController(object):
self._task.add_tags([self._tag])
self._monitored_nodes = {} # type: Dict[str, dict]
self._abort_running_steps_on_failure = abort_on_failure
def set_default_execution_queue(self, default_execution_queue):
# type: (Optional[str]) -> None
@@ -183,6 +192,7 @@ class PipelineController(object):
base_task_project=None, # type: Optional[str]
base_task_name=None, # type: Optional[str]
clone_base_task=True, # type: bool
continue_on_fail=False, # type: bool
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool
@@ -258,6 +268,9 @@ class PipelineController(object):
use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.
:param clone_base_task: If True (default) the pipeline will clone the base task, and modify/enqueue
the cloned Task. If False, the base-task is used directly; notice it has to be in draft-mode (created).
:param continue_on_fail: (default False). If True, a failed step will not cause the pipeline to stop
(or be marked as failed). Notice that steps that are connected (directly or indirectly)
to the failed step will be skipped.
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
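A hedged sketch of a step that is allowed to fail without failing the pipeline (the task and queue names are hypothetical):

pipe.add_step(
    name='optional_report',
    parents=['train'],
    base_task_project='examples',
    base_task_name='report task',
    execution_queue='default',
    continue_on_fail=True,  # a failure here will not fail the pipeline
)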
@@ -344,6 +357,9 @@ class PipelineController(object):
raise ValueError("configuration_overrides must be a dictionary, with all values "
"either dicts or strings, got \'{}\' instead".format(configuration_overrides))
if task_overrides:
task_overrides = flatten_dictionary(task_overrides, sep='.')
self._nodes[name] = self.Node(
name=name, base_task_id=base_task_id, parents=parents or [],
queue=execution_queue, timeout=time_limit,
@@ -352,6 +368,7 @@ class PipelineController(object):
clone_task=clone_base_task,
task_overrides=task_overrides,
cache_executed_step=cache_executed_step,
continue_on_fail=continue_on_fail,
task_factory_func=base_task_factory,
monitor_metrics=monitor_metrics or [],
monitor_artifacts=monitor_artifacts or [],
@@ -386,6 +403,7 @@ class PipelineController(object):
monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
time_limit=None, # type: Optional[float]
continue_on_fail=False, # type: bool
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool
@@ -474,6 +492,9 @@ class PipelineController(object):
Example: ['model_weights_*', ]
:param time_limit: Default None, no time limit.
Step execution time limit; if exceeded, the Task is aborted and the pipeline is stopped and marked failed.
:param continue_on_fail: (default False). If True, a failed step will not cause the pipeline to stop
(or be marked as failed). Notice that steps that are connected (directly or indirectly)
to the failed step will be skipped.
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
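Similarly for function steps, a minimal sketch (the step function, names, and artifact reference are illustrative):

def evaluate(dataset_url):
    # hypothetical step body
    return {'accuracy': 0.9}

pipe.add_function_step(
    name='evaluate',
    function=evaluate,
    function_kwargs=dict(dataset_url='${prepare.artifacts.dataset.url}'),
    function_return=['metrics'],
    continue_on_fail=True,
)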
@@ -587,6 +608,7 @@ class PipelineController(object):
clone_task=False,
cache_executed_step=cache_executed_step,
task_factory_func=_create_task,
continue_on_fail=continue_on_fail,
return_artifacts=function_return,
monitor_artifacts=monitor_artifacts,
monitor_metrics=monitor_metrics,
@@ -674,9 +696,12 @@ class PipelineController(object):
def start_locally(self, run_pipeline_steps_locally=False):
# type: (bool) -> None
"""
Start the current pipeline locally, in most cases for debug purposes.
By default it will be running the DAG itself locally, as sub-processes.
Notice: running the DAG locally assumes the local code execution (i.e. it will not clone & apply git diff)
Start the current pipeline locally, meaning the pipeline logic is running on the current machine,
instead of on the `services` queue.
Using run_pipeline_steps_locally=True you can run all the pipeline steps locally as sub-processes.
Notice: when running pipeline steps locally, local code execution is assumed
(i.e. the local code is run as-is, regardless of the git commit/diff of the pipeline step Tasks)
:param run_pipeline_steps_locally: (default False) If True, run the
pipeline steps themselves locally as a subprocess (use for debugging the pipeline locally,
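For example, a sketch of both modes:

# run only the pipeline logic on this machine; steps still go to their queues
pipe.start_locally()

# for local debugging, also run every step as a local sub-process
pipe.start_locally(run_pipeline_steps_locally=True)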
@@ -789,20 +814,31 @@ class PipelineController(object):
name=name, artifact_object=artifact_object, metadata=metadata, delete_after_upload=delete_after_upload,
auto_pickle=auto_pickle, preview=preview, wait_on_upload=wait_on_upload)
def stop(self, timeout=None):
# type: (Optional[float]) -> ()
def stop(self, timeout=None, mark_failed=False, mark_aborted=False):
# type: (Optional[float], bool, bool) -> ()
"""
Stop the pipeline controller and the optimization thread.
If mark_failed and mark_aborted are False (default), mark the pipeline as completed,
unless one of the steps failed, in which case mark the pipeline as failed.
:param float timeout: Wait timeout for the optimization thread to exit (minutes).
:param timeout: Wait timeout for the optimization thread to exit (minutes).
The default is ``None``, indicating do not wait, terminate immediately.
:param mark_failed: If True, mark the pipeline task as failed. (default False)
:param mark_aborted: If True, mark the pipeline task as aborted. (default False)
"""
self._stop_event.set()
self.wait(timeout=timeout)
if self._task and self._pipeline_task_status_failed:
if not self._task:
return
self._task.close()
if mark_failed:
self._task.mark_failed(status_reason='Pipeline aborted and failed', force=True)
elif mark_aborted:
self._task.mark_aborted(status_reason='Pipeline aborted', force=True)
elif self._pipeline_task_status_failed:
print('Setting pipeline controller Task as failed (due to failed steps) !')
self._task.close()
self._task.mark_failed(status_reason='Pipeline step failed', force=True)
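The three possible outcomes of stop(), sketched:

pipe.stop()                   # completed, or failed if one of the steps failed
pipe.stop(mark_failed=True)   # force the pipeline Task into 'failed'
pipe.stop(mark_aborted=True)  # force the pipeline Task into 'aborted'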
def wait(self, timeout=None):
@@ -1600,6 +1636,8 @@ class PipelineController(object):
return "#bdf5bd" # lightgreen, pending in queue
elif node.job.is_completed():
return "blue" # completed job
elif node.job.is_failed():
return "red" # failed job
else:
return "green" # running job
elif node.skip_job:
@@ -1639,15 +1677,20 @@ class PipelineController(object):
# if no job ended, continue
completed_jobs = []
force_execution_plot_update = False
nodes_failed_stop_pipeline = []
for j in self._running_nodes:
node = self._nodes[j]
if not node.job:
continue
if node.job.is_stopped():
completed_jobs.append(j)
node.executed = node.job.task_id() if not node.job.is_failed() else False
node_failed = node.job.is_failed()
node.executed = node.job.task_id() if not node_failed else False
if j in launched_nodes:
launched_nodes.remove(j)
# check if we need to stop all running steps
if node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail:
nodes_failed_stop_pipeline.append(node.name)
elif node.timeout:
started = node.job.task.data.started
if (datetime.now().astimezone(started.tzinfo) - started).total_seconds() > node.timeout:
@@ -1686,6 +1729,12 @@ class PipelineController(object):
if self._post_step_callbacks.get(job_node.name):
self._post_step_callbacks[job_node.name](self, job_node)
# check if we need to stop the pipeline, and abort all running steps
if nodes_failed_stop_pipeline:
print('Aborting pipeline and stopping all running steps, node {} failed'.format(
nodes_failed_stop_pipeline))
break
# Pull the next jobs in the pipeline, based on the completed list
next_nodes = []
for node in list(self._nodes.values()):
@@ -1725,8 +1774,9 @@ class PipelineController(object):
# stop all currently running jobs:
for node in list(self._nodes.values()):
if node.executed is False:
if node.executed is False and not node.continue_on_fail:
self._pipeline_task_status_failed = True
if node.job and node.executed and not node.job.is_stopped():
node.job.abort()
elif not node.job and not node.executed:
@@ -2117,6 +2167,7 @@ class PipelineDecorator(PipelineController):
pool_frequency=0.2, # type: float
add_pipeline_tags=False, # type: bool
target_project=None, # type: Optional[str]
abort_on_failure=False, # type: bool
):
# type: (...) -> ()
"""
@@ -2130,6 +2181,12 @@ class PipelineDecorator(PipelineController):
:param bool add_pipeline_tags: (default: False) if True, add `pipe: <pipeline_task_id>` tag to all
steps (Tasks) created by this pipeline.
:param str target_project: If provided, all pipeline steps are cloned into the target project
:param bool abort_on_failure: If False (default), failed pipeline steps will not cause the pipeline
to stop immediately; instead, any step that is not connected (directly or indirectly) to the failed step
will still be executed. Nonetheless, the pipeline itself will be marked failed, unless the failed step
was specifically defined with "continue_on_fail=True".
If True, any failed step will cause the pipeline to immediately abort, stop all running steps,
and mark the pipeline as failed.
"""
super(PipelineDecorator, self).__init__(
name=name,
@@ -2181,6 +2238,7 @@ class PipelineDecorator(PipelineController):
# check the state of all current jobs
# if no job ended, continue
completed_jobs = []
nodes_failed_stop_pipeline = []
force_execution_plot_update = False
for j in self._running_nodes:
node = self._nodes[j]
@@ -2188,9 +2246,13 @@ class PipelineDecorator(PipelineController):
continue
if node.job.is_stopped():
completed_jobs.append(j)
node.executed = node.job.task_id() if not node.job.is_failed() else False
node_failed = node.job.is_failed()
node.executed = node.job.task_id() if not node_failed else False
if j in launched_nodes:
launched_nodes.remove(j)
# check if we need to stop all running steps
if node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail:
nodes_failed_stop_pipeline.append(node.name)
elif node.timeout:
started = node.job.task.data.started
if (datetime.now().astimezone(started.tzinfo) - started).total_seconds() > node.timeout:
@@ -2229,6 +2291,12 @@ class PipelineDecorator(PipelineController):
if self._post_step_callbacks.get(job_node.name):
self._post_step_callbacks[job_node.name](self, job_node)
# check if we need to stop the pipeline, and abort all running steps
if nodes_failed_stop_pipeline:
print('Aborting pipeline and stopping all running steps, node {} failed'.format(
nodes_failed_stop_pipeline))
break
# update current state (in configuration, so that we could later continue an aborted pipeline)
self._force_task_configuration_update()
@@ -2237,8 +2305,9 @@ class PipelineDecorator(PipelineController):
# stop all currently running jobs (protect against changes while iterating):
for node in list(self._nodes.values()):
if node.executed is False:
if node.executed is False and not node.continue_on_fail:
self._pipeline_task_status_failed = True
if node.job and node.executed and not node.job.is_stopped():
node.job.abort()
elif not node.job and not node.executed:
@@ -2413,6 +2482,7 @@ class PipelineDecorator(PipelineController):
packages=None, # type: Optional[Union[str, Sequence[str]]]
parents=None, # type: Optional[List[str]]
execution_queue=None, # type: Optional[str]
continue_on_fail=False, # type: bool
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
@@ -2446,6 +2516,9 @@ class PipelineDecorator(PipelineController):
have been executed successfully.
:param execution_queue: Optional, the queue to use for executing this specific step.
If not provided, the task will be sent to the default execution queue, as defined on the class
:param continue_on_fail: (default False). If True, a failed step will not cause the pipeline to stop
(or be marked as failed). Notice that steps that are connected (directly or indirectly)
to the failed step will be skipped.
:param docker: Select the docker image to be executed in by the remote session
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
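A hedged sketch of a decorated component using the new argument (the function body, queue, and return values are illustrative):

from clearml.automation.controller import PipelineDecorator

@PipelineDecorator.component(
    return_values=['score'],
    execution_queue='default',
    continue_on_fail=True,  # this step may fail without failing the pipeline
)
def validate(data):
    # hypothetical step body
    return len(data)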
@@ -2509,6 +2582,7 @@ class PipelineDecorator(PipelineController):
packages=packages,
parents=parents,
execution_queue=execution_queue,
continue_on_fail=continue_on_fail,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
@@ -2668,7 +2742,7 @@ class PipelineDecorator(PipelineController):
def results_reference(return_name):
# wait until job is completed
_node.job.wait(pool_period=0.2)
if _node.job.is_failed():
if _node.job.is_failed() and not _node.continue_on_fail:
raise ValueError(
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
@@ -2706,7 +2780,8 @@ class PipelineDecorator(PipelineController):
pool_frequency=0.2, # type: float
add_pipeline_tags=False, # type: bool
target_project=None, # type: Optional[str]
pipeline_execution_queue='services' # type: Optional[str]
abort_on_failure=False, # type: bool
pipeline_execution_queue='services', # type: Optional[str]
):
# type: (...) -> Callable
"""
@@ -2721,6 +2796,12 @@ class PipelineDecorator(PipelineController):
:param bool add_pipeline_tags: (default: False) if True, add `pipe: <pipeline_task_id>` tag to all
steps (Tasks) created by this pipeline.
:param str target_project: If provided, all pipeline steps are cloned into the target project
:param bool abort_on_failure: If False (default), failed pipeline steps will not cause the pipeline
to stop immediately; instead, any step that is not connected (directly or indirectly) to the failed step
will still be executed. Nonetheless, the pipeline itself will be marked failed, unless the failed step
was specifically defined with "continue_on_fail=True".
If True, any failed step will cause the pipeline to immediately abort, stop all running steps,
and mark the pipeline as failed.
:param pipeline_execution_queue: remote pipeline execution queue (default 'services' queue).
If None is passed, execute the pipeline logic locally (pipeline steps are still executed remotely)
"""
@@ -2759,6 +2840,7 @@ class PipelineDecorator(PipelineController):
pool_frequency=pool_frequency,
add_pipeline_tags=add_pipeline_tags,
target_project=target_project,
abort_on_failure=abort_on_failure,
)
if PipelineDecorator._debug_execute_step_process:
@@ -2786,8 +2868,18 @@ class PipelineDecorator(PipelineController):
# when we get here it means we are running remotely
# this time the pipeline is executed only on the remote machine
func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references()
try:
func(**pipeline_kwargs)
except Exception:
a_pipeline.stop(mark_failed=True)
raise
triggered_exception = None
try:
LazyEvalWrapper.trigger_all_remote_references()
except Exception as ex:
triggered_exception = ex
# make sure we wait for all nodes to finish
waited = True
while waited:
@@ -2799,6 +2891,9 @@ class PipelineDecorator(PipelineController):
waited = True
# now we can stop the pipeline
a_pipeline.stop()
# now we can raise the exception
if triggered_exception:
raise triggered_exception
return
return internal_decorator
@@ -2816,15 +2911,35 @@ class PipelineDecorator(PipelineController):
cls._default_execution_queue = str(default_execution_queue) if default_execution_queue else None
@classmethod
def debug_pipeline(cls, execute_steps_as_functions=False):
# type: (bool) -> ()
def run_locally(cls):
# type: () -> ()
"""
Set debugging mode, run all functions locally as subprocess or serially as functions
Set local mode, run all pipeline steps locally as sub-processes.
Run the full pipeline DAG locally, where steps are executed as sub-process Tasks.
Notice: running the DAG locally assumes local code execution (i.e. it will not clone & apply git diff)
:param execute_steps_as_functions: If True, run the pipeline steps locally
as a function (no Task will be created). Default False.
"""
cls._debug_execute_step_process = True
cls._debug_execute_step_function = execute_steps_as_functions
cls._debug_execute_step_function = False
@classmethod
def debug_pipeline(cls):
# type: () -> ()
"""
Set debugging mode, run all pipeline steps locally, serially as functions.
Run the full pipeline DAG locally, where steps are executed as functions.
Notice:
running the DAG locally assumes local code execution (i.e. it will not clone & apply git diff)
Pipeline steps are executed as functions (no Task will be created), for ease of debugging
"""
cls._debug_execute_step_process = True
cls._debug_execute_step_function = True
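The two local modes in use, sketched (call before invoking the decorated pipeline function; the argument value is hypothetical):

# steps run locally as sub-process Tasks
PipelineDecorator.run_locally()
pipeline_logic(dataset_id='abc123')

# or: steps run serially as plain functions (no Tasks created)
PipelineDecorator.debug_pipeline()
pipeline_logic(dataset_id='abc123')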
@classmethod
def get_current_pipeline(cls):
# type: () -> "PipelineDecorator"
"""
Return the currently running pipeline instance
"""
return cls._singleton
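Tying it together, one could, for example, abort the run explicitly from inside the pipeline logic (the condition is hypothetical):

if should_abort:  # hypothetical condition
    PipelineDecorator.get_current_pipeline().stop(mark_aborted=True)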