Fix service-mode support for venvs

Fix --services-mode with venvs
This commit is contained in:
allegroai 2021-02-14 13:45:17 +02:00
parent 407deb84e9
commit ca242424ab
4 changed files with 64 additions and 30 deletions

View File

@@ -412,6 +412,7 @@ class Worker(ServiceCommandSection):
self._daemon_foreground = None self._daemon_foreground = None
self._standalone_mode = None self._standalone_mode = None
self._services_mode = None self._services_mode = None
self._dynamic_gpus = None
self._force_current_version = None self._force_current_version = None
self._redirected_stdout_file_no = None self._redirected_stdout_file_no = None
self._uptime_config = self._session.config.get("agent.uptime", None) self._uptime_config = self._session.config.get("agent.uptime", None)
@@ -499,6 +500,9 @@ class Worker(ServiceCommandSection):
) )
docker_image = None docker_image = None
worker_id = '{}:service:{}'.format(self.worker_id, task_id) \
if self._services_mode and not self._dynamic_gpus else self.worker_id
if self.docker_image_func: if self.docker_image_func:
try: try:
response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"]) response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
@@ -526,7 +530,7 @@ class Worker(ServiceCommandSection):
if self._services_mode: if self._services_mode:
# if this is services mode, give the docker a unique worker id, as it will register itself. # if this is services mode, give the docker a unique worker id, as it will register itself.
full_docker_cmd = self.docker_image_func( full_docker_cmd = self.docker_image_func(
worker_id='{}:service:{}'.format(self.worker_id, task_id), worker_id=worker_id,
docker_image=docker_image, docker_arguments=docker_arguments) docker_image=docker_image, docker_arguments=docker_arguments)
else: else:
full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments) full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments)
@@ -568,7 +572,7 @@ class Worker(ServiceCommandSection):
status = ExitStatus.failure status = ExitStatus.failure
try: try:
# set WORKER ID # set WORKER ID
ENV_WORKER_ID.set(self.worker_id) ENV_WORKER_ID.set(worker_id)
if self._docker_force_pull and docker_image: if self._docker_force_pull and docker_image:
full_pull_cmd = ['docker', 'pull', docker_image] full_pull_cmd = ['docker', 'pull', docker_image]
@@ -958,16 +962,12 @@ class Worker(ServiceCommandSection):
self._session.print_configuration() self._session.print_configuration()
def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs): def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, order_fairness=False, **kwargs):
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
self._standalone_mode = kwargs.get('standalone_mode', False) self._standalone_mode = kwargs.get('standalone_mode', False)
self._services_mode = kwargs.get('services_mode', False) self._services_mode = kwargs.get('services_mode', False)
# must have docker in services_mode # must have docker in services_mode
if self._services_mode: if self._services_mode:
kwargs = self._verify_command_states(kwargs) kwargs = self._verify_command_states(kwargs)
docker = docker or kwargs.get('docker')
self._uptime_config = kwargs.get('uptime', None) or self._uptime_config self._uptime_config = kwargs.get('uptime', None) or self._uptime_config
self._downtime_config = kwargs.get('downtime', None) or self._downtime_config self._downtime_config = kwargs.get('downtime', None) or self._downtime_config
if self._uptime_config and self._downtime_config: if self._uptime_config and self._downtime_config:
@@ -980,6 +980,13 @@ class Worker(ServiceCommandSection):
# support --dynamic-gpus # support --dynamic-gpus
dynamic_gpus, gpu_indexes, queues = self._parse_dynamic_gpus(kwargs, queues) dynamic_gpus, gpu_indexes, queues = self._parse_dynamic_gpus(kwargs, queues)
if self._services_mode and dynamic_gpus:
raise ValueError("Combining --dynamic-gpus and --services-mode is not supported")
# if we do not need to create queues, make sure they are valid
# match previous behaviour when we validated queue names before everything else
queues = self._resolve_queue_names(queues, create_if_missing=kwargs.get('create_queue', False))
# We are not running a daemon we are killing one. # We are not running a daemon we are killing one.
# find the pid send termination signal and leave # find the pid send termination signal and leave
if kwargs.get('stop', False): if kwargs.get('stop', False):
@@ -1002,7 +1009,7 @@ class Worker(ServiceCommandSection):
# make sure we only have a single instance, # make sure we only have a single instance,
# also make sure we set worker_id properly and cache folders # also make sure we set worker_id properly and cache folders
self._singleton() self._singleton(dynamic=bool(dynamic_gpus))
# check if we have the latest version # check if we have the latest version
start_check_update_daemon() start_check_update_daemon()
@@ -1123,6 +1130,11 @@ class Worker(ServiceCommandSection):
if not dynamic_gpus: if not dynamic_gpus:
return None, None, queues return None, None, queues
queue_names = [q.name for q in queues]
if not all('=' in q for q in queue_names):
raise ValueError("using --dynamic-gpus, --queues [{}], "
"queue must be in format <queue_name>=<num_gpus>".format(queue_names))
gpu_indexes = kwargs.get('gpus') gpu_indexes = kwargs.get('gpus')
# test gpus were passed correctly # test gpus were passed correctly
@@ -1137,17 +1149,16 @@ class Worker(ServiceCommandSection):
except Exception: except Exception:
raise ValueError('Failed parsing --gpus {}'.format(kwargs.get('gpus'))) raise ValueError('Failed parsing --gpus {}'.format(kwargs.get('gpus')))
dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in dynamic_gpus] dynamic_gpus = [(s[:-1 - len(s.split('=')[-1])], int(s.split('=')[-1])) for s in queue_names]
queue_names = [q for q, _ in dynamic_gpus]
# resolve queue ids # resolve queue ids
dynamic_gpus_q = self._resolve_queue_names([q for q, _ in dynamic_gpus], create_if_missing=False) dynamic_gpus_q = self._resolve_queue_names(
queue_names, create_if_missing=kwargs.get('create_queue', False))
dynamic_gpus = list(zip(dynamic_gpus_q, [i for _, i in dynamic_gpus])) dynamic_gpus = list(zip(dynamic_gpus_q, [i for _, i in dynamic_gpus]))
if set(dynamic_gpus_q) != set(queues):
raise ValueError(
"--dynamic-gpus [{}] and --queues [{}] must contain the same queues".format(
dynamic_gpus, queues))
dynamic_gpus = sorted( dynamic_gpus = sorted(
dynamic_gpus, reverse=True, key=cmp_to_key( dynamic_gpus, reverse=True, key=cmp_to_key(
lambda x, y: -1 if x[1] < y[1] or x[1] == y[1] and queues.index(x[0]) > queues.index(y[0]) lambda x, y: -1 if x[1] < y[1] or x[1] == y[1] and
dynamic_gpus_q.index(x[0]) > dynamic_gpus_q.index(y[0])
else +1)) else +1))
# order queue priority based on the combination we have # order queue priority based on the combination we have
queues = [q for q, _ in dynamic_gpus] queues = [q for q, _ in dynamic_gpus]
@@ -1158,6 +1169,8 @@ class Worker(ServiceCommandSection):
key='available_gpus', value=','.join(str(g) for g in available_gpus)): key='available_gpus', value=','.join(str(g) for g in available_gpus)):
raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server") raise ValueError("Dynamic GPU allocation is not supported by the ClearML-server")
self._dynamic_gpus = True
return dynamic_gpus, gpu_indexes, queues return dynamic_gpus, gpu_indexes, queues
def report_monitor(self, report): def report_monitor(self, report):
@@ -1631,10 +1644,16 @@ class Worker(ServiceCommandSection):
# We expect the same behaviour in case full_monitoring was set, and in case docker mode is used # We expect the same behaviour in case full_monitoring was set, and in case docker mode is used
if full_monitoring or docker is not False: if full_monitoring or docker is not False:
if full_monitoring and not (ENV_WORKER_ID.get() or '').strip(): if full_monitoring:
self._session.config["agent"]["worker_id"] = '' if not (ENV_WORKER_ID.get() or '').strip():
self._session.config["agent"]["worker_id"] = ''
# make sure we support multiple instances if we need to # make sure we support multiple instances if we need to
self._singleton() self._singleton()
self.temp_config_path = self.temp_config_path or safe_mkstemp(
suffix=".cfg", prefix=".clearml_agent.", text=True, name_only=True
)
self.dump_config(self.temp_config_path)
self._session._config_file = self.temp_config_path
worker_params = WorkerParams( worker_params = WorkerParams(
log_level=log_level, log_level=log_level,
@@ -1647,6 +1666,10 @@ class Worker(ServiceCommandSection):
self.stop_monitor() self.stop_monitor()
self._unregister() self._unregister()
if full_monitoring and self.temp_config_path:
safe_remove_file(self._session.config_file)
Singleton.close_pid_file()
return return
self._session.print_configuration() self._session.print_configuration()
@@ -2852,9 +2875,9 @@ class Worker(ServiceCommandSection):
worker_name, worker_id)) worker_name, worker_id))
return False return False
def _singleton(self): def _singleton(self, dynamic=False):
# ensure singleton # ensure singleton
worker_id, worker_name = self._generate_worker_id_name() worker_id, worker_name = self._generate_worker_id_name(dynamic=dynamic)
# if we are running in services mode, we allow double register since # if we are running in services mode, we allow double register since
# docker-compose will kill instances before they cleanup # docker-compose will kill instances before they cleanup
@@ -2869,13 +2892,14 @@ class Worker(ServiceCommandSection):
# update folders based on free slot # update folders based on free slot
self._session.create_cache_folders(slot_index=worker_slot) self._session.create_cache_folders(slot_index=worker_slot)
def _generate_worker_id_name(self): def _generate_worker_id_name(self, dynamic=False):
worker_id = self._session.config["agent.worker_id"] worker_id = self._session.config["agent.worker_id"]
worker_name = self._session.config["agent.worker_name"] worker_name = self._session.config["agent.worker_name"]
if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None: if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES') nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none': if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
worker_id = '{}:gpu{}'.format(worker_name, nvidia_visible_devices) worker_id = '{}:{}gpu{}'.format(
worker_name, 'd' if dynamic else '', nvidia_visible_devices)
elif nvidia_visible_devices == '': elif nvidia_visible_devices == '':
pass pass
else: else:

View File

@@ -135,7 +135,8 @@ class FolderCache(object):
# if we do not have enough free space, do nothing. # if we do not have enough free space, do nothing.
if not self._check_min_free_space(): if not self._check_min_free_space():
warning('Could not add cache entry, not enough free space on drive, ' warning('Could not add cache entry, not enough free space on drive, '
'free space threshold {} GB'.format(self._min_free_space_gb)) 'free space threshold {} GB. Clearing all cache entries!'.format(self._min_free_space_gb))
self._remove_old_entries(max_cache_entries=0)
return False return False
# create the new entry for us # create the new entry for us
@@ -175,10 +176,11 @@ class FolderCache(object):
""" """
return self._last_copied_entry_folder return self._last_copied_entry_folder
def _remove_old_entries(self): def _remove_old_entries(self, max_cache_entries=None):
# type: () -> () # type: (Optional[int]) -> ()
""" """
Notice we only keep self._max_cache_entries-1, assuming we will be adding a new entry soon Notice we only keep self._max_cache_entries-1, assuming we will be adding a new entry soon
:param int max_cache_entries: if not None use instead of self._max_cache_entries
""" """
folder_entries = [(cache_folder, cache_folder.stat().st_mtime) folder_entries = [(cache_folder, cache_folder.stat().st_mtime)
for cache_folder in self._cache_folder.glob('*') for cache_folder in self._cache_folder.glob('*')
@@ -193,7 +195,8 @@ class FolderCache(object):
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex)) warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
return return
number_of_entries_to_keep = self._max_cache_entries-1 number_of_entries_to_keep = self._max_cache_entries - 1 \
if max_cache_entries is None else max(0, int(max_cache_entries))
for folder, ts in folder_entries[number_of_entries_to_keep:]: for folder, ts in folder_entries[number_of_entries_to_keep:]:
try: try:
shutil.rmtree(folder.as_posix(), ignore_errors=True) shutil.rmtree(folder.as_posix(), ignore_errors=True)

View File

@@ -7,7 +7,7 @@ from tempfile import gettempdir, NamedTemporaryFile
from typing import List, Tuple, Optional from typing import List, Tuple, Optional
from clearml_agent.definitions import ENV_DOCKER_HOST_MOUNT from clearml_agent.definitions import ENV_DOCKER_HOST_MOUNT
from clearml_agent.helper.base import warning, is_windows_platform from clearml_agent.helper.base import warning, is_windows_platform, safe_remove_file
class Singleton(object): class Singleton(object):
@@ -22,6 +22,13 @@ class Singleton(object):
_lock_timeout = 10 _lock_timeout = 10
_pid = None _pid = None
@classmethod
def close_pid_file(cls):
if cls._pid_file:
cls._pid_file.close()
safe_remove_file(cls._pid_file.name)
cls._pid_file = None
@classmethod @classmethod
def update_pid_file(cls): def update_pid_file(cls):
new_pid = str(os.getpid()) new_pid = str(os.getpid())
@@ -115,7 +122,7 @@ class Singleton(object):
@classmethod @classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False): def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
if cls.worker_id: if cls.worker_id and cls.instance_slot is not None:
return cls.worker_id, cls.instance_slot return cls.worker_id, cls.instance_slot
# make sure we have a unique name # make sure we have a unique name
instance_num = 0 instance_num = 0

View File

@@ -97,10 +97,10 @@ DAEMON_ARGS = dict({
'action': 'store_true', 'action': 'store_true',
}, },
'--dynamic-gpus': { '--dynamic-gpus': {
'help': 'Allow to dynamically allocate gpus based on queue properties, pass \'<queue_name>=<num_gpus>\'.' 'help': 'Allow to dynamically allocate gpus based on queue properties, '
' Example: \'dual_gpus=2 single_gpu=1\'', 'configure with \'--queues <queue_name>=<num_gpus>\'.'
'nargs': '*', ' Example: \'--dynamic-gpus --queue dual_gpus=2 single_gpu=1\'',
'default': None, 'action': 'store_true',
}, },
'--uptime': { '--uptime': {
'help': 'Specify uptime for clearml-agent in "<hours> <days>" format. for example, use "17-20 TUE" to set ' 'help': 'Specify uptime for clearml-agent in "<hours> <days>" format. for example, use "17-20 TUE" to set '