2019-10-25 19:28:44 +00:00
|
|
|
import os
|
|
|
|
import psutil
|
|
|
|
from time import sleep
|
|
|
|
from glob import glob
|
|
|
|
from tempfile import gettempdir, NamedTemporaryFile
|
|
|
|
|
2020-11-11 14:32:47 +00:00
|
|
|
from typing import List, Tuple, Optional
|
|
|
|
|
2020-12-22 21:00:57 +00:00
|
|
|
from clearml_agent.definitions import ENV_DOCKER_HOST_MOUNT
|
2021-02-14 11:45:17 +00:00
|
|
|
from clearml_agent.helper.base import warning, is_windows_platform, safe_remove_file
|
2019-10-25 19:28:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
class Singleton(object):
    """
    Bookkeeping of the agent instances registered on this machine.

    Every registered process owns a pid file named ``<prefix><sep><pid><sep>...<ext>`` inside the folder returned
    by ``_get_temp_folder``. The file holds two lines: the worker id and the slot number. Registration is guarded
    by a global lock file in the same folder. All state lives in class attributes. Once a registration succeeded,
    further registration calls return the cached worker id and slot.
    """

    prefix = '.clearmlagent'
    sep = '_'
    ext = '.tmp'
    worker_id = None
    worker_name_sep = ':'
    instance_slot = None
    # the pid file object created by _register_instance (None until registered)
    _pid_file = None
    _lock_file_name = sep+prefix+sep+'global.lock'
    # number of one-second checks to wait for the global lock before forcibly clearing it
    _lock_timeout = 10
    # the pid (as str) currently encoded in the pid file name
    _pid = None

    @classmethod
    def close_pid_file(cls):
        """Close this process' pid file (if any), remove it with safe_remove_file and forget it."""
        if cls._pid_file:
            cls._pid_file.close()
            safe_remove_file(cls._pid_file.name)
            cls._pid_file = None

    @classmethod
    def update_pid_file(cls):
        """
        Rename the pid file so that its name carries the current process id.

        Does nothing when there is no pid file or the pid has not changed (presumably it changes after a fork).
        The recorded name and pid are updated only after the rename succeeded. A failed rename therefore keeps
        tracking the file that actually exists on disk, and the rename is retried on the next call.
        """
        new_pid = str(os.getpid())
        if not cls._pid_file or cls._pid == new_pid:
            return

        old_name = cls._pid_file.name
        folder, base_name = os.path.split(old_name)
        new_name = os.path.join(
            folder, base_name.replace(cls.sep + cls._pid + cls.sep, cls.sep + new_pid + cls.sep))

        # we need to rename to match new pid
        try:
            os.rename(old_name, new_name)
        except OSError:
            # best effort: keep the old (still valid) name and pid
            return

        cls._pid = new_pid
        cls._pid_file.name = new_name

    @classmethod
    def get_lock_filename(cls):
        """Return the full path of the global registration lock file."""
        return os.path.join(cls._get_temp_folder(), cls._lock_file_name)

    @classmethod
    def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
        """
        Register this process as an agent instance while holding the global lock file.

        :param unique_worker_id: explicit worker id; if another live instance already uses it, (None, None) is
            returned (unless allow_double is set)
        :param worker_name: used when unique_worker_id is not given; the claimed slot number is appended to it
        :param api_client: when given (and ENV_DOCKER_HOST_MOUNT is set), used to look up workers on the server
        :param allow_double: only warn, instead of failing, when unique_worker_id is already in use
        :return: (str worker_id, int slot_number), or (None, None) if an instance with the same id is already
            running or registration raised an error
        """
        # try to lock file
        lock_file = cls.get_lock_filename()
        timeout = 0
        while os.path.exists(lock_file):
            if timeout > cls._lock_timeout:
                warning('lock file timed out {}sec - clearing lock'.format(cls._lock_timeout))
                try:
                    os.remove(lock_file)
                except Exception:
                    pass
                break

            sleep(1)
            timeout += 1

        # NOTE(review): the exists() check above and the open() below are not atomic; two processes starting
        # at the same moment could both take the lock.
        with open(lock_file, 'wb') as f:
            # encode the pid as text: on Python 3, bytes(int) would create a zero-filled buffer of that length
            f.write(str(os.getpid()).encode())
            f.flush()

        try:
            ret = cls._register_instance(
                unique_worker_id=unique_worker_id, worker_name=worker_name,
                api_client=api_client, allow_double=allow_double)
        except Exception:
            ret = None, None
        finally:
            # always release the lock, even if registration was interrupted
            try:
                os.remove(lock_file)
            except Exception:
                pass

        return ret

    @classmethod
    def get_running_pids(cls):
        # type: () -> List[Tuple[int, Optional[str], Optional[int], str]]
        """
        List the pid files of registered instances found in the temp folder.

        :return: list of (pid, worker_id, slot, file_path) tuples.
            - pid is -1 when the pid in the file name cannot be parsed or that process does not exist.
            - worker_id and slot are None when the file content cannot be read or split into two lines.
            - slot alone is None when only the slot line is not an integer.
        """
        temp_folder = cls._get_temp_folder()
        files = glob(os.path.join(temp_folder, cls.prefix + cls.sep + '*' + cls.ext))
        pids = []
        for file_path in files:
            parts = os.path.basename(file_path).split(cls.sep)
            # noinspection PyBroadException
            try:
                pid = int(parts[1])
                if not psutil.pid_exists(pid):
                    pid = -1
            except Exception:
                # something is wrong, use a non-existing pid so the file is treated as stale
                pid = -1

            uid, slot = None, None
            # noinspection PyBroadException
            try:
                with open(file_path, 'r') as f:
                    file_uid, file_slot = str(f.read()).split('\n')
                uid = file_uid
                # convert before assigning: a malformed slot must stay None rather than leak a str
                # (mixing str and int slots breaks max() over the slots in _register_instance)
                slot = int(file_slot)
            except Exception:
                pass

            pids.append((pid, uid, slot, file_path))

        return pids

    @classmethod
    def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None, allow_double=False):
        """
        Pick the lowest free slot, build the worker id and create this process' pid file.

        Called by register_instance while it holds the global lock file. Stale pid files (dead pid, and no
        matching worker found on the server) are deleted along the way.

        :return: (worker_id, slot). Returns (None, None) when a live instance already uses unique_worker_id
            and allow_double is False.
        """
        if cls.worker_id and cls.instance_slot is not None:
            return cls.worker_id, cls.instance_slot

        # make sure we have a unique name
        slots = {}
        for pid, uid, slot, file_path in cls.get_running_pids():
            worker = None
            if api_client and ENV_DOCKER_HOST_MOUNT.get() and uid:
                # NOTE(review): presumably with a shared host mount the pid may not be visible from here,
                # so the server is asked whether a worker with this id exists
                try:
                    worker = [w for w in api_client.workers.get_all() if w.id == uid]
                except Exception:
                    worker = None

            # skip (and delete) files of dead instances
            if not worker and pid < 0:
                try:
                    os.remove(file_path)
                except Exception:
                    pass
                continue

            if slot is None:
                continue

            if uid == unique_worker_id:
                if allow_double:
                    warning('Instance with the same WORKER_ID [{}] was found on this machine. '
                            'We are ignoring it, make sure this not a mistake.'.format(unique_worker_id))
                else:
                    return None, None

            slots[slot] = uid

        # minimal free slot: at least one of the len(slots) + 1 candidates is unused
        cls.instance_slot = next(i for i in range(len(slots) + 1) if i not in slots)

        # build worker id based on slot
        if not unique_worker_id:
            unique_worker_id = worker_name + cls.worker_name_sep + str(cls.instance_slot)

        # create the pid file
        cls._pid = str(os.getpid())
        cls._pid_file = NamedTemporaryFile(
            dir=cls._get_temp_folder(), prefix=cls.prefix + cls.sep + cls._pid + cls.sep, suffix=cls.ext,
            # on Windows the file is not auto-deleted; close_pid_file removes it explicitly
            delete=not is_windows_platform()
        )
        cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
        cls._pid_file.flush()
        cls.worker_id = unique_worker_id

        return cls.worker_id, cls.instance_slot

    @classmethod
    def _get_temp_folder(cls):
        """Return the folder holding pid/lock files: the container side of ENV_DOCKER_HOST_MOUNT if set, else the system temp dir."""
        host_mount = ENV_DOCKER_HOST_MOUNT.get()
        if host_mount:
            return host_mount.split(':')[-1]
        return gettempdir()

    @classmethod
    def get_slot(cls):
        """Return the slot claimed by this process, or 0 when no slot was claimed."""
        return cls.instance_slot or 0

    @classmethod
    def get_pid_file(cls):
        """Return the path of this process' pid file, or None when there is none."""
        if not cls._pid_file:
            return None
        return cls._pid_file.name
|