mirror of
https://github.com/clearml/clearml-agent
synced 2025-06-26 18:16:15 +00:00
Add support for mounting dockerized experiment folders to host when running on K8s in daemon mode
This commit is contained in:
@@ -463,6 +463,17 @@ def rm_tree(root): # type: (Union[Path, Text]) -> None
|
||||
return shutil.rmtree(os.path.expanduser(os.path.expandvars(Text(root))), onerror=on_error)
|
||||
|
||||
|
||||
def rm_file(filename): # type: (Union[Path, Text]) -> None
|
||||
"""
|
||||
A version of os.unlink that will not raise error
|
||||
"""
|
||||
try:
|
||||
os.unlink(os.path.expanduser(os.path.expandvars(Text(filename))))
|
||||
except:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_conda(config):
|
||||
return config['agent.package_manager.type'].lower() == 'conda'
|
||||
|
||||
|
||||
@@ -4,11 +4,12 @@ from time import sleep
|
||||
from glob import glob
|
||||
from tempfile import gettempdir, NamedTemporaryFile
|
||||
|
||||
from trains_agent.definitions import ENV_K8S_HOST_MOUNT
|
||||
from trains_agent.helper.base import warning
|
||||
|
||||
|
||||
class Singleton(object):
|
||||
prefix = 'trainsagent'
|
||||
prefix = '.trainsagent'
|
||||
sep = '_'
|
||||
ext = '.tmp'
|
||||
worker_id = None
|
||||
@@ -19,7 +20,7 @@ class Singleton(object):
|
||||
_lock_timeout = 10
|
||||
|
||||
@classmethod
|
||||
def register_instance(cls, unique_worker_id=None, worker_name=None):
|
||||
def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
|
||||
"""
|
||||
# Exit the process if another instance of us is using the same worker_id
|
||||
|
||||
@@ -28,7 +29,7 @@ class Singleton(object):
|
||||
:return: (str worker_id, int slot_number) Return None value on instance already running
|
||||
"""
|
||||
# try to lock file
|
||||
lock_file = os.path.join(gettempdir(), cls._lock_file_name)
|
||||
lock_file = os.path.join(cls._get_temp_folder(), cls._lock_file_name)
|
||||
timeout = 0
|
||||
while os.path.exists(lock_file):
|
||||
if timeout > cls._lock_timeout:
|
||||
@@ -46,7 +47,8 @@ class Singleton(object):
|
||||
f.write(bytes(os.getpid()))
|
||||
f.flush()
|
||||
try:
|
||||
ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name)
|
||||
ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name,
|
||||
api_client=api_client)
|
||||
except:
|
||||
ret = None, None
|
||||
|
||||
@@ -58,12 +60,12 @@ class Singleton(object):
|
||||
return ret
|
||||
|
||||
@classmethod
|
||||
def _register_instance(cls, unique_worker_id=None, worker_name=None):
|
||||
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
|
||||
if cls.worker_id:
|
||||
return cls.worker_id, cls.instance_slot
|
||||
# make sure we have a unique name
|
||||
instance_num = 0
|
||||
temp_folder = gettempdir()
|
||||
temp_folder = cls._get_temp_folder()
|
||||
files = glob(os.path.join(temp_folder, cls.prefix + cls.sep + '*' + cls.ext))
|
||||
slots = {}
|
||||
for file in files:
|
||||
@@ -73,8 +75,24 @@ class Singleton(object):
|
||||
except Exception:
|
||||
# something is wrong, use non existing pid and delete the file
|
||||
pid = -1
|
||||
|
||||
uid, slot = None, None
|
||||
try:
|
||||
with open(file, 'r') as f:
|
||||
uid, slot = str(f.read()).split('\n')
|
||||
slot = int(slot)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
worker = None
|
||||
if api_client and os.environ.get(ENV_K8S_HOST_MOUNT) and uid:
|
||||
try:
|
||||
worker = [w for w in api_client.workers.get_all() if w.id == uid]
|
||||
except Exception:
|
||||
worker = None
|
||||
|
||||
# count active instances and delete dead files
|
||||
if not psutil.pid_exists(pid):
|
||||
if not worker and not psutil.pid_exists(pid):
|
||||
# delete the file
|
||||
try:
|
||||
os.remove(os.path.join(file))
|
||||
@@ -83,11 +101,7 @@ class Singleton(object):
|
||||
continue
|
||||
|
||||
instance_num += 1
|
||||
try:
|
||||
with open(file, 'r') as f:
|
||||
uid, slot = str(f.read()).split('\n')
|
||||
slot = int(slot)
|
||||
except Exception:
|
||||
if slot is None:
|
||||
continue
|
||||
|
||||
if uid == unique_worker_id:
|
||||
@@ -110,10 +124,16 @@ class Singleton(object):
|
||||
unique_worker_id = worker_name + cls.worker_name_sep + str(cls.instance_slot)
|
||||
|
||||
# create lock
|
||||
cls._pid_file = NamedTemporaryFile(dir=gettempdir(), prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep,
|
||||
suffix=cls.ext)
|
||||
cls._pid_file = NamedTemporaryFile(dir=cls._get_temp_folder(),
|
||||
prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep, suffix=cls.ext)
|
||||
cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
|
||||
cls._pid_file.flush()
|
||||
cls.worker_id = unique_worker_id
|
||||
|
||||
return cls.worker_id, cls.instance_slot
|
||||
|
||||
@classmethod
|
||||
def _get_temp_folder(cls):
|
||||
if os.environ.get(ENV_K8S_HOST_MOUNT):
|
||||
return os.environ.get(ENV_K8S_HOST_MOUNT).split(':')[-1]
|
||||
return gettempdir()
|
||||
|
||||
Reference in New Issue
Block a user