Fix setting tasks that someone just marked as aborted to started - only force Task to started after dequeuing it otherwise lease it as is

This commit is contained in:
allegroai 2024-07-24 17:39:26 +03:00
parent 8f41002845
commit ebc5944b44

View File

@ -727,6 +727,7 @@ class Worker(ServiceCommandSection):
self.is_venv_update = self._session.config.agent.venv_update.enabled self.is_venv_update = self._session.config.agent.venv_update.enabled
self.poetry = PoetryConfig(self._session) self.poetry = PoetryConfig(self._session)
self.docker_image_func = None self.docker_image_func = None
self._patch_docker_cmd_func = None
self._docker_image = None self._docker_image = None
self._docker_arguments = None self._docker_arguments = None
PackageManager.set_pip_version(self._session.config.get("agent.package_manager.pip_version", None)) PackageManager.set_pip_version(self._session.config.get("agent.package_manager.pip_version", None))
@ -740,7 +741,7 @@ class Worker(ServiceCommandSection):
self._services_mode = None self._services_mode = None
self._impersonate_as_task_owner = None self._impersonate_as_task_owner = None
self._worker_tags = None self._worker_tags = None
self._dynamic_gpus = None self._dynamic_gpus = None # valid options, True/False, "fractional"
self._force_current_version = None self._force_current_version = None
self._redirected_stdout_file_no = None self._redirected_stdout_file_no = None
self._uptime_config = self._session.config.get("agent.uptime", None) self._uptime_config = self._session.config.get("agent.uptime", None)
@ -1050,6 +1051,10 @@ class Worker(ServiceCommandSection):
session=task_session, session=task_session,
) )
# patch the full docker cmd if needed, notice this is done post reporting
if self._patch_docker_cmd_func:
full_docker_cmd = self._patch_docker_cmd_func(full_docker_cmd)
cmd = Argv(*full_docker_cmd, display_argv=display_docker_command) cmd = Argv(*full_docker_cmd, display_argv=display_docker_command)
print('Running Docker:\n{}\n'.format(str(cmd))) print('Running Docker:\n{}\n'.format(str(cmd)))
@ -1195,14 +1200,15 @@ class Worker(ServiceCommandSection):
# get current running instances # get current running instances
available_gpus = None available_gpus = None
allocated_gpus = {}
dynamic_gpus_worker_id = None dynamic_gpus_worker_id = None
if gpu_indexes and gpu_queues: if gpu_indexes and gpu_queues:
available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues) available_gpus, gpu_queues = self._setup_dynamic_gpus(gpu_queues, gpu_indexes)
# multi instance support # multi instance support
self._services_mode = True self._services_mode = True
# last 64 tasks # last 64 tasks
list_task_gpus_ids = {} dict_task_gpus_ids = {} # {str(gpu_indexes): task_id}
try: try:
while True: while True:
queue_tags = None queue_tags = None
@ -1226,7 +1232,7 @@ class Worker(ServiceCommandSection):
# update available gpus # update available gpus
if gpu_queues: if gpu_queues:
available_gpus = self._dynamic_gpu_get_available(gpu_indexes) available_gpus, allocated_gpus = self._dynamic_gpu_get_available(gpu_indexes)
# if something went wrong, or we have no free gpus # if something went wrong, or we have no free gpus
# start over from the highest priority queue # start over from the highest priority queue
if not available_gpus: if not available_gpus:
@ -1255,11 +1261,21 @@ class Worker(ServiceCommandSection):
if not len(response.queue.entries): if not len(response.queue.entries):
continue continue
# check if we do not have enough available gpus # check if we do not have enough available gpus
if gpu_queues[queue][0] > len(available_gpus): # Notice that gpu_queues[queue] is (min_gpu, max_gpu) that we should allocate
# for a Task pulled from that queue, this means
# gpu_queues[queue][0] is the minimum number of GPUs we need
# and min_available_fract_gpu is the maximum number of fraction of GPU
# and max_available_gpus is the maximum number of full GPUs we have
min_available_fract_gpu = max([v for v in available_gpus.values()])
max_available_gpus = sum([v for v in available_gpus.values() if v >= 1])
if gpu_queues[queue][0] > max(float(max_available_gpus), min_available_fract_gpu):
# not enough available_gpus, we should sleep and start over # not enough available_gpus, we should sleep and start over
if self._daemon_foreground or worker_params.debug: if self._daemon_foreground or worker_params.debug:
print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format( print("Not enough free GPUs {}/{}, sleeping for {:.1f} seconds".format(
len(available_gpus), gpu_queues[queue][0], self._polling_interval)) max(float(max_available_gpus), min_available_fract_gpu),
gpu_queues[queue][0],
self._polling_interval)
)
sleep(self._polling_interval) sleep(self._polling_interval)
break break
@ -1317,15 +1333,83 @@ class Worker(ServiceCommandSection):
dynamic_gpus_worker_id = self.worker_id dynamic_gpus_worker_id = self.worker_id
# the following is only executed in dynamic gpus mode # the following is only executed in dynamic gpus mode
if gpu_queues and gpu_queues.get(queue): if gpu_queues and gpu_queues.get(queue):
gpus = []
fractions = []
# pick the first available GPUs # pick the first available GPUs
# gpu_queues[queue] = (min_gpus, max_gpus) # gpu_queues[queue] = (min_gpus, max_gpus)
# first check if the max_gpus is larger equal to 1 (if so, allocate full GPUs)
if float(gpu_queues.get(queue)[1]) >= 1:
gpus = [g for g, v in available_gpus.items() if v >= 1]
if gpus:
# get as many gpus as possible with max_gpus as limit, the min is covered before # get as many gpus as possible with max_gpus as limit, the min is covered before
gpus = available_gpus[:gpu_queues.get(queue)[1]] gpus = gpus[:int(gpu_queues.get(queue)[1])]
available_gpus = available_gpus[gpu_queues.get(queue)[1]:] fractions = [1] * len(gpus)
# update available gpus
available_gpus = {g: v for g, v in available_gpus.items() if g not in gpus}
else:
# we assume the minimum was < 1 GPU, otherwise why are we here
pass
# if this is under 1 GPU
if not gpus:
# find the GPU with availability that covers the minimum
_max_req_gpu = min(float(gpu_queues.get(queue)[1]), 1.)
_min_req_gpu = float(gpu_queues.get(queue)[0])
_potential_gpus = {
g: (v - float(_max_req_gpu)) for g, v in available_gpus.items()
if v >= float(_min_req_gpu)}
# sort based on the least available that can fit the maximum
# find the first instance that is positive or 0
_potential_gpus = sorted(_potential_gpus.items(), key=lambda a: a[1])
gpus = [(g, v) for g, v in _potential_gpus if v >= 0]
if gpus:
available_gpus[gpus[0][0]] -= _max_req_gpu
gpus = [gpus[0][0]]
fractions = [_max_req_gpu]
else:
gpus = [_potential_gpus[-1][0]]
# this is where we need to decide on the actual granularity
# now it is hardcoded to 1/8th
_base_fract = 8
avail_fract = int(float(available_gpus[_potential_gpus[-1][0]]) * _base_fract)
fractions = [avail_fract/float(avail_fract)]
available_gpus[_potential_gpus[-1][0]] -= fractions[0]
try:
from clearml_agent_fractional_gpu import patch_docker_cmd_gpu_fraction # noqa
# new docker image func
self._patch_docker_cmd_func = lambda docker_cmd: (
patch_docker_cmd_gpu_fraction(docker_cmd, gpu_fraction=fractions[0]))
except Exception:
print("Error! could not load clearml_agent_fractional_gpu module! "
"failed configuring fractional GPU support")
raise
self.set_runtime_properties( self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus)) key='available_gpus',
value=','.join("{}_{}".format(g, str(f)[2:]) for g, f in available_gpus.items()))
# this is where we set the fractions as well as gpus
Session.set_nvidia_visible_env(gpus) Session.set_nvidia_visible_env(gpus)
list_task_gpus_ids.update({str(g): task_id for g in gpus})
if fractions and min(fractions) < 1:
# we assume a single gpu in the list
gpu_idx_fract = ["{}.{}".format(g, str(f)[2:]) for g, f in zip(gpus, fractions)]
# check a new available unique name (id) for us
from string import ascii_lowercase
for x in ascii_lowercase:
if gpu_idx_fract[0]+x not in allocated_gpus:
gpu_idx_fract[0] = gpu_idx_fract[0]+x
break
# add the new task
allocated_gpus[gpu_idx_fract[0]] = fractions[0]
dict_task_gpus_ids.update({str(g): task_id for g in gpu_idx_fract})
self.worker_id = ':'.join(
self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpu_idx_fract)])
else:
# update the task list
dict_task_gpus_ids.update({str(g): task_id for g in gpus})
self.worker_id = ':'.join( self.worker_id = ':'.join(
self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)]) self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
@ -1338,10 +1422,14 @@ class Worker(ServiceCommandSection):
self.run_one_task(queue, task_id, worker_params, task_session=task_session) self.run_one_task(queue, task_id, worker_params, task_session=task_session)
# restore back worker_id / GPUs
if gpu_queues: if gpu_queues:
self.worker_id = dynamic_gpus_worker_id self.worker_id = dynamic_gpus_worker_id
Session.set_nvidia_visible_env(org_gpus) Session.set_nvidia_visible_env(org_gpus)
# clear docker patching function (if exists
self._patch_docker_cmd_func = None
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues)) self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
queue_tags = None queue_tags = None
@ -1362,7 +1450,7 @@ class Worker(ServiceCommandSection):
finally: finally:
# if we are in dynamic gpus mode, shutdown all active runs # if we are in dynamic gpus mode, shutdown all active runs
if self.docker_image_func: if self.docker_image_func:
for t_id in set(list_task_gpus_ids.values()): for t_id in set(dict_task_gpus_ids.values()):
if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)): if shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(t_id)):
self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped) self.handle_task_termination(task_id=t_id, exit_code=0, stop_reason=TaskStopReason.stopped)
else: else:
@ -1380,8 +1468,8 @@ class Worker(ServiceCommandSection):
self._unregister() self._unregister()
def _dynamic_gpu_get_available(self, gpu_indexes): def _dynamic_gpu_get_available(self, gpu_indexes):
# cast to string # key: cast to string, value: 1 (i.e. gull GPU)
gpu_indexes = [str(g) for g in gpu_indexes] gpu_indexes = {str(g): 1 for g in gpu_indexes}
# noinspection PyBroadException # noinspection PyBroadException
try: try:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=600)) response = self._session.send_api(workers_api.GetAllRequest(last_seen=600))
@ -1392,29 +1480,50 @@ class Worker(ServiceCommandSection):
our_workers = [ our_workers = [
w.id for w in response.workers w.id for w in response.workers
if w.id.startswith(worker_name) and w.id != self.worker_id] if w.id.startswith(worker_name) and w.id != self.worker_id]
gpus = [] gpus = {}
allocated_gpus = {}
gpu_pattern = re.compile(r"\d+[.]?\d*[a-z]?")
fract_gpu_pattern = re.compile(r"\d+[.]?\d*[a-z]+")
for w in our_workers: for w in our_workers:
for g in w.split(':')[-1].lower().replace('gpu', '').split(','): for g in w.split(':')[-1].lower().replace('gpu', '').split(','):
try: try:
# verify "int.int" # verify pattern "int.int" or "int.int[a-z]"
gpus += [str(g).strip()] if float(g.strip()) >= 0 else [] gpu_idx_name = g.strip()
if gpu_pattern.fullmatch(gpu_idx_name):
# check if this is a fraction
if fract_gpu_pattern.fullmatch(gpu_idx_name):
gpu_idx = gpu_idx_name.split(".")[0]
gpu_fract = float("0.{}".format(gpu_idx_name.split(".")[-1][:-1]))
# the entire gpu
gpus[gpu_idx] = gpus.get(gpu_idx, 0) + gpu_fract
# the gpu fraction uid eg 0.25a
allocated_gpus[gpu_idx_name] = gpu_fract
else:
# or a static MIG slice
gpus[gpu_idx_name] = 1
allocated_gpus[gpu_idx_name] = 1
else:
print("INFO: failed parsing fractional GPU '{}' - skipping".format(g))
except (ValueError, TypeError): except (ValueError, TypeError):
print("INFO: failed parsing GPU int('{}') - skipping".format(g)) print("INFO: failed parsing GPU int('{}') - skipping".format(g))
available_gpus = list(set(gpu_indexes) - set(gpus))
return available_gpus # remove the GPUs we have workers running on
available_gpus = {g: (v - gpus.get(g, 0)) for g, v in gpu_indexes.items() if (v - gpus.get(g, 0)) > 0}
return available_gpus, allocated_gpus
def _setup_dynamic_gpus(self, gpu_queues): def _setup_dynamic_gpus(self, gpu_queues, gpu_indexes):
available_gpus = self.get_runtime_properties() available_gpus = self.get_runtime_properties()
if available_gpus is None: if available_gpus is None:
raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server") raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus'] available_gpus = [prop["value"] for prop in available_gpus if prop["key"] == 'available_gpus']
if available_gpus: if available_gpus:
gpus = [] gpus = {}
for g in available_gpus[-1].split(','): for g_v in available_gpus[-1].split(','):
g, v = g_v.split("_")
try: try:
# verify "int.int" # verify "int.int_float"
gpus += [str(g).strip()] if float(g.strip()) >= 0 else [] if float(g.strip()) >= 0:
gpus[g.strip()] = float("0."+v)
except (ValueError, TypeError): except (ValueError, TypeError):
print("INFO: failed parsing GPU int('{}') - skipping".format(g)) print("INFO: failed parsing GPU int('{}') - skipping".format(g))
available_gpus = gpus available_gpus = gpus
@ -1423,10 +1532,11 @@ class Worker(ServiceCommandSection):
gpu_queues = dict(gpu_queues) gpu_queues = dict(gpu_queues)
if not self.set_runtime_properties( if not self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus)): key='available_gpus', value=','.join("{}_{}".format(g, str(v)[2:]) for g, v in available_gpus.items())):
raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server") raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
self.cluster_report_monitor(available_gpus=available_gpus, gpu_queues=gpu_queues) # because it sets the MAX not the actual available (i.e. free) GPUS
self.cluster_report_monitor(available_gpus=gpu_indexes, gpu_queues=gpu_queues)
return available_gpus, gpu_queues return available_gpus, gpu_queues
@ -1606,6 +1716,10 @@ class Worker(ServiceCommandSection):
if self._services_mode and dynamic_gpus: if self._services_mode and dynamic_gpus:
raise ValueError("Combining --dynamic-gpus and --services-mode is not supported") raise ValueError("Combining --dynamic-gpus and --services-mode is not supported")
if self._dynamic_gpus == "fractional" and not docker:
raise ValueError("Fractional GPUs are only supported in docker-mode, "
"add --docker to allow docker-mode operation")
# We are not running a daemon we are killing one. # We are not running a daemon we are killing one.
# find the pid send termination signal and leave # find the pid send termination signal and leave
if kwargs.get('stop', False) is not False: if kwargs.get('stop', False) is not False:
@ -1774,6 +1888,7 @@ class Worker(ServiceCommandSection):
if not dynamic_gpus: if not dynamic_gpus:
return None, None, queues return None, None, queues
has_fractional = False
queue_names = [q.name for q in queues] queue_names = [q.name for q in queues]
if not all('=' in q for q in queue_names): if not all('=' in q for q in queue_names):
raise ValueError("using --dynamic-gpus, --queue [{}], " raise ValueError("using --dynamic-gpus, --queue [{}], "
@ -1802,9 +1917,12 @@ class Worker(ServiceCommandSection):
for s in queue_names: for s in queue_names:
s_p = s.split('=') s_p = s.split('=')
name = s[:-1 - len(s_p[-1])] name = s[:-1 - len(s_p[-1])]
min_max_g = int(s_p[-1].split('-')[0] or 1), int(s_p[-1].split('-')[-1]) min_max_g = float(s_p[-1].split('-')[0] or 1), float(s_p[-1].split('-')[-1])
if min(min_max_g) <= 0: if min(min_max_g) <= 0:
raise ValueError("Parsing min/max number of gpus <= 0 is not allowed: \"{}\"".format(s)) raise ValueError("Parsing min/max number of gpus <= 0 is not allowed: \"{}\"".format(s))
if any(g for g in min_max_g if 1 < g != int(g)):
raise ValueError("Parsing min/max number of gpus, fractional gpu cannot be > 1: \"{}\"".format(s))
has_fractional = min(min_max_g) < 1
dynamic_gpus.append((name, min_max_g,)) dynamic_gpus.append((name, min_max_g,))
queue_names = [q for q, _ in dynamic_gpus] queue_names = [q for q, _ in dynamic_gpus]
# resolve queue ids # resolve queue ids
@ -1814,15 +1932,16 @@ class Worker(ServiceCommandSection):
# maintain original priority order # maintain original priority order
queues = [q for q, _ in dynamic_gpus] queues = [q for q, _ in dynamic_gpus]
self._dynamic_gpus = True self._dynamic_gpus = "fractional" if has_fractional else True
return dynamic_gpus, gpu_indexes, queues return dynamic_gpus, gpu_indexes, queues
def _register_dynamic_gpus(self, gpu_indexes): def _register_dynamic_gpus(self, gpu_indexes):
# test server support # test server support
available_gpus = self._dynamic_gpu_get_available(gpu_indexes) available_gpus, allocated_gpus = self._dynamic_gpu_get_available(gpu_indexes)
if not self.set_runtime_properties( if not self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus)): key='available_gpus',
value=','.join("{}_{}".format(g, str(v)[2:]) for g, v in available_gpus.items())):
raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server") raise ValueError("Dynamic GPU allocation is not supported by your ClearML-server")
def report_monitor(self, report): def report_monitor(self, report):
@ -2484,6 +2603,8 @@ class Worker(ServiceCommandSection):
raise ValueError( raise ValueError(
"Execution required enqueued task, but task id={} is not queued.".format(current_task.id) "Execution required enqueued task, but task id={} is not queued.".format(current_task.id)
) )
# only force set started if we actually dequeued it (which would have changed the state)
if res.ok and res.json().get("data", {}).get("updated", 0):
# Set task status to started to prevent any external monitoring from killing it # Set task status to started to prevent any external monitoring from killing it
self._session.api_client.tasks.started( self._session.api_client.tasks.started(
task=current_task.id, task=current_task.id,