diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index 1afd14df..af8f30aa 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -1,10 +1,13 @@
+import atexit
 import functools
 import inspect
 import json
+import os
 import re
 from copy import copy, deepcopy
 from datetime import datetime
 from logging import getLogger
+from multiprocessing import Process, Queue
 from threading import Thread, Event, RLock
 from time import time
 from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union, Tuple
@@ -20,7 +23,6 @@ from ..backend_interface.util import get_or_create_project, exact_match_regex
 from ..debugging.log import LoggerRoot
 from ..model import BaseModel, OutputModel
 from ..task import Task
-from ..utilities.process.mp import leave_process
 from ..utilities.proxy_object import LazyEvalWrapper, flatten_dictionary
 
 
@@ -43,6 +45,8 @@ class PipelineController(object):
     _clearml_job_class = ClearmlJob
     _update_execution_plot_interval = 5.*60
     _monitor_node_interval = 5.*60
+    _report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
+    _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
 
     @attrs
     class Node(object):
@@ -1598,10 +1602,14 @@
         # report DAG
         self._task.get_logger().report_plotly(
-            title='Pipeline', series='Execution Flow', iteration=0, figure=fig)
+            title=self._report_plot_execution_flow['title'],
+            series=self._report_plot_execution_flow['series'],
+            iteration=0, figure=fig)
 
         # report detailed table
         self._task.get_logger().report_table(
-            title='Pipeline Details', series='Execution Details', iteration=0, table_plot=table_values)
+            title=self._report_plot_execution_details['title'],
+            series=self._report_plot_execution_details['series'],
+            iteration=0, table_plot=table_values)
 
     def _build_table_report(self, node_params, visited):
         # type: (List, List) -> List[List]
@@ -2188,6 +2196,8 @@ class PipelineDecorator(PipelineController):
     _debug_execute_step_process = False
     _debug_execute_step_function = False
     _default_execution_queue = None
+    _multi_pipeline_instances = []
+    _atexit_registered = False
 
     def __init__(
         self,
@@ -2225,6 +2235,7 @@
             pool_frequency=pool_frequency,
             add_pipeline_tags=add_pipeline_tags,
             target_project=target_project,
+            abort_on_failure=abort_on_failure,
         )
 
         # if we are in eager execution, make sure parent class knows it
@@ -2555,8 +2566,11 @@
             setting up the Task's environment
         :param task_type: Optional, The task type to be created. Supported values:
             'training', 'testing', 'inference', 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
-        :param repo: Optional, specify a repository to attach to the function, when remotely executing. Allow users to execute the function inside the specified repository, enabling them to load modules/script from the repository. Notice the execution work directory will be the repository root folder.
-            Supports both git repo url link, and local repository path (automatically converted into the remote git/commit as is currently checkout).
+        :param repo: Optional, specify a repository to attach to the function, when remotely executing.
+            Allow users to execute the function inside the specified repository, enabling them to load modules/script
+            from the repository. Notice the execution work directory will be the repository root folder.
+            Supports both git repo url link, and local repository path (automatically converted into the remote
+            git/commit as is currently checked out).
             Example remote url: 'https://github.com/user/repo.git'
             Example local repo copy: './repo' -> will automatically store the remote
             repo url and commit ID based on the locally cloned copy
@@ -2819,12 +2833,14 @@
         name,  # type: str
         project,  # type: str
         version,  # type: str
+        return_value=None,  # type: Optional[str]
         default_queue=None,  # type: Optional[str]
         pool_frequency=0.2,  # type: float
         add_pipeline_tags=False,  # type: bool
         target_project=None,  # type: Optional[str]
         abort_on_failure=False,  # type: bool
-        pipeline_execution_queue='services'  # type: Optional[str]
+        pipeline_execution_queue='services',  # type: Optional[str]
+        multi_instance_support=False,  # type: bool
     ):
         # type: (...) -> Callable
         """
@@ -2834,6 +2850,8 @@
         :param project: Provide project storing the pipeline (if main Task exists it overrides its project)
         :param version: Must provide pipeline version. This version allows to uniquely identify the pipeline
             template execution. Examples for semantic versions: version='1.0.1' , version='23', version='1.2'
+        :param return_value: Optional, provide an artifact name to store the pipeline function's return object.
+            Notice, if not provided the pipeline will not store the pipeline function's return value.
         :param default_queue: default pipeline step queue
         :param float pool_frequency: The pooling frequency (in minutes) for monitoring experiments / states.
         :param bool add_pipeline_tags: (default: False) if True, add `pipe: <pipeline_task_id>` tag to all
@@ -2847,6 +2865,13 @@
             and mark the pipeline as failed.
         :param pipeline_execution_queue: remote pipeline execution queue (default 'services' queue).
             If None is passed, execute the pipeline logic locally (pipeline steps are still executed remotely)
+        :param multi_instance_support: If True, allow multiple calls to the same pipeline function,
+            each call creating a new Pipeline Task. Notice it is recommended to create an additional Task on the
+            "main process" acting as a master pipeline, automatically collecting the execution plots.
+            If multi_instance_support=='parallel', the pipeline calls are executed in parallel;
+            in the `parallel` case the function calls return None. To collect all pipeline results call
+            `PipelineDecorator.wait_for_multi_pipelines()`.
+            Default False, no multi-instance pipeline support.
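+            Example (a sketch mirroring the example in the `_multi_pipeline_wrapper` docstring below;
+            the pipeline body and parameter values are illustrative only):
+
+            .. code-block:: python
+
+                @PipelineDecorator.pipeline(
+                    multi_instance_support='parallel', name="custom pipeline logic",
+                    project="examples", version="1.0")
+                def pipeline(parameter=1):
+                    print(f"running with parameter={parameter}")
+
+                # both calls return None immediately and run in parallel
+                pipeline(parameter=1)
+                pipeline(parameter=2)
+                # collect both pipeline results, in call order
+                results = PipelineDecorator.wait_for_multi_pipelines()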
""" def decorator_wrap(func): @@ -2912,7 +2937,7 @@ class PipelineDecorator(PipelineController): # this time the pipeline is executed only on the remote machine try: - func(**pipeline_kwargs) + pipeline_result = func(**pipeline_kwargs) except Exception: a_pipeline.stop(mark_failed=True) raise @@ -2932,12 +2957,22 @@ class PipelineDecorator(PipelineController): continue node.job.wait(pool_period=15) waited = True + # store the pipeline result of we have any: + if return_value and pipeline_result is not None: + a_pipeline._task.upload_artifact( + name=str(return_value), artifact_object=pipeline_result, wait_on_upload=True + ) + # now we can stop the pipeline a_pipeline.stop() # now we can raise the exception if triggered_exception: raise triggered_exception - return + return pipeline_result + + if multi_instance_support: + return cls._multi_pipeline_wrapper( + func=internal_decorator, parallel=bool(multi_instance_support == 'parallel')) return internal_decorator @@ -2970,8 +3005,8 @@ class PipelineDecorator(PipelineController): def debug_pipeline(cls): # type: () -> () """ - Set debugging mode, run all functions locally as subprocess or serially as functions - Run the full pipeline DAG locally, where steps are executed as sub-processes Tasks + Set debugging mode, run all functions locally as functions + Run the full pipeline DAG locally, where steps are executed as functions Notice: running the DAG locally assumes the local code execution (i.e. it will not clone & apply git diff) Pipeline steps are executed as functions (no Task will be created), fo ease debugging J @@ -2979,6 +3014,84 @@ class PipelineDecorator(PipelineController): cls._debug_execute_step_process = True cls._debug_execute_step_function = True + @classmethod + def _multi_pipeline_wrapper( + cls, + func=None, # type: Callable + parallel=False, # type: bool + ): + # type: (...) -> Callable + """ + Add support for multiple pipeline function calls, + enabling execute multiple instances of the same pipeline from a single script. + + .. code-block:: python + + @PipelineDecorator.pipeline( + multi_instance_support=True, name="custom pipeline logic", project="examples", version="1.0") + def pipeline(parameter=1): + print(f"running with parameter={parameter}") + + # run both pipeline (if multi_instance_support=='parallel', run pipelines in parallel) + pipeline(parameter=1) + pipeline(parameter=2) + + :param parallel: If True, the pipeline is running in the background, which implies calling + the pipeline twice means running the pipelines in parallel. + Default: False, pipeline function returns when pipeline completes + :return: Return wrapped pipeline function. + Notice the return value of the pipeline wrapped function: + if parallel==True, return will be None, otherwise expect the return of the pipeline wrapped function + """ + + def internal_decorator(*args, **kwargs): + # if this is a debug run just call the function (no parallelization). 
+            if cls._debug_execute_step_function:
+                return func(*args, **kwargs)
+
+            def sanitized_env(a_queue, *a_args, **a_kwargs):
+                # make sure the subprocess does not inherit the parent's Task/process state
+                os.environ.pop('CLEARML_PROC_MASTER_ID', None)
+                os.environ.pop('TRAINS_PROC_MASTER_ID', None)
+                os.environ.pop('CLEARML_TASK_ID', None)
+                os.environ.pop('TRAINS_TASK_ID', None)
+                if Task.current_task():
+                    # noinspection PyProtectedMember
+                    Task.current_task()._reset_current_task_obj()
+                a_result = func(*a_args, **a_kwargs)
+                if a_queue is not None:
+                    task_id = Task.current_task().id if Task.current_task() else None
+                    a_queue.put((task_id, a_result))
+                return a_result
+
+            queue = Queue()
+
+            p = Process(target=sanitized_env, args=(queue, ) + args, kwargs=kwargs)
+            # make sure we wait for the subprocess.
+            p.daemon = False
+            p.start()
+            if parallel:
+                cls._multi_pipeline_instances.append((p, queue))
+                return
+            else:
+                p.join()
+                # noinspection PyBroadException
+                try:
+                    pipeline_task, result = queue.get_nowait()
+                except Exception:
+                    return None
+
+                # we should update the master Task plot:
+                if pipeline_task and Task.current_task():
+                    cls._add_pipeline_plots(pipeline_task)
+
+                return result
+
+        if parallel and not cls._atexit_registered:
+            cls._atexit_registered = True
+            atexit.register(cls._wait_for_multi_pipelines)
+
+        return internal_decorator
+
     @classmethod
     def get_current_pipeline(cls):
         # type: () -> "PipelineDecorator"
         """
@@ -2986,3 +3099,72 @@
         Return the currently running pipeline instance
         """
         return cls._singleton
+
+    @classmethod
+    def wait_for_multi_pipelines(cls):
+        # type: () -> List[Any]
+        """
+        Wait until all background multi-pipeline executions are completed.
+        Returns all the pipeline results in call order (first pipeline call at index 0)
+
+        :return: List of return values from the executed pipelines, based on call order.
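+
+        Example (a minimal sketch; assumes the pipeline function was declared with
+        multi_instance_support='parallel', so each call below returns None immediately):
+
+        .. code-block:: python
+
+            pipeline(parameter=1)
+            pipeline(parameter=2)
+            # [return value of the first call, return value of the second call]
+            results = PipelineDecorator.wait_for_multi_pipelines()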
+ """ + return cls._wait_for_multi_pipelines() + + @classmethod + def _wait_for_multi_pipelines(cls): + results = [] + if not cls._multi_pipeline_instances: + return results + print('Waiting for background pipelines to finish') + for p, queue in cls._multi_pipeline_instances: + try: + p.join() + except: # noqa + pass + # noinspection PyBroadException + try: + pipeline_task, result = queue.get_nowait() + results.append(result) + cls._add_pipeline_plots(pipeline_task) + except Exception: + pass + cls._multi_pipeline_instances = [] + return results + + @classmethod + def _add_pipeline_plots(cls, pipeline_task_id): + if not Task.current_task(): + return + from clearml.backend_api.services import events + res = Task.current_task().send( + events.GetTaskPlotsRequest(task=pipeline_task_id, iters=1), + raise_on_errors=False, + ignore_errors=True, + ) + execution_flow = None + execution_details = None + for p in res.response.plots: + try: + if p['metric'] == cls._report_plot_execution_flow['title'] and \ + p['variant'] == cls._report_plot_execution_flow['series']: + execution_flow = json.loads(p['plot_str']) + + elif p['metric'] == cls._report_plot_execution_details['title'] and \ + p['variant'] == cls._report_plot_execution_details['series']: + execution_details = json.loads(p['plot_str']) + execution_details['layout']['name'] += ' - ' + str(pipeline_task_id) + except Exception as ex: + getLogger('clearml.automation.controller').warning( + 'Multi-pipeline plot update failed: {}'.format(ex)) + + if execution_flow: + Task.current_task().get_logger().report_plotly( + title=cls._report_plot_execution_flow['title'], + series='{} - {}'.format(cls._report_plot_execution_flow['series'], pipeline_task_id), + iteration=0, figure=execution_flow) + if execution_details: + Task.current_task().get_logger().report_plotly( + title=cls._report_plot_execution_details['title'], + series='{} - {}'.format(cls._report_plot_execution_details['series'], pipeline_task_id), + iteration=0, figure=execution_details)