mirror of
				https://github.com/clearml/clearml
				synced 2025-06-26 18:16:07 +00:00 
			
		
		
		
	Fix Pipeline pool period: should be 5 sec local, 20 sec remote
Refactor Pipeline controller code
This commit is contained in:
		
							parent
							
								
									ca384aa75c
								
							
						
					
					
						commit
						e30f497618
					
				| @ -11,7 +11,7 @@ from logging import getLogger | ||||
| from multiprocessing import Process, Queue | ||||
| from multiprocessing.pool import ThreadPool | ||||
| from threading import Thread, Event, RLock, current_thread | ||||
| from time import time, sleep | ||||
| from time import time | ||||
| from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union, Tuple | ||||
| 
 | ||||
| from attr import attrib, attrs | ||||
| @ -31,11 +31,6 @@ from ..task import Task | ||||
| from ..utilities.process.mp import leave_process | ||||
| from ..utilities.proxy_object import LazyEvalWrapper, flatten_dictionary, walk_nested_dict_tuple_list | ||||
| 
 | ||||
| try: | ||||
|     import boto3 | ||||
| except ImportError: | ||||
|     boto3 = None | ||||
| 
 | ||||
| 
 | ||||
| class PipelineController(object): | ||||
|     """ | ||||
| @ -62,7 +57,6 @@ class PipelineController(object): | ||||
|     _monitor_node_interval = 5.*60 | ||||
|     _report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow') | ||||
|     _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details') | ||||
|     _relaunch_check_cache = {} | ||||
|     _evaluated_return_values = {}  # TID: pipeline_name | ||||
|     _add_to_evaluated_return_values = {}  # TID: bool | ||||
| 
 | ||||
| @ -94,8 +88,6 @@ class PipelineController(object): | ||||
|         monitor_metrics = attrib(type=list, default=None)  # List of metric title/series to monitor | ||||
|         monitor_artifacts = attrib(type=list, default=None)  # List of artifact names to monitor | ||||
|         monitor_models = attrib(type=list, default=None)  # List of models to monitor | ||||
|         # If True, relaunch the node if the instance it ran on crashed | ||||
|         relaunch_on_instance_failure = attrib(type=bool, default=False) | ||||
| 
 | ||||
|         def __attrs_post_init__(self): | ||||
|             if self.parents is None: | ||||
| @ -140,7 +132,6 @@ class PipelineController(object): | ||||
|             auto_version_bump=True,  # type: bool | ||||
|             abort_on_failure=False,  # type: bool | ||||
|             add_run_number=True,  # type: bool | ||||
|             relaunch_on_instance_failure=False,  # type: bool | ||||
|     ): | ||||
|         # type: (...) -> None | ||||
|         """ | ||||
| @ -166,9 +157,6 @@ class PipelineController(object): | ||||
|             and mark the pipeline as failed. | ||||
|         :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name. | ||||
|             Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2" | ||||
|         :param relaunch_on_instance_failure: If True, check if the machine a pipeline step ran on | ||||
|             was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported. | ||||
|             Default: False. | ||||
|         """ | ||||
|         self._nodes = {} | ||||
|         self._running_nodes = [] | ||||
| @ -201,7 +189,6 @@ class PipelineController(object): | ||||
|         self._mock_execution = False  # used for nested pipelines (eager execution) | ||||
|         self._pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17")) | ||||
|         self._last_progress_update_time = 0 | ||||
|         self._relaunch_on_instance_failure = relaunch_on_instance_failure | ||||
|         if not self._task: | ||||
|             task_name = name or project or '{}'.format(datetime.now()) | ||||
|             if self._pipeline_as_sub_project: | ||||
| @ -506,7 +493,6 @@ class PipelineController(object): | ||||
|             pre_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]]  # noqa | ||||
|             post_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node], None]]  # noqa | ||||
|             cache_executed_step=False,  # type: bool | ||||
|             relaunch_on_instance_failure=None  # type: bool | ||||
|     ): | ||||
|         # type: (...) -> bool | ||||
|         """ | ||||
| @ -550,7 +536,6 @@ class PipelineController(object): | ||||
|             If not provided, no results will be stored as artifacts. | ||||
|         :param project_name: Set the project name for the task. Required if base_task_id is None. | ||||
|         :param task_name: Set the name of the remote task, if not provided use `name` argument. | ||||
|         self.task.reload() | ||||
|         :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', | ||||
|             'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' | ||||
|         :param auto_connect_frameworks: Control the frameworks auto connect, see `Task.init` auto_connect_frameworks | ||||
| @ -641,11 +626,6 @@ class PipelineController(object): | ||||
|             Default: False, a new cloned copy of base_task is always used. | ||||
|             Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. | ||||
| 
 | ||||
|         :param relaunch_on_instance_failure: If True, check if the machine this step ran on | ||||
|             was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported. | ||||
|             By default, the value of this flag will be the one found in the PipelineController instance | ||||
|             this function is called from, which it itself is False by default. | ||||
| 
 | ||||
|         :return: True if successful | ||||
|         """ | ||||
|         # always store callback functions (even when running remotely) | ||||
| @ -724,8 +704,6 @@ class PipelineController(object): | ||||
|             a_task.update_task(task_definition) | ||||
|             return a_task | ||||
| 
 | ||||
|         if relaunch_on_instance_failure is None: | ||||
|             relaunch_on_instance_failure = self._relaunch_on_instance_failure | ||||
|         self._nodes[name] = self.Node( | ||||
|             name=name, base_task_id=None, parents=parents or [], | ||||
|             queue=execution_queue, timeout=time_limit, | ||||
| @ -739,7 +717,6 @@ class PipelineController(object): | ||||
|             monitor_metrics=monitor_metrics, | ||||
|             monitor_models=monitor_models, | ||||
|             job_code_section=job_code_section, | ||||
|             relaunch_on_instance_failure=relaunch_on_instance_failure | ||||
|         ) | ||||
| 
 | ||||
|         return True | ||||
| @ -1998,20 +1975,11 @@ class PipelineController(object): | ||||
|                 if not node.job: | ||||
|                     continue | ||||
|                 if node.job.is_stopped(aborted_nonresponsive_as_running=True): | ||||
|                     completed_jobs.append(j) | ||||
|                     node_failed = node.job.is_failed() | ||||
|                     node.executed = node.job.task_id() if not node_failed else False | ||||
|                     if j in launched_nodes: | ||||
|                         launched_nodes.remove(j) | ||||
|                     if (node_failed or node.job.is_aborted()) and self._should_relaunch_node(node): | ||||
|                         # marking the job as None lets us know that this node has to be requeued | ||||
|                         self._task.get_logger().report_text( | ||||
|                             "Relaunching step {} on instance termination".format(node.name) | ||||
|                         ) | ||||
|                         node.job = None | ||||
|                         node.executed = None | ||||
|                         self._running_nodes.remove(j) | ||||
|                         continue | ||||
|                     completed_jobs.append(j) | ||||
|                     # check if we need to stop all running steps | ||||
|                     if node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail: | ||||
|                         nodes_failed_stop_pipeline.append(node.name) | ||||
| @ -2534,60 +2502,6 @@ class PipelineController(object): | ||||
| 
 | ||||
|         return '<a href="{}"> {} </a>'.format(task_link_template.format(project=project_id, task=task_id), task_id) | ||||
| 
 | ||||
|     def _should_relaunch_node(self, node): | ||||
|         # type: ('PipelineController.Node') -> bool | ||||
|         """ | ||||
|         Check if a node should be relaunched. At the moment, this function returns True only if the AWS instance | ||||
|         the node ran on terminated during the run | ||||
| 
 | ||||
|         :param node: The node to check if it should be relaunched | ||||
| 
 | ||||
|         :return: True if the node should be relaunched and False otherwise | ||||
|         """ | ||||
|         if not boto3 or not self._relaunch_on_instance_failure: | ||||
|             return False | ||||
|         worker = (node.job.worker() or "").split(":")[-1] | ||||
|         if not worker: | ||||
|             return False | ||||
| 
 | ||||
|         if (worker, node.name) in self._relaunch_check_cache: | ||||
|             return self._relaunch_check_cache[(worker, node.name)] | ||||
|         # get credentials from all autoscalers (shouldn't be too many) | ||||
|         aws_scaler_tasks = Task.get_tasks( | ||||
|             tags=["AWS"], | ||||
|             task_filter={ | ||||
|                 "status": ["in_progress"], | ||||
|                 "type": ["application"], | ||||
|                 "search_hidden": True, | ||||
|                 "_allow_extra_fields_": True, | ||||
|             }, | ||||
|         ) | ||||
|         used_creds = [] | ||||
|         for task in aws_scaler_tasks: | ||||
|             # noinspection PyBroadException | ||||
|             try: | ||||
|                 parameters = task.get_parameters() | ||||
|                 cred = { | ||||
|                         "aws_access_key_id": parameters["General/cloud_credentials_key"], | ||||
|                         "aws_secret_access_key": parameters["General/cloud_credentials_secret"], | ||||
|                         "region_name": parameters["General/cloud_credentials_region"], | ||||
|                 } | ||||
|                 if cred in used_creds: | ||||
|                     continue | ||||
|                 used_creds.append(cred) | ||||
|                 ec2_resource = boto3.resource("ec2", **cred) | ||||
|                 # check if AWS instance is still running | ||||
|                 # we would like to requeue if the instance somehow died | ||||
|                 # (like when a spot instance is terminated) | ||||
|                 instance = next(iter(ec2_resource.instances.filter(InstanceIds=[worker]))) | ||||
|                 if instance.state["Name"] != "running": | ||||
|                     self._relaunch_check_cache[(worker, node.name)] = True | ||||
|                     return True | ||||
|             except Exception: | ||||
|                 pass | ||||
|         self._relaunch_check_cache[(worker, node.name)] = False | ||||
|         return False | ||||
| 
 | ||||
| 
 | ||||
| class PipelineDecorator(PipelineController): | ||||
|     _added_decorator = []  # type: List[dict] | ||||
| @ -2612,7 +2526,6 @@ class PipelineDecorator(PipelineController): | ||||
|             target_project=None,  # type: Optional[str] | ||||
|             abort_on_failure=False,  # type: bool | ||||
|             add_run_number=True,  # type: bool | ||||
|             relaunch_on_instance_failure=False  # type: bool | ||||
|     ): | ||||
|         # type: (...) -> () | ||||
|         """ | ||||
| @ -2634,9 +2547,6 @@ class PipelineDecorator(PipelineController): | ||||
|             and mark the pipeline as failed. | ||||
|         :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name. | ||||
|             Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2" | ||||
|         :param relaunch_on_instance_failure: If True, check if the machine a pipeline step ran on | ||||
|             was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported. | ||||
|             Default: False | ||||
|         """ | ||||
|         super(PipelineDecorator, self).__init__( | ||||
|             name=name, | ||||
| @ -2647,7 +2557,6 @@ class PipelineDecorator(PipelineController): | ||||
|             target_project=target_project, | ||||
|             abort_on_failure=abort_on_failure, | ||||
|             add_run_number=add_run_number, | ||||
|             relaunch_on_instance_failure=relaunch_on_instance_failure | ||||
|         ) | ||||
| 
 | ||||
|         # if we are in eager execution, make sure parent class knows it | ||||
| @ -2701,15 +2610,14 @@ class PipelineDecorator(PipelineController): | ||||
|                 if not node.job: | ||||
|                     continue | ||||
|                 if node.job.is_stopped(aborted_nonresponsive_as_running=True): | ||||
|                     node_failed = node.job.is_failed() | ||||
|                     if (node_failed or node.job.is_aborted()) and self._should_relaunch_node(node): | ||||
|                         continue | ||||
|                     completed_jobs.append(j) | ||||
|                     node_failed = node.job.is_failed() | ||||
|                     node.executed = node.job.task_id() if not node_failed else False | ||||
|                     if j in launched_nodes: | ||||
|                         launched_nodes.remove(j) | ||||
| 
 | ||||
|                     # check if we need to stop all running steps | ||||
|                     elif node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail: | ||||
|                     if node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail: | ||||
|                         nodes_failed_stop_pipeline.append(node.name) | ||||
|                 elif node.timeout: | ||||
|                     started = node.job.task.data.started | ||||
| @ -2957,8 +2865,7 @@ class PipelineDecorator(PipelineController): | ||||
|             helper_functions=None,  # type: Optional[Sequence[Callable]] | ||||
|             monitor_metrics=None,  # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]] | ||||
|             monitor_artifacts=None,  # type: Optional[List[Union[str, Tuple[str, str]]]] | ||||
|             monitor_models=None,  # type: Optional[List[Union[str, Tuple[str, str]]]] | ||||
|             relaunch_on_instance_failure=None  # type: bool | ||||
|             monitor_models=None  # type: Optional[List[Union[str, Tuple[str, str]]]] | ||||
|     ): | ||||
|         # type: (...) -> Callable | ||||
|         """ | ||||
| @ -3032,10 +2939,6 @@ class PipelineDecorator(PipelineController): | ||||
|             where the first string is the model name as it appears on the component Task, | ||||
|             and the second is the target model name to put on the Pipeline Task | ||||
|             Example: [('model_weights', 'final_model_weights'), ] | ||||
|         :param relaunch_on_instance_failure: If True, check if the machine this step ran on | ||||
|             was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported. | ||||
|             By default, the value of this flag will be the one found passed in the PipelineDecorator.pipeline decorator, | ||||
|             which it itself is False by default. | ||||
| 
 | ||||
|         :return: function wrapper | ||||
|         """ | ||||
| @ -3075,7 +2978,6 @@ class PipelineDecorator(PipelineController): | ||||
|                 monitor_metrics=monitor_metrics, | ||||
|                 monitor_models=monitor_models, | ||||
|                 monitor_artifacts=monitor_artifacts, | ||||
|                 relaunch_on_instance_failure=relaunch_on_instance_failure | ||||
|             ) | ||||
| 
 | ||||
|             if cls._singleton: | ||||
| @ -3093,7 +2995,7 @@ class PipelineDecorator(PipelineController): | ||||
| 
 | ||||
|                     func_return = [] | ||||
| 
 | ||||
|                     def result_wrapper_callback(a_func_return, return_index): | ||||
|                     def result_wrapper(a_func_return, return_index): | ||||
|                         if not a_func_return: | ||||
|                             a_func_return.append(func(*args, **kwargs)) | ||||
|                         a_func_return = a_func_return[0] | ||||
| @ -3101,14 +3003,14 @@ class PipelineDecorator(PipelineController): | ||||
| 
 | ||||
|                     if len(function_return) == 1: | ||||
|                         ret_val = LazyEvalWrapper( | ||||
|                             callback=functools.partial(result_wrapper_callback, func_return, None), | ||||
|                             remote_reference=functools.partial(result_wrapper_callback, func_return, None)) | ||||
|                             callback=functools.partial(result_wrapper, func_return, None), | ||||
|                             remote_reference=functools.partial(result_wrapper, func_return, None)) | ||||
|                         cls._ref_lazy_loader_id_to_node_name[id(ret_val)] = _name | ||||
|                         return ret_val | ||||
|                     else: | ||||
|                         return_w = [LazyEvalWrapper( | ||||
|                             callback=functools.partial(result_wrapper_callback, func_return, i), | ||||
|                             remote_reference=functools.partial(result_wrapper_callback, func_return, i)) | ||||
|                             callback=functools.partial(result_wrapper, func_return, i), | ||||
|                             remote_reference=functools.partial(result_wrapper, func_return, i)) | ||||
|                             for i, _ in enumerate(function_return)] | ||||
|                         for i in return_w: | ||||
|                             cls._ref_lazy_loader_id_to_node_name[id(i)] = _name | ||||
| @ -3186,10 +3088,6 @@ class PipelineDecorator(PipelineController): | ||||
|                 # get node and mark it as launched | ||||
|                 cls._singleton._launched_step_names.add(_node_name) | ||||
|                 _node = cls._singleton._nodes[_node_name] | ||||
|                 if relaunch_on_instance_failure is not None: | ||||
|                     _node.relaunch_on_instance_failure = relaunch_on_instance_failure | ||||
|                 else: | ||||
|                     _node.relaunch_on_instance_failure = cls._singleton._relaunch_on_instance_failure | ||||
| 
 | ||||
|                 # The actual launch is a bit slow, we run it in the background | ||||
|                 launch_thread = Thread( | ||||
| @ -3209,7 +3107,8 @@ class PipelineDecorator(PipelineController): | ||||
|                             raise ValueError("Job was not created and is also not cached/executed") | ||||
|                         return "{}.{}".format(_node.executed, return_name) | ||||
| 
 | ||||
|                     _node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5, | ||||
|                     # wait in seconds | ||||
|                     _node.job.wait(pool_period=5. if cls._debug_execute_step_process else 20., | ||||
|                                    aborted_nonresponsive_as_running=True) | ||||
|                     if _node.job.is_failed() and not _node.continue_on_fail: | ||||
|                         raise ValueError( | ||||
| @ -3225,37 +3124,11 @@ class PipelineDecorator(PipelineController): | ||||
|                             launch_thread.join() | ||||
|                         except:  # noqa | ||||
|                             pass | ||||
|                     while True: | ||||
|                         # wait until job is completed | ||||
|                         if _node.job: | ||||
|                             _node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5, | ||||
|                                            aborted_nonresponsive_as_running=True) | ||||
|                         else: | ||||
|                             sleep(2) | ||||
|                             continue | ||||
| 
 | ||||
|                         if _node.job.is_failed() or _node.job.is_aborted(): | ||||
|                             # noinspection PyProtectedMember | ||||
|                             if cls._singleton._should_relaunch_node(_node): | ||||
|                                 cls._singleton._task.get_logger().report_text( | ||||
|                                     "Relaunching step {} on instance termination".format(_node.name) | ||||
|                                 ) | ||||
|                                 _node.job = None | ||||
|                                 _node.executed = None | ||||
|                                 cls._component_launch( | ||||
|                                     _node.name, | ||||
|                                     _node, | ||||
|                                     kwargs_artifacts, | ||||
|                                     kwargs, | ||||
|                                     current_thread().ident | ||||
|                                 ) | ||||
|                                 continue | ||||
|                             else: | ||||
|                                 raise ValueError( | ||||
|                                     'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()) | ||||
|                                 ) | ||||
|                         else: | ||||
|                             break | ||||
|                     # wait until job is completed | ||||
|                     _node.job.wait(pool_period=5. if cls._debug_execute_step_process else 20.) | ||||
|                     if _node.job.is_failed(): | ||||
|                         raise ValueError( | ||||
|                             'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())) | ||||
| 
 | ||||
|                     _node.executed = _node.job.task_id() | ||||
| 
 | ||||
| @ -3301,7 +3174,6 @@ class PipelineDecorator(PipelineController): | ||||
|             multi_instance_support=False,  # type: bool | ||||
|             add_run_number=True,  # type: bool | ||||
|             args_map=None,  # type: dict[str, List[str]] | ||||
|             relaunch_on_instance_failure=False,  # type: bool | ||||
|             start_controller_locally=False  # type: bool | ||||
|     ): | ||||
|         # type: (...) -> Callable | ||||
| @ -3349,13 +3221,6 @@ class PipelineDecorator(PipelineController): | ||||
|               - paramB: sectionB/paramB | ||||
|               - paramC: sectionB/paramC | ||||
|               - paramD: Args/paramD | ||||
| 
 | ||||
|         :param relaunch_on_instance_failure: If True, check if the machine a pipeline step ran on | ||||
|             was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported. | ||||
|             Default: False | ||||
|         :param start_controller_locally: If True, start the controller on the local machine. The steps will run | ||||
|             remotely if `PipelineDecorator.run_locally` or `PipelineDecorator.debug_pipeline` are not called. | ||||
|             Default: False | ||||
|         """ | ||||
|         def decorator_wrap(func): | ||||
| 
 | ||||
| @ -3392,7 +3257,6 @@ class PipelineDecorator(PipelineController): | ||||
|                         target_project=target_project, | ||||
|                         abort_on_failure=abort_on_failure, | ||||
|                         add_run_number=add_run_number, | ||||
|                         relaunch_on_instance_failure=relaunch_on_instance_failure | ||||
|                     ) | ||||
|                     ret_val = func(**pipeline_kwargs) | ||||
|                     LazyEvalWrapper.trigger_all_remote_references() | ||||
| @ -3434,7 +3298,6 @@ class PipelineDecorator(PipelineController): | ||||
|                     target_project=target_project, | ||||
|                     abort_on_failure=abort_on_failure, | ||||
|                     add_run_number=add_run_number, | ||||
|                     relaunch_on_instance_failure=relaunch_on_instance_failure | ||||
|                 ) | ||||
| 
 | ||||
|                 a_pipeline._args_map = args_map or {} | ||||
| @ -3459,6 +3322,7 @@ class PipelineDecorator(PipelineController): | ||||
|                     a_pipeline._task._set_runtime_properties( | ||||
|                         dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter))) | ||||
| 
 | ||||
|                 # serialize / deserialize state only if we are running locally | ||||
|                 a_pipeline._start(wait=False) | ||||
| 
 | ||||
|                 # sync arguments back (post deserialization and casting back) | ||||
| @ -3467,8 +3331,7 @@ class PipelineDecorator(PipelineController): | ||||
|                         pipeline_kwargs[k] = a_pipeline.get_parameters()[k] | ||||
| 
 | ||||
|                 # run the actual pipeline | ||||
|                 if not start_controller_locally and \ | ||||
|                         not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue: | ||||
|                 if not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue: | ||||
|                     # rerun the pipeline on a remote machine | ||||
|                     a_pipeline._task.execute_remotely(queue_name=pipeline_execution_queue) | ||||
|                     # when we get here it means we are running remotely | ||||
| @ -3493,7 +3356,7 @@ class PipelineDecorator(PipelineController): | ||||
|                     for node in list(a_pipeline._nodes.values()): | ||||
|                         if node.executed or not node.job or node.job.is_stopped(aborted_nonresponsive_as_running=True): | ||||
|                             continue | ||||
|                         node.job.wait(pool_period=1 if cls._debug_execute_step_process else 5, | ||||
|                         node.job.wait(pool_period=5. if cls._debug_execute_step_process else 20., | ||||
|                                       aborted_nonresponsive_as_running=True) | ||||
|                         waited = True | ||||
|                 # store the pipeline result if we have any: | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user
	 allegroai
						allegroai