diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index da0b2f4d..cb860bc7 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -11,7 +11,7 @@ from logging import getLogger
 from multiprocessing import Process, Queue
 from multiprocessing.pool import ThreadPool
 from threading import Thread, Event, RLock, current_thread
-from time import time
+from time import time, sleep
 from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union, Tuple

 from attr import attrib, attrs
@@ -31,6 +31,11 @@ from ..task import Task
 from ..utilities.process.mp import leave_process
 from ..utilities.proxy_object import LazyEvalWrapper, flatten_dictionary, walk_nested_dict_tuple_list

+try:
+    import boto3
+except ImportError:
+    boto3 = None
+

 class PipelineController(object):
     """
@@ -57,6 +62,7 @@
     _monitor_node_interval = 5.*60
     _report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
     _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
+    _relaunch_check_cache = {}

     _evaluated_return_values = {}  # TID: pipeline_name
     _add_to_evaluated_return_values = {}  # TID: bool
@@ -88,6 +94,8 @@
         monitor_metrics = attrib(type=list, default=None)  # List of metric title/series to monitor
         monitor_artifacts = attrib(type=list, default=None)  # List of artifact names to monitor
         monitor_models = attrib(type=list, default=None)  # List of models to monitor
+        # If True, relaunch the node if the instance it ran on crashed
+        relaunch_on_instance_failure = attrib(type=bool, default=False)

         def __attrs_post_init__(self):
             if self.parents is None:
@@ -132,6 +140,7 @@
             auto_version_bump=True,  # type: bool
             abort_on_failure=False,  # type: bool
             add_run_number=True,  # type: bool
+            relaunch_on_instance_failure=False,  # type: bool
     ):
         # type: (...) -> None
         """
@@ -157,6 +166,9 @@
             and mark the pipeline as failed.
         :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
             Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
+        :param relaunch_on_instance_failure: If True, check if the machine a pipeline step ran on
+            was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported.
+            Default: False.
         """
         self._nodes = {}
         self._running_nodes = []
@@ -189,6 +201,7 @@
         self._mock_execution = False  # used for nested pipelines (eager execution)
         self._pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17"))
         self._last_progress_update_time = 0
+        self._relaunch_on_instance_failure = relaunch_on_instance_failure
         if not self._task:
             task_name = name or project or '{}'.format(datetime.now())
             if self._pipeline_as_sub_project:
@@ -493,6 +506,7 @@ class PipelineController(object):
             pre_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]]  # noqa
             post_execute_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node], None]]  # noqa
             cache_executed_step=False,  # type: bool
+            relaunch_on_instance_failure=None  # type: bool
     ):
         # type: (...) -> bool
         """
@@ -536,6 +550,7 @@ class PipelineController(object):
             If not provided, no results will be stored as artifacts.
         :param project_name: Set the project name for the task. Required if base_task_id is None.
         :param task_name: Set the name of the remote task, if not provided use `name` argument.
         :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
             'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
         :param auto_connect_frameworks: Control the frameworks auto connect, see `Task.init` auto_connect_frameworks
@@ -626,6 +641,11 @@
             Default: False, a new cloned copy of base_task is always used.
             Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
+        :param relaunch_on_instance_failure: If True, check if the machine this step ran on
+            was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported.
+            By default, the value of this flag will be the one found in the PipelineController instance
+            this function is called from, which itself is False by default.
+
         :return: True if successful
         """
         # always store callback functions (even when running remotely)
@@ -704,6 +724,8 @@
             a_task.update_task(task_definition)
             return a_task

+        if relaunch_on_instance_failure is None:
+            relaunch_on_instance_failure = self._relaunch_on_instance_failure
         self._nodes[name] = self.Node(
             name=name, base_task_id=None, parents=parents or [],
             queue=execution_queue, timeout=time_limit,
@@ -717,6 +739,7 @@
             monitor_metrics=monitor_metrics,
             monitor_models=monitor_models,
             job_code_section=job_code_section,
+            relaunch_on_instance_failure=relaunch_on_instance_failure
         )

         return True
@@ -1975,11 +1998,20 @@
                 if not node.job:
                     continue
                 if node.job.is_stopped():
-                    completed_jobs.append(j)
                     node_failed = node.job.is_failed()
                     node.executed = node.job.task_id() if not node_failed else False
                     if j in launched_nodes:
                         launched_nodes.remove(j)
+                    if (node_failed or node.job.is_aborted()) and self._should_relaunch_node(node):
+                        # marking the job as None lets us know that this node has to be requeued
+                        self._task.get_logger().report_text(
+                            "Relaunching step {} on instance termination".format(node.name)
+                        )
+                        node.job = None
+                        node.executed = None
+                        self._running_nodes.remove(j)
+                        continue
+                    completed_jobs.append(j)
                     # check if we need to stop all running steps
                     if node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail:
                         nodes_failed_stop_pipeline.append(node.name)
@@ -2502,6 +2534,57 @@
         return '<a href="{}"> {} </a>'.format(task_link_template.format(project=project_id, task=task_id), task_id)

+    def _should_relaunch_node(self, node):
+        # type: ('PipelineController.Node') -> bool
+        """
+        Check if a node should be relaunched. At the moment, this function returns True only if the AWS instance
+        the node ran on terminated during the run
+
+        :param node: The node to check if it should be relaunched
+
+        :return: True if the node should be relaunched and False otherwise
+        """
+        if not boto3 or not self._relaunch_on_instance_failure:
+            return False
+        worker = node.job.worker().split(":")[-1]
+        if (worker, node.name) in self._relaunch_check_cache:
+            return self._relaunch_check_cache[(worker, node.name)]
+        # get credentials from all autoscalers (shouldn't be too many)
+        aws_scaler_tasks = Task.get_tasks(
+            tags=["AWS"],
+            task_filter={
+                "status": ["in_progress"],
+                "type": ["application"],
+                "search_hidden": True,
+                "_allow_extra_fields_": True,
+            },
+        )
+        used_creds = []
+        for task in aws_scaler_tasks:
+            # noinspection PyBroadException
+            try:
+                parameters = task.get_parameters()
+                cred = {
+                    "aws_access_key_id": parameters["General/cloud_credentials_key"],
+                    "aws_secret_access_key": parameters["General/cloud_credentials_secret"],
+                    "region_name": parameters["General/cloud_credentials_region"],
+                }
+                if cred in used_creds:
+                    continue
+                used_creds.append(cred)
+                ec2_resource = boto3.resource("ec2", **cred)
+                # check if AWS instance is still running
+                # we would like to requeue if the instance somehow died
+                # (like when a spot instance is terminated)
+                instance = next(iter(ec2_resource.instances.filter(InstanceIds=[worker])))
+                if instance.state["Name"] != "running":
+                    self._relaunch_check_cache[(worker, node.name)] = True
+                    return True
+            except Exception:
+                pass
+        self._relaunch_check_cache[(worker, node.name)] = False
+        return False
+

 class PipelineDecorator(PipelineController):
     _added_decorator = []  # type: List[dict]
@@ -2526,6 +2609,7 @@
             target_project=None,  # type: Optional[str]
             abort_on_failure=False,  # type: bool
             add_run_number=True,  # type: bool
+            relaunch_on_instance_failure=False  # type: bool
     ):
         # type: (...) -> ()
         """
@@ -2547,6 +2631,9 @@
             and mark the pipeline as failed.
         :param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
             Example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
+        :param relaunch_on_instance_failure: If True, check if the machine a pipeline step ran on
+            was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported.
+            Default: False
         """
         super(PipelineDecorator, self).__init__(
             name=name,
@@ -2557,6 +2644,7 @@
             target_project=target_project,
             abort_on_failure=abort_on_failure,
             add_run_number=add_run_number,
+            relaunch_on_instance_failure=relaunch_on_instance_failure
         )

         # if we are in eager execution, make sure parent class knows it
@@ -2610,13 +2698,15 @@
                 if not node.job:
                     continue
                 if node.job.is_stopped():
-                    completed_jobs.append(j)
                     node_failed = node.job.is_failed()
+                    if (node_failed or node.job.is_aborted()) and self._should_relaunch_node(node):
+                        continue
+                    completed_jobs.append(j)
                     node.executed = node.job.task_id() if not node_failed else False
                     if j in launched_nodes:
                         launched_nodes.remove(j)
                     # check if we need to stop all running steps
-                    if node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail:
+                    elif node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail:
                         nodes_failed_stop_pipeline.append(node.name)
                 elif node.timeout:
                     started = node.job.task.data.started
@@ -2864,7 +2954,8 @@
             helper_functions=None,  # type: Optional[Sequence[Callable]]
             monitor_metrics=None,  # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
             monitor_artifacts=None,  # type: Optional[List[Union[str, Tuple[str, str]]]]
-            monitor_models=None  # type: Optional[List[Union[str, Tuple[str, str]]]]
+            monitor_models=None,  # type: Optional[List[Union[str, Tuple[str, str]]]]
+            relaunch_on_instance_failure=None  # type: bool
     ):
         # type: (...) -> Callable
         """
@@ -2938,6 +3029,10 @@
             where the first string is the model name as it appears on the component Task,
             and the second is the target model name to put on the Pipeline Task
             Example: [('model_weights', 'final_model_weights'), ]
+        :param relaunch_on_instance_failure: If True, check if the machine this step ran on
+            was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported.
+            By default, the value of this flag will be the one passed to the PipelineDecorator.pipeline decorator,
+            which itself is False by default.
         :return: function wrapper
         """

@@ -2977,6 +3072,7 @@
                 monitor_metrics=monitor_metrics,
                 monitor_models=monitor_models,
                 monitor_artifacts=monitor_artifacts,
+                relaunch_on_instance_failure=relaunch_on_instance_failure
             )

             if cls._singleton:
@@ -2994,7 +3090,7 @@

                 func_return = []

-                def result_wrapper(a_func_return, return_index):
+                def result_wrapper_callback(a_func_return, return_index):
                     if not a_func_return:
                         a_func_return.append(func(*args, **kwargs))
                     a_func_return = a_func_return[0]
@@ -3002,14 +3098,14 @@

                 if len(function_return) == 1:
                     ret_val = LazyEvalWrapper(
-                        callback=functools.partial(result_wrapper, func_return, None),
-                        remote_reference=functools.partial(result_wrapper, func_return, None))
+                        callback=functools.partial(result_wrapper_callback, func_return, None),
+                        remote_reference=functools.partial(result_wrapper_callback, func_return, None))
                     cls._ref_lazy_loader_id_to_node_name[id(ret_val)] = _name
                     return ret_val
                 else:
                     return_w = [LazyEvalWrapper(
-                        callback=functools.partial(result_wrapper, func_return, i),
-                        remote_reference=functools.partial(result_wrapper, func_return, i))
+                        callback=functools.partial(result_wrapper_callback, func_return, i),
+                        remote_reference=functools.partial(result_wrapper_callback, func_return, i))
                         for i, _ in enumerate(function_return)]
                     for i in return_w:
                         cls._ref_lazy_loader_id_to_node_name[id(i)] = _name
@@ -3087,6 +3183,10 @@
                 # get node and park is as launched
                 cls._singleton._launched_step_names.add(_node_name)
                 _node = cls._singleton._nodes[_node_name]
+                if relaunch_on_instance_failure is not None:
+                    _node.relaunch_on_instance_failure = relaunch_on_instance_failure
+                else:
+                    _node.relaunch_on_instance_failure = cls._singleton._relaunch_on_instance_failure

                 # The actual launch is a bit slow, we run it in the background
                 launch_thread = Thread(
@@ -3121,11 +3221,34 @@
                     launch_thread.join()
                 except:  # noqa
                     pass

-                # wait until job is completed
-                _node.job.wait(pool_period=0.2)
-                if _node.job.is_failed():
-                    raise ValueError(
-                        'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
+                while True:
+                    # wait until job is completed
+                    if _node.job:
+                        _node.job.wait(pool_period=0.2)
+                    else:
+                        sleep(0.2)
+                        continue
+                    if _node.job.is_failed() or _node.job.is_aborted():
+                        if cls._singleton._should_relaunch_node(_node):
+                            cls._singleton._task.get_logger().report_text(
+                                "Relaunching step {} on instance termination".format(_node.name)
+                            )
+                            _node.job = None
+                            _node.executed = None
+                            cls._component_launch(
+                                _node.name,
+                                _node,
+                                kwargs_artifacts,
+                                kwargs,
+                                current_thread().ident
+                            )
+                            continue
+                        else:
+                            raise ValueError(
+                                'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())
+                            )
+                    else:
+                        break

                 _node.executed = _node.job.task_id()
@@ -3170,7 +3293,9 @@
             pipeline_execution_queue='services',  # type: Optional[str]
             multi_instance_support=False,  # type: bool
             add_run_number=True,  # type: bool
-            args_map=None  # type: dict[str, List[str]]
+            args_map=None,  # type: dict[str, List[str]]
+            relaunch_on_instance_failure=False,  # type: bool
+            start_controller_locally=False  # type: bool
     ):
         # type: (...) -> Callable
         """
@@ -3217,6 +3342,13 @@
                 - paramB: sectionB/paramB
                 - paramC: sectionB/paramC
                 - paramD: Args/paramD
+
+        :param relaunch_on_instance_failure: If True, check if the machine a pipeline step ran on
+            was terminated. In case it was, the step will be relaunched. As of now, only AWS instances are supported.
+            Default: False
+        :param start_controller_locally: If True, start the controller on the local machine. The steps will run
+            remotely if `PipelineDecorator.run_locally` or `PipelineDecorator.debug_pipeline` are not called.
+            Default: False
         """

         def decorator_wrap(func):
@@ -3253,6 +3385,7 @@
                     target_project=target_project,
                     abort_on_failure=abort_on_failure,
                     add_run_number=add_run_number,
+                    relaunch_on_instance_failure=relaunch_on_instance_failure
                 )
                 ret_val = func(**pipeline_kwargs)
                 LazyEvalWrapper.trigger_all_remote_references()
@@ -3294,6 +3427,7 @@
                 target_project=target_project,
                 abort_on_failure=abort_on_failure,
                 add_run_number=add_run_number,
+                relaunch_on_instance_failure=relaunch_on_instance_failure
             )

             a_pipeline._args_map = args_map or {}
@@ -3318,7 +3452,6 @@
                 a_pipeline._task._set_runtime_properties(
                     dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter)))

-            # serialize / deserialize state only if we are running locally
             a_pipeline._start(wait=False)

             # sync arguments back (post deserialization and casting back)
@@ -3327,7 +3460,7 @@
                     pipeline_kwargs[k] = a_pipeline.get_parameters()[k]

             # run the actual pipeline
-            if not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue:
+            if not start_controller_locally and not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue:
                 # rerun the pipeline on a remote machine
                 a_pipeline._task.execute_remotely(queue_name=pipeline_execution_queue)
                 # when we get here it means we are running remotely
diff --git a/clearml/automation/job.py b/clearml/automation/job.py
index bc4f3908..fc4c8d04 100644
--- a/clearml/automation/job.py
+++ b/clearml/automation/job.py
@@ -202,11 +202,8 @@ class BaseJob(object):
             return self._worker

         if self._worker is None:
-            # the last console outputs will update the worker
-            self.get_console_output(number_of_reports=1)
-            # if we still do not have it, store empty string
-            if not self._worker:
-                self._worker = ''
+            self.task.reload()
+            self._worker = self.task.last_worker

         return self._worker
@@ -780,10 +777,6 @@ class _JobStub(object):
         # type: () -> str
         return 'stub'

-    def worker(self):
-        # type: () -> ()
-        return None
-
     def status(self):
         # type: () -> str
         return 'in_progress'
diff --git a/clearml/task.py b/clearml/task.py
index 99f5dee0..d2112874 100644
--- a/clearml/task.py
+++ b/clearml/task.py
@@ -1058,6 +1058,10 @@ class Task(_Task):
         # type: () -> str
         return self.storage_uri

+    @property
+    def last_worker(self):
+        return self._data.last_worker
+
     @output_uri.setter
     def output_uri(self, value):
         # type: (Union[str, bool]) -> None
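
Usage sketch (not part of the patch): assuming the changes above are applied, a pipeline would opt into the relaunch behavior through the new controller-level flag; the pipeline name, project, and version below are placeholder values. The check itself only runs when boto3 is installed and a ClearML AWS autoscaler task is in progress (that is where `_should_relaunch_node` reads the EC2 credentials from), and the flag can still be overridden per step via the matching `relaunch_on_instance_failure` argument on the step-adding method or on `PipelineDecorator.component`.

    from clearml.automation.controller import PipelineController

    # Illustrative only: name/project/version are placeholders.
    pipe = PipelineController(
        name="pipeline with relaunch",
        project="examples",
        version="1.0.0",
        # relaunch steps whose (e.g. spot) AWS instance was terminated mid-run
        relaunch_on_instance_failure=True,
    )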