Add support for MIG devices: use 0:1 for GPU 0, slice 1 (or use 0.1)

allegroai 2022-08-01 18:58:42 +03:00
parent 48a145a8bd
commit ec9d027678
3 changed files with 52 additions and 25 deletions
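For orientation, a minimal, hedged sketch (standalone, not the verbatim agent code) of how this commit parses a --gpus value such as "0-3", "0,1" or the new MIG forms "0:0,0:1": colons are normalized to dots for internal bookkeeping and each entry is validated as a non-negative number. The daemon invocation implied by the title would then be along the lines of clearml-agent daemon --gpus 0:1 (flag names assumed from the error message in the diff below).

def parse_gpus(gpus):
    # range of full GPUs, e.g. "0-3" -> ["0", "1", "2", "3"]
    if '-' in gpus:
        start, end = gpus.split('-')
        return [str(i) for i in range(int(start), int(end) + 1)]
    # MIG slices arrive as "gpu:slice" and are stored internally as "gpu.slice"
    indexes = [g.replace(':', '.').strip() for g in gpus.split(',')]
    # a valid entry is a non-negative number ("0") or number.number ("0.1"); junk raises ValueError
    return [g for g in indexes if float(g) >= 0]

print(parse_gpus("0:0,0:1,1:0,1:1"))  # ['0.0', '0.1', '1.0', '1.1']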


@@ -555,6 +555,7 @@ class Worker(ServiceCommandSection):
self.worker_id = self._session.config["agent.worker_id"] or "{}:{}".format(
self._session.config["agent.worker_name"], os.getpid()
)
self.parent_worker_id = None # maybe add os env for overriding
self._last_stats = defaultdict(lambda: 0)
self._last_report_timestamp = psutil.time.time()
self.temp_config_path = None
@@ -943,7 +944,7 @@ class Worker(ServiceCommandSection):
# update available gpus
if gpu_queues:
available_gpus = self._dynamic_gpu_get_available(gpu_indexes)
# if something went wrong or we have no free gpus
# if something went wrong, or we have no free gpus
# start over from the highest priority queue
if not available_gpus:
if self._daemon_foreground or worker_params.debug:
@@ -1029,7 +1030,7 @@ class Worker(ServiceCommandSection):
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
org_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES')
org_gpus = Session.get_nvidia_visible_env()
dynamic_gpus_worker_id = self.worker_id
# the following is only executed in dynamic gpus mode
if gpu_queues and gpu_queues.get(queue):
@@ -1040,10 +1041,10 @@ class Worker(ServiceCommandSection):
available_gpus = available_gpus[gpu_queues.get(queue)[1]:]
self.set_runtime_properties(
key='available_gpus', value=','.join(str(g) for g in available_gpus))
os.environ['CUDA_VISIBLE_DEVICES'] = \
os.environ['NVIDIA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
Session.set_nvidia_visible_env(gpus)
list_task_gpus_ids.update({str(g): task_id for g in gpus})
self.worker_id = ':'.join(self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
self.worker_id = ':'.join(
self.worker_id.split(':')[:-1] + ['gpu'+','.join(str(g) for g in gpus)])
self.send_logs(
task_id=task_id,
@@ -1056,8 +1057,7 @@ class Worker(ServiceCommandSection):
if gpu_queues:
self.worker_id = dynamic_gpus_worker_id
os.environ['CUDA_VISIBLE_DEVICES'] = \
os.environ['NVIDIA_VISIBLE_DEVICES'] = org_gpus
Session.set_nvidia_visible_env(org_gpus)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
@@ -1097,6 +1097,8 @@ class Worker(ServiceCommandSection):
self._unregister()
def _dynamic_gpu_get_available(self, gpu_indexes):
# cast to string
gpu_indexes = [str(g) for g in gpu_indexes]
# noinspection PyBroadException
try:
response = self._session.send_api(workers_api.GetAllRequest(last_seen=600))
@@ -1111,7 +1113,8 @@ class Worker(ServiceCommandSection):
for w in our_workers:
for g in w.split(':')[-1].lower().replace('gpu', '').split(','):
try:
gpus += [int(g.strip())]
# verify "int.int"
gpus += [str(g).strip()] if float(g.strip()) >= 0 else []
except (ValueError, TypeError):
print("INFO: failed parsing GPU int('{}') - skipping".format(g))
available_gpus = list(set(gpu_indexes) - set(gpus))
@@ -1127,10 +1130,12 @@ class Worker(ServiceCommandSection):
gpus = []
for g in available_gpus[-1].split(','):
try:
gpus += [int(g.strip())]
# verify "int.int"
gpus += [str(g).strip()] if float(g.strip()) >= 0 else []
except (ValueError, TypeError):
print("INFO: failed parsing GPU int('{}') - skipping".format(g))
available_gpus = gpus
if not isinstance(gpu_queues, dict):
gpu_queues = dict(gpu_queues)
@@ -1487,12 +1492,14 @@ class Worker(ServiceCommandSection):
if '-' in gpu_indexes:
gpu_indexes = list(range(int(gpu_indexes.split('-')[0]), 1 + int(gpu_indexes.split('-')[1])))
else:
gpu_indexes = [int(g) for g in gpu_indexes.split(',')]
gpu_indexes = [str(g).replace(":", ".").strip() for g in gpu_indexes.split(',')]
# verify (basically numbers with single "." dot)
gpu_indexes = [str(g) for g in gpu_indexes if float(g) >= 0]
except Exception:
raise ValueError(
'Failed parsing --gpus "{}". '
'--dynamic_gpus must be use with '
'specific gpus for example "0-7" or "0,1,2,3"'.format(kwargs.get('gpus')))
'specific gpus for example "0-7" or "0,1,2,3" or "0:0,0:1,1:0,1:1"'.format(kwargs.get('gpus')))
dynamic_gpus = []
for s in queue_names:
@@ -3433,7 +3440,7 @@ class Worker(ServiceCommandSection):
docker_cmd = dict(
worker_id=self.worker_id,
parent_worker_id=self.worker_id,
parent_worker_id=self.parent_worker_id or self.worker_id,
# docker_image=docker_image,
# docker_arguments=docker_arguments,
extra_docker_arguments=self._extra_docker_arguments,
@@ -3516,10 +3523,10 @@ class Worker(ServiceCommandSection):
base_cmd = [docker, 'run', '-t']
update_scheme = ""
dockers_nvidia_visible_devices = 'all'
gpu_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES', None)
gpu_devices = Session.get_nvidia_visible_env()
if gpu_devices is None or gpu_devices.lower().strip() == 'all':
if ENV_DOCKER_SKIP_GPUS_FLAG.get():
dockers_nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES') or \
dockers_nvidia_visible_devices = Session.get_nvidia_visible_env() or \
dockers_nvidia_visible_devices
else:
base_cmd += ['--gpus', 'all', ]
@@ -3527,7 +3534,8 @@ class Worker(ServiceCommandSection):
if ENV_DOCKER_SKIP_GPUS_FLAG.get():
dockers_nvidia_visible_devices = gpu_devices
else:
base_cmd += ['--gpus', '\"device={}\"'.format(gpu_devices), ]
# replace back "." to ":" MIG support
base_cmd += ['--gpus', '\"device={}\"'.format(gpu_devices.replace(".", ":")), ]
# We are using --gpu, so we should not pass NVIDIA_VISIBLE_DEVICES, I think.
# base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES=' + gpu_devices, ]
elif gpu_devices.strip() == 'none':
@@ -3844,6 +3852,9 @@ class Worker(ServiceCommandSection):
unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client,
allow_double=bool(ENV_DOCKER_HOST_MOUNT.get()) # and bool(self._services_mode),
)
# set the parent ID the first time we have a worker ID (it might change for services-mode / dgpus)
if not self.parent_worker_id:
self.parent_worker_id = self.worker_id
if self.worker_id is None:
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
@@ -3854,8 +3865,8 @@ class Worker(ServiceCommandSection):
def _generate_worker_id_name(self, dynamic_gpus=False):
worker_id = self._session.config["agent.worker_id"]
worker_name = self._session.config["agent.worker_name"]
if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
if not worker_id and Session.get_nvidia_visible_env() is not None:
nvidia_visible_devices = Session.get_nvidia_visible_env()
if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
worker_id = '{}:{}gpu{}'.format(
worker_name, 'd' if dynamic_gpus else '', nvidia_visible_devices)
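For illustration, a hedged sketch of the worker-id naming touched above (the format string is taken from the diff; actual ids depend on your agent configuration): with MIG slices the visible-devices string already uses the internal "." form, so a dynamic-GPU worker bound to slice 1 of GPU 0 registers roughly as "<worker_name>:dgpu0.1".

def worker_id_name(worker_name, nvidia_visible_devices, dynamic_gpus=False):
    # mirrors the non-empty, non-'none' branch of _generate_worker_id_name() above
    if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
        return '{}:{}gpu{}'.format(
            worker_name, 'd' if dynamic_gpus else '', nvidia_visible_devices)
    return worker_name

print(worker_id_name('mybox', '0.1', dynamic_gpus=True))  # mybox:dgpu0.1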


@@ -82,7 +82,7 @@ class ResourceMonitor(object):
if not worker_tags and ENV_WORKER_TAGS.get():
worker_tags = shlex.split(ENV_WORKER_TAGS.get())
self._worker_tags = worker_tags
if os.environ.get('NVIDIA_VISIBLE_DEVICES') == 'none':
if Session.get_nvidia_visible_env() == 'none':
# NVIDIA_VISIBLE_DEVICES set to none, marks cpu_only flag
# active_gpus == False means no GPU reporting
self._active_gpus = False
@@ -92,10 +92,9 @@ class ResourceMonitor(object):
# None means no filtering, report all gpus
self._active_gpus = None
try:
active_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES', '') or \
os.environ.get('CUDA_VISIBLE_DEVICES', '')
active_gpus = Session.get_nvidia_visible_env() or ""
if active_gpus:
self._active_gpus = [int(g.strip()) for g in active_gpus.split(',')]
self._active_gpus = [g.strip() for g in active_gpus.split(',')]
except Exception:
pass
@@ -263,7 +262,7 @@ class ResourceMonitor(object):
gpu_stat = self._gpustat.new_query()
for i, g in enumerate(gpu_stat.gpus):
# only monitor the active gpu's, if none were selected, monitor everything
if self._active_gpus and i not in self._active_gpus:
if self._active_gpus and str(i) not in self._active_gpus:
continue
stats["gpu_temperature_{:d}".format(i)] = g["temperature.gpu"]
stats["gpu_utilization_{:d}".format(i)] = g["utilization.gpu"]


@@ -76,7 +76,7 @@ class Session(_Session):
cpu_only = kwargs.get('cpu_only')
if cpu_only:
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = 'none'
Session.set_nvidia_visible_env('none')
if kwargs.get('gpus') and not os.environ.get('KUBERNETES_SERVICE_HOST') \
and not os.environ.get('KUBERNETES_PORT'):
@@ -85,7 +85,7 @@ class Session(_Session):
os.environ.pop('CUDA_VISIBLE_DEVICES', None)
os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
else:
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
Session.set_nvidia_visible_env(kwargs.get('gpus'))
if kwargs.get('only_load_config'):
from clearml_agent.backend_api.config import load
@@ -327,6 +327,23 @@ class Session(_Session):
def command(self, *args):
return Argv(*args, log=self.get_logger(Argv.__module__))
@staticmethod
def set_nvidia_visible_env(gpus):
if not gpus:
gpus = ""
visible_env = gpus.replace(".", ":") if isinstance(gpus, str) else \
','.join(str(g).replace(".", ":") for g in gpus)
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = visible_env
@staticmethod
def get_nvidia_visible_env():
visible_env = os.environ.get('NVIDIA_VISIBLE_DEVICES') or os.environ.get('CUDA_VISIBLE_DEVICES')
if visible_env is None:
return None
visible_env = str(visible_env).replace(":", ".")
return visible_env
@attr.s
class TrainsAgentLogger(object):
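Finally, a hedged round-trip sketch of the two helpers added in the hunk above (reimplemented here so the snippet is self-contained; behavior mirrors the diff): MIG ids are written to the environment in the driver-style "gpu:slice" form and read back in the agent's internal "gpu.slice" form.

import os

def set_nvidia_visible_env(gpus):
    if not gpus:
        gpus = ""
    visible = gpus.replace(".", ":") if isinstance(gpus, str) else \
        ','.join(str(g).replace(".", ":") for g in gpus)
    os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = visible

def get_nvidia_visible_env():
    visible = os.environ.get('NVIDIA_VISIBLE_DEVICES') or os.environ.get('CUDA_VISIBLE_DEVICES')
    return None if visible is None else str(visible).replace(":", ".")

set_nvidia_visible_env(["0.0", "0.1"])       # internal "." form, e.g. from the dynamic-GPU scheduler
print(os.environ['NVIDIA_VISIBLE_DEVICES'])  # -> 0:0,0:1 (the ":" form passed on to the runtime)
print(get_nvidia_visible_env())              # -> 0.0,0.1 (the "." form the agent uses internally)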