Add support for pipelines with spot instances

allegroai 2022-07-21 17:22:11 +03:00
parent a42c4b0bd3
commit 157df5dad3
3 changed files with 157 additions and 27 deletions


@@ -11,7 +11,7 @@ from logging import getLogger
from multiprocessing import Process, Queue
from multiprocessing.pool import ThreadPool
from threading import Thread, Event, RLock, current_thread
from time import time
from time import time, sleep
from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union, Tuple
from attr import attrib, attrs
@@ -31,6 +31,11 @@ from ..task import Task
from ..utilities.process.mp import leave_process
from ..utilities.proxy_object import LazyEvalWrapper, flatten_dictionary, walk_nested_dict_tuple_list
try:
import boto3
except ImportError:
boto3 = None
class PipelineController(object):
"""
@@ -57,6 +62,7 @@ class PipelineController(object):
_monitor_node_interval = 5.*60
_report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
_report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
_relaunch_check_cache = {}
_evaluated_return_values = {} # TID: pipeline_name
_add_to_evaluated_return_values = {} # TID: bool
@@ -88,6 +94,8 @@ class PipelineController(object):
monitor_metrics = attrib(type=list, default=None) # List of metric title/series to monitor
monitor_artifacts = attrib(type=list, default=None) # List of artifact names to monitor
monitor_models = attrib(type=list, default=None) # List of models to monitor
# If True, relaunch the node if the instance it ran on crashed
relaunch_on_instance_failure = attrib(type=bool, default=False)
def __attrs_post_init__(self):
if self.parents is None:
@@ -132,6 +140,7 @@ class PipelineController(object):
auto_version_bump=True, # type: bool
abort_on_failure=False, # type: bool
add_run_number=True, # type: bool
relaunch_on_instance_failure=False, # type: bool
):
# type: (...) -> None
"""
@@ -157,6 +166,9 @@ class PipelineController(object):
and mark the pipeline as failed.
:param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
For example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
:param relaunch_on_instance_failure: If True, check whether the machine a pipeline step ran on
was terminated; if it was, relaunch the step. Currently only AWS instances are supported.
Default: False.
"""
self._nodes = {}
self._running_nodes = []
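For context, a minimal usage sketch of the new controller-level flag; the pipeline name and project below are illustrative, not part of this commit:

from clearml import PipelineController

# With relaunch_on_instance_failure=True, the controller requeues any step whose
# (AWS) machine was terminated mid-run, e.g. a reclaimed spot instance.
pipe = PipelineController(
    name="spot pipeline",      # illustrative
    project="examples",        # illustrative
    version="1.0",
    relaunch_on_instance_failure=True,
)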
@@ -189,6 +201,7 @@ class PipelineController(object):
self._mock_execution = False # used for nested pipelines (eager execution)
self._pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17"))
self._last_progress_update_time = 0
self._relaunch_on_instance_failure = relaunch_on_instance_failure
if not self._task:
task_name = name or project or '{}'.format(datetime.now())
if self._pipeline_as_sub_project:
@@ -493,6 +506,7 @@ class PipelineController(object):
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool
relaunch_on_instance_failure=None # type: bool
):
# type: (...) -> bool
"""
@@ -536,6 +550,7 @@ class PipelineController(object):
If not provided, no results will be stored as artifacts.
:param project_name: Set the project name for the task. Required if base_task_id is None.
:param task_name: Set the name of the remote task, if not provided use `name` argument.
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param auto_connect_frameworks: Control the frameworks auto connect, see `Task.init` auto_connect_frameworks
@@ -626,6 +641,11 @@ class PipelineController(object):
Default: False, a new cloned copy of base_task is always used.
Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
:param relaunch_on_instance_failure: If True, check whether the machine this step ran on
was terminated; if it was, relaunch the step. Currently only AWS instances are supported.
By default, this flag takes its value from the PipelineController instance this function
is called from, which itself defaults to False.
:return: True if successful
"""
# always store callback functions (even when running remotely)
@@ -704,6 +724,8 @@ class PipelineController(object):
a_task.update_task(task_definition)
return a_task
if relaunch_on_instance_failure is None:
relaunch_on_instance_failure = self._relaunch_on_instance_failure
self._nodes[name] = self.Node(
name=name, base_task_id=None, parents=parents or [],
queue=execution_queue, timeout=time_limit,
@@ -717,6 +739,7 @@ class PipelineController(object):
monitor_metrics=monitor_metrics,
monitor_models=monitor_models,
job_code_section=job_code_section,
relaunch_on_instance_failure=relaunch_on_instance_failure
)
return True
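A sketch of the per-step override added above; the step function, kwargs, and queue name are illustrative. Passing None (the default) inherits the controller-level value:

def process_data(dataset_id):
    # illustrative step body
    return dataset_id

pipe.add_function_step(
    name="process",
    function=process_data,
    function_kwargs=dict(dataset_id="placeholder-id"),  # illustrative value
    execution_queue="spot-queue",                       # illustrative queue
    relaunch_on_instance_failure=True,                  # overrides the controller default
)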
@@ -1975,11 +1998,20 @@ class PipelineController(object):
if not node.job:
continue
if node.job.is_stopped():
completed_jobs.append(j)
node_failed = node.job.is_failed()
node.executed = node.job.task_id() if not node_failed else False
if j in launched_nodes:
launched_nodes.remove(j)
if (node_failed or node.job.is_aborted()) and self._should_relaunch_node(node):
# marking the job as None lets us know that this node has to be requeued
self._task.get_logger().report_text(
"Relaunching step {} on instance termination".format(node.name)
)
node.job = None
node.executed = None
self._running_nodes.remove(j)
continue
completed_jobs.append(j)
# check if we need to stop all running steps
if node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail:
nodes_failed_stop_pipeline.append(node.name)
@@ -2502,6 +2534,57 @@ class PipelineController(object):
return '<a href="{}"> {} </a>'.format(task_link_template.format(project=project_id, task=task_id), task_id)
def _should_relaunch_node(self, node):
# type: ('PipelineController.Node') -> bool
"""
Check whether a node should be relaunched. At the moment, this returns True only if the AWS instance
the node ran on was terminated during the run
:param node: The node to check
:return: True if the node should be relaunched, False otherwise
"""
if not boto3 or not self._relaunch_on_instance_failure:
return False
worker = node.job.worker().split(":")[-1]
if (worker, node.name) in self._relaunch_check_cache:
return self._relaunch_check_cache[(worker, node.name)]
# get credentials from all autoscalers (shouldn't be too many)
aws_scaler_tasks = Task.get_tasks(
tags=["AWS"],
task_filter={
"status": ["in_progress"],
"type": ["application"],
"search_hidden": True,
"_allow_extra_fields_": True,
},
)
used_creds = []
for task in aws_scaler_tasks:
# noinspection PyBroadException
try:
parameters = task.get_parameters()
cred = {
"aws_access_key_id": parameters["General/cloud_credentials_key"],
"aws_secret_access_key": parameters["General/cloud_credentials_secret"],
"region_name": parameters["General/cloud_credentials_region"],
}
if cred in used_creds:
continue
used_creds.append(cred)
ec2_resource = boto3.resource("ec2", **cred)
# check if AWS instance is still running
# we would like to requeue if the instance somehow died
# (like when a spot instance is terminated)
instance = next(iter(ec2_resource.instances.filter(InstanceIds=[worker])))
if instance.state["Name"] != "running":
self._relaunch_check_cache[(worker, node.name)] = True
return True
except Exception:
pass
self._relaunch_check_cache[(worker, node.name)] = False
return False
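The EC2 check above reduces to the following boto3 calls; a standalone sketch with placeholder credentials and instance ID:

import boto3

def instance_is_running(instance_id, key_id, secret, region):
    # A terminated or stopped spot instance reports a state other than "running"
    ec2 = boto3.resource(
        "ec2",
        aws_access_key_id=key_id,
        aws_secret_access_key=secret,
        region_name=region,
    )
    instance = next(iter(ec2.instances.filter(InstanceIds=[instance_id])), None)
    return instance is not None and instance.state["Name"] == "running"

instance_is_running("i-placeholder", "KEY", "SECRET", "us-east-1")  # placeholders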
class PipelineDecorator(PipelineController):
_added_decorator = [] # type: List[dict]
@@ -2526,6 +2609,7 @@ class PipelineDecorator(PipelineController):
target_project=None, # type: Optional[str]
abort_on_failure=False, # type: bool
add_run_number=True, # type: bool
relaunch_on_instance_failure=False # type: bool
):
# type: (...) -> ()
"""
@@ -2547,6 +2631,9 @@ class PipelineDecorator(PipelineController):
and mark the pipeline as failed.
:param add_run_number: If True (default), add the run number of the pipeline to the pipeline name.
For example, the second time we launch the pipeline "best pipeline", we rename it to "best pipeline #2"
:param relaunch_on_instance_failure: If True, check whether the machine a pipeline step ran on
was terminated; if it was, relaunch the step. Currently only AWS instances are supported.
Default: False
"""
super(PipelineDecorator, self).__init__(
name=name,
@@ -2557,6 +2644,7 @@ class PipelineDecorator(PipelineController):
target_project=target_project,
abort_on_failure=abort_on_failure,
add_run_number=add_run_number,
relaunch_on_instance_failure=relaunch_on_instance_failure
)
# if we are in eager execution, make sure parent class knows it
@@ -2610,13 +2698,15 @@ class PipelineDecorator(PipelineController):
if not node.job:
continue
if node.job.is_stopped():
completed_jobs.append(j)
node_failed = node.job.is_failed()
if (node_failed or node.job.is_aborted()) and self._should_relaunch_node(node):
continue
completed_jobs.append(j)
node.executed = node.job.task_id() if not node_failed else False
if j in launched_nodes:
launched_nodes.remove(j)
# check if we need to stop all running steps
if node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail:
elif node_failed and self._abort_running_steps_on_failure and not node.continue_on_fail:
nodes_failed_stop_pipeline.append(node.name)
elif node.timeout:
started = node.job.task.data.started
@@ -2864,7 +2954,8 @@ class PipelineDecorator(PipelineController):
helper_functions=None, # type: Optional[Sequence[Callable]]
monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
monitor_models=None # type: Optional[List[Union[str, Tuple[str, str]]]]
monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
relaunch_on_instance_failure=None # type: bool
):
# type: (...) -> Callable
"""
@@ -2938,6 +3029,10 @@ class PipelineDecorator(PipelineController):
where the first string is the model name as it appears on the component Task,
and the second is the target model name to put on the Pipeline Task
Example: [('model_weights', 'final_model_weights'), ]
:param relaunch_on_instance_failure: If True, check whether the machine this step ran on
was terminated; if it was, relaunch the step. Currently only AWS instances are supported.
By default, this flag takes the value passed to the PipelineDecorator.pipeline decorator,
which itself defaults to False.
:return: function wrapper
"""
@@ -2977,6 +3072,7 @@ class PipelineDecorator(PipelineController):
monitor_metrics=monitor_metrics,
monitor_models=monitor_models,
monitor_artifacts=monitor_artifacts,
relaunch_on_instance_failure=relaunch_on_instance_failure
)
if cls._singleton:
@@ -2994,7 +3090,7 @@ class PipelineDecorator(PipelineController):
func_return = []
def result_wrapper(a_func_return, return_index):
def result_wrapper_callback(a_func_return, return_index):
if not a_func_return:
a_func_return.append(func(*args, **kwargs))
a_func_return = a_func_return[0]
@@ -3002,14 +3098,14 @@ class PipelineDecorator(PipelineController):
if len(function_return) == 1:
ret_val = LazyEvalWrapper(
callback=functools.partial(result_wrapper, func_return, None),
remote_reference=functools.partial(result_wrapper, func_return, None))
callback=functools.partial(result_wrapper_callback, func_return, None),
remote_reference=functools.partial(result_wrapper_callback, func_return, None))
cls._ref_lazy_loader_id_to_node_name[id(ret_val)] = _name
return ret_val
else:
return_w = [LazyEvalWrapper(
callback=functools.partial(result_wrapper, func_return, i),
remote_reference=functools.partial(result_wrapper, func_return, i))
callback=functools.partial(result_wrapper_callback, func_return, i),
remote_reference=functools.partial(result_wrapper_callback, func_return, i))
for i, _ in enumerate(function_return)]
for i in return_w:
cls._ref_lazy_loader_id_to_node_name[id(i)] = _name
@@ -3087,6 +3183,10 @@ class PipelineDecorator(PipelineController):
# get the node and mark it as launched
cls._singleton._launched_step_names.add(_node_name)
_node = cls._singleton._nodes[_node_name]
if relaunch_on_instance_failure is not None:
_node.relaunch_on_instance_failure = relaunch_on_instance_failure
else:
_node.relaunch_on_instance_failure = cls._singleton._relaunch_on_instance_failure
# The actual launch is a bit slow, we run it in the background
launch_thread = Thread(
@@ -3121,11 +3221,34 @@ class PipelineDecorator(PipelineController):
launch_thread.join()
except: # noqa
pass
# wait until job is completed
_node.job.wait(pool_period=0.2)
if _node.job.is_failed():
raise ValueError(
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
while True:
# wait until job is completed
if _node.job:
_node.job.wait(pool_period=0.2)
else:
sleep(0.2)
continue
if _node.job.is_failed() or _node.job.is_aborted():
if cls._singleton._should_relaunch_node(_node):
cls._singleton._task.get_logger().report_text(
"Relaunching step {} on instance termination".format(_node.name)
)
_node.job = None
_node.executed = None
cls._component_launch(
_node.name,
_node,
kwargs_artifacts,
kwargs,
current_thread().ident
)
continue
else:
raise ValueError(
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id())
)
else:
break
_node.executed = _node.job.task_id()
@@ -3170,7 +3293,9 @@ class PipelineDecorator(PipelineController):
pipeline_execution_queue='services', # type: Optional[str]
multi_instance_support=False, # type: bool
add_run_number=True, # type: bool
args_map=None # type: dict[str, List[str]]
args_map=None, # type: dict[str, List[str]]
relaunch_on_instance_failure=False, # type: bool
start_controller_locally=False # type: bool
):
# type: (...) -> Callable
"""
@@ -3217,6 +3342,13 @@ class PipelineDecorator(PipelineController):
- paramB: sectionB/paramB
- paramC: sectionB/paramC
- paramD: Args/paramD
:param relaunch_on_instance_failure: If True, check whether the machine a pipeline step ran on
was terminated; if it was, relaunch the step. Currently only AWS instances are supported.
Default: False
:param start_controller_locally: If True, run the pipeline controller on the local machine. The steps
will still run remotely unless `PipelineDecorator.run_locally` or `PipelineDecorator.debug_pipeline` is called.
Default: False
"""
def decorator_wrap(func):
@@ -3253,6 +3385,7 @@ class PipelineDecorator(PipelineController):
target_project=target_project,
abort_on_failure=abort_on_failure,
add_run_number=add_run_number,
relaunch_on_instance_failure=relaunch_on_instance_failure
)
ret_val = func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references()
@@ -3294,6 +3427,7 @@ class PipelineDecorator(PipelineController):
target_project=target_project,
abort_on_failure=abort_on_failure,
add_run_number=add_run_number,
relaunch_on_instance_failure=relaunch_on_instance_failure
)
a_pipeline._args_map = args_map or {}
@@ -3318,7 +3452,6 @@ class PipelineDecorator(PipelineController):
a_pipeline._task._set_runtime_properties(
dict(multi_pipeline_counter=str(cls._multi_pipeline_call_counter)))
# serialize / deserialize state only if we are running locally
a_pipeline._start(wait=False)
# sync arguments back (post deserialization and casting back)
@@ -3327,7 +3460,7 @@ class PipelineDecorator(PipelineController):
pipeline_kwargs[k] = a_pipeline.get_parameters()[k]
# run the actual pipeline
if not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue:
if not start_controller_locally and not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue:
# rerun the pipeline on a remote machine
a_pipeline._task.execute_remotely(queue_name=pipeline_execution_queue)
# when we get here it means we are running remotely


@@ -202,11 +202,8 @@ class BaseJob(object):
return self._worker
if self._worker is None:
# the last console outputs will update the worker
self.get_console_output(number_of_reports=1)
# if we still do not have it, store empty string
if not self._worker:
self._worker = ''
self.task.reload()
self._worker = self.task.last_worker
return self._worker
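The worker id now comes straight from the backend instead of being scraped from console output. A sketch of the new lookup path, with a placeholder task ID; the "host:instance-id" worker format is an assumption inferred from the split(":") in _should_relaunch_node above:

from clearml import Task

task = Task.get_task(task_id="<step-task-id>")  # placeholder ID
task.reload()                                    # refresh task data from the server
print(task.last_worker)                          # e.g. "host:i-0abc..." (assumed format)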
@@ -780,10 +777,6 @@ class _JobStub(object):
# type: () -> str
return 'stub'
def worker(self):
# type: () -> ()
return None
def status(self):
# type: () -> str
return 'in_progress'


@@ -1058,6 +1058,10 @@ class Task(_Task):
# type: () -> str
return self.storage_uri
@property
def last_worker(self):
return self._data.last_worker
@output_uri.setter
def output_uri(self, value):
# type: (Union[str, bool]) -> None