
Store multi-pipeline execution plots on the master pipeline Task

Support pipeline return value stored on pipeline Task
Add PipelineDecorator multi_instance_support
This commit is contained in:
allegroai 2021-11-15 18:01:05 +02:00
parent f75a210335
commit 05ae7ec47b
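
A minimal usage sketch of the features added in this commit, based only on the docstrings and signatures in the diff below; the import path and the `pipeline_logic` function are illustrative, not part of the commit:

from clearml.automation.controller import PipelineDecorator

# illustrative pipeline function; name and logic are hypothetical
@PipelineDecorator.pipeline(
    name="custom pipeline logic", project="examples", version="1.0",
    return_value="result",              # store the function's return object as a pipeline Task artifact
    multi_instance_support="parallel",  # allow multiple calls, each creating its own Pipeline Task
)
def pipeline_logic(parameter=1):
    print("running with parameter={}".format(parameter))
    return parameter * 2

if __name__ == "__main__":
    # each call creates a new Pipeline Task; with 'parallel' the calls return None immediately
    pipeline_logic(parameter=1)
    pipeline_logic(parameter=2)
    # block until both background pipelines complete; results are returned in call order
    results = PipelineDecorator.wait_for_multi_pipelines()
    print(results)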


@@ -1,10 +1,13 @@
import atexit
import functools
import inspect
import json
import os
import re
from copy import copy, deepcopy
from datetime import datetime
from logging import getLogger
from multiprocessing import Process, Queue
from threading import Thread, Event, RLock
from time import time
from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union, Tuple
@@ -20,7 +23,6 @@ from ..backend_interface.util import get_or_create_project, exact_match_regex
from ..debugging.log import LoggerRoot
from ..model import BaseModel, OutputModel
from ..task import Task
from ..utilities.process.mp import leave_process
from ..utilities.proxy_object import LazyEvalWrapper, flatten_dictionary
@@ -43,6 +45,8 @@ class PipelineController(object):
_clearml_job_class = ClearmlJob
_update_execution_plot_interval = 5.*60
_monitor_node_interval = 5.*60
_report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
_report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
@attrs
class Node(object):
@@ -1598,10 +1602,14 @@ class PipelineController(object):
# report DAG
self._task.get_logger().report_plotly(
title='Pipeline', series='Execution Flow', iteration=0, figure=fig)
title=self._report_plot_execution_flow['title'],
series=self._report_plot_execution_flow['series'],
iteration=0, figure=fig)
# report detailed table
self._task.get_logger().report_table(
title='Pipeline Details', series='Execution Details', iteration=0, table_plot=table_values)
title=self._report_plot_execution_details['title'],
series=self._report_plot_execution_details['series'],
iteration=0, table_plot=table_values)
def _build_table_report(self, node_params, visited):
# type: (List, List) -> List[List]
@@ -2188,6 +2196,8 @@ class PipelineDecorator(PipelineController):
_debug_execute_step_process = False
_debug_execute_step_function = False
_default_execution_queue = None
_multi_pipeline_instances = []
_atexit_registered = False
def __init__(
self,
@@ -2225,6 +2235,7 @@ class PipelineDecorator(PipelineController):
pool_frequency=pool_frequency,
add_pipeline_tags=add_pipeline_tags,
target_project=target_project,
abort_on_failure=abort_on_failure,
)
# if we are in eager execution, make sure parent class knows it
@@ -2555,8 +2566,11 @@ class PipelineDecorator(PipelineController):
setting up the Task's environment
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param repo: Optional, specify a repository to attach to the function, when remotely executing. Allow users to execute the function inside the specified repository, enabling them to load modules/script from the repository. Notice the execution work directory will be the repository root folder.
Supports both git repo url link, and local repository path (automatically converted into the remote git/commit as is currently checkout).
:param repo: Optional, specify a repository to attach to the function, when remotely executing.
Allow users to execute the function inside the specified repository, enabling them to load modules/script
from the repository. Notice the execution work directory will be the repository root folder.
Supports both git repo url link, and local repository path (automatically converted into the remote
git/commit as is currently checkout).
Example remote url: 'https://github.com/user/repo.git'
Example local repo copy: './repo' -> will automatically store the remote
repo url and commit ID based on the locally cloned copy
@@ -2819,12 +2833,14 @@ class PipelineDecorator(PipelineController):
name, # type: str
project, # type: str
version, # type: str
return_value=None, # type: Optional[str]
default_queue=None, # type: Optional[str]
pool_frequency=0.2, # type: float
add_pipeline_tags=False, # type: bool
target_project=None, # type: Optional[str]
abort_on_failure=False, # type: bool
pipeline_execution_queue='services' # type: Optional[str]
pipeline_execution_queue='services', # type: Optional[str]
multi_instance_support=False,
):
# type: (...) -> Callable
"""
@@ -2834,6 +2850,8 @@ class PipelineDecorator(PipelineController):
:param project: Provide project storing the pipeline (if main Task exists it overrides its project)
:param version: Must provide pipeline version. This version allows uniquely identifying the pipeline
template execution. Examples of semantic versions: version='1.0.1', version='23', version='1.2'
:param return_value: Optional, provide an artifact name to store the pipeline function's return object.
Notice: if not provided, the pipeline will not store the pipeline function's return value.
:param default_queue: default pipeline step queue
:param float pool_frequency: The polling frequency (in minutes) for monitoring experiments / states.
:param bool add_pipeline_tags: (default: False) if True, add `pipe: <pipeline_task_id>` tag to all
@@ -2847,6 +2865,13 @@ class PipelineDecorator(PipelineController):
and mark the pipeline as failed.
:param pipeline_execution_queue: remote pipeline execution queue (default 'services' queue).
If None is passed, execute the pipeline logic locally (pipeline steps are still executed remotely)
:param multi_instance_support: If True, allow multiple calls to the same pipeline function,
each call creating a new Pipeline Task. Notice it is recommended to create an additional Task on the
"main process" acting as a master pipeline, automatically collecting the execution plots.
If multi_instance_support=='parallel', the pipeline calls are executed in parallel;
in the `parallel` case the function calls return None. To collect all pipeline results, call
`PipelineDecorator.wait_for_multi_pipelines()`.
Default: False, no multi-instance pipeline support.
"""
def decorator_wrap(func):
@@ -2912,7 +2937,7 @@ class PipelineDecorator(PipelineController):
# this time the pipeline is executed only on the remote machine
try:
func(**pipeline_kwargs)
pipeline_result = func(**pipeline_kwargs)
except Exception:
a_pipeline.stop(mark_failed=True)
raise
@@ -2932,12 +2957,22 @@ class PipelineDecorator(PipelineController):
continue
node.job.wait(pool_period=15)
waited = True
# store the pipeline result if we have any:
if return_value and pipeline_result is not None:
a_pipeline._task.upload_artifact(
name=str(return_value), artifact_object=pipeline_result, wait_on_upload=True
)
# now we can stop the pipeline
a_pipeline.stop()
# now we can raise the exception
if triggered_exception:
raise triggered_exception
return
return pipeline_result
if multi_instance_support:
return cls._multi_pipeline_wrapper(
func=internal_decorator, parallel=bool(multi_instance_support == 'parallel'))
return internal_decorator
@@ -2970,8 +3005,8 @@ class PipelineDecorator(PipelineController):
def debug_pipeline(cls):
# type: () -> ()
"""
Set debugging mode, run all functions locally as subprocess or serially as functions
Run the full pipeline DAG locally, where steps are executed as sub-processes Tasks
Set debugging mode, run all functions locally as functions
Run the full pipeline DAG locally, where steps are executed as functions
Notice:
running the DAG locally assumes the local code execution (i.e. it will not clone & apply git diff)
Pipeline steps are executed as functions (no Task will be created), for ease of debugging
@@ -2979,6 +3014,84 @@ class PipelineDecorator(PipelineController):
cls._debug_execute_step_process = True
cls._debug_execute_step_function = True
@classmethod
def _multi_pipeline_wrapper(
cls,
func=None, # type: Callable
parallel=False, # type: bool
):
# type: (...) -> Callable
"""
Add support for multiple pipeline function calls,
enabling execution of multiple instances of the same pipeline from a single script.
.. code-block:: python
@PipelineDecorator.pipeline(
multi_instance_support=True, name="custom pipeline logic", project="examples", version="1.0")
def pipeline(parameter=1):
print(f"running with parameter={parameter}")
# run both pipelines (if multi_instance_support=='parallel', run the pipelines in parallel)
pipeline(parameter=1)
pipeline(parameter=2)
:param parallel: If True, the pipeline runs in the background, which implies that calling
the pipeline twice runs the pipelines in parallel.
Default: False, the pipeline function returns when the pipeline completes.
:return: The wrapped pipeline function.
Notice the return value of the wrapped function:
if parallel==True the return value is None, otherwise it is the return value of the pipeline function.
"""
def internal_decorator(*args, **kwargs):
# if this is a debug run just call the function (no parallelization).
if cls._debug_execute_step_function:
return func(*args, **kwargs)
def sanitized_env(a_queue, *a_args, **a_kwargs):
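# drop inherited ClearML/Trains process state so the subprocess starts its own Task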
os.environ.pop('CLEARML_PROC_MASTER_ID', None)
os.environ.pop('TRAINS_PROC_MASTER_ID', None)
os.environ.pop('CLEARML_TASK_ID', None)
os.environ.pop('TRAINS_TASK_ID', None)
if Task.current_task():
# noinspection PyProtectedMember
Task.current_task()._reset_current_task_obj()
a_result = func(*a_args, **a_kwargs)
if a_queue is not None:
task_id = Task.current_task().id if Task.current_task() else None
a_queue.put((task_id, a_result))
return a_result
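# run each pipeline call in its own subprocess; the Task id and return value come back through the queue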
queue = Queue()
p = Process(target=sanitized_env, args=(queue, ) + args, kwargs=kwargs)
# make sure we wait for the subprocess.
p.daemon = False
p.start()
if parallel:
cls._multi_pipeline_instances.append((p, queue))
return
else:
p.join()
# noinspection PyBroadException
try:
pipeline_task, result = queue.get_nowait()
except Exception:
return None
# we should update the master Task plot:
if pipeline_task and Task.current_task():
cls._add_pipeline_plots(pipeline_task)
return result
if parallel and not cls._atexit_registered:
cls._atexit_registered = True
atexit.register(cls._wait_for_multi_pipelines)
return internal_decorator
@classmethod
def get_current_pipeline(cls):
# type: () -> "PipelineDecorator"
@@ -2986,3 +3099,72 @@ class PipelineDecorator(PipelineController):
Return the currently running pipeline instance
"""
return cls._singleton
@classmethod
def wait_for_multi_pipelines(cls):
# type: () -> List[Any]
"""
Wait until all background multi-pipeline executions are completed.
Returns all the pipeline results in call order (first pipeline call at index 0).
:return: List of return values from the executed pipelines, based on call order.
"""
return cls._wait_for_multi_pipelines()
@classmethod
def _wait_for_multi_pipelines(cls):
results = []
if not cls._multi_pipeline_instances:
return results
print('Waiting for background pipelines to finish')
for p, queue in cls._multi_pipeline_instances:
try:
p.join()
except: # noqa
pass
# noinspection PyBroadException
try:
pipeline_task, result = queue.get_nowait()
results.append(result)
cls._add_pipeline_plots(pipeline_task)
except Exception:
pass
cls._multi_pipeline_instances = []
return results
@classmethod
def _add_pipeline_plots(cls, pipeline_task_id):
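# copy the execution-flow and execution-details plots from the given pipeline Task onto the current (master) Task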
if not Task.current_task():
return
from clearml.backend_api.services import events
res = Task.current_task().send(
events.GetTaskPlotsRequest(task=pipeline_task_id, iters=1),
raise_on_errors=False,
ignore_errors=True,
)
execution_flow = None
execution_details = None
for p in res.response.plots:
try:
if p['metric'] == cls._report_plot_execution_flow['title'] and \
p['variant'] == cls._report_plot_execution_flow['series']:
execution_flow = json.loads(p['plot_str'])
elif p['metric'] == cls._report_plot_execution_details['title'] and \
p['variant'] == cls._report_plot_execution_details['series']:
execution_details = json.loads(p['plot_str'])
execution_details['layout']['name'] += ' - ' + str(pipeline_task_id)
except Exception as ex:
getLogger('clearml.automation.controller').warning(
'Multi-pipeline plot update failed: {}'.format(ex))
if execution_flow:
Task.current_task().get_logger().report_plotly(
title=cls._report_plot_execution_flow['title'],
series='{} - {}'.format(cls._report_plot_execution_flow['series'], pipeline_task_id),
iteration=0, figure=execution_flow)
if execution_details:
Task.current_task().get_logger().report_plotly(
title=cls._report_plot_execution_details['title'],
series='{} - {}'.format(cls._report_plot_execution_details['series'], pipeline_task_id),
iteration=0, figure=execution_details)