From 22c5f043aa0ab007d9a23b7b08ae9ff2824ff52e Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Sun, 31 May 2020 14:00:14 +0300
Subject: [PATCH] Fix detached mode to correctly use cache folder slots

---
Note: Singleton.update_pid_file() is revised relative to the original commit:
the lock file is renamed before the tracked state is updated, only OSError is
swallowed, and the tempfile closer name is kept in sync with the new file name.

 trains_agent/commands/worker.py  |  7 +++++--
 trains_agent/helper/singleton.py | 34 ++++++++++++++++++++++++++++++++--
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/trains_agent/commands/worker.py b/trains_agent/commands/worker.py
index e1fb92e..8e74e91 100644
--- a/trains_agent/commands/worker.py
+++ b/trains_agent/commands/worker.py
@@ -737,6 +737,8 @@ class Worker(ServiceCommandSection):
                 # in detached mode
                 # fully detach stdin.stdout/stderr and leave main process, running in the background
                 daemonize_process(out_file.fileno())
+                # make sure we update the singleton lock file to the new pid
+                Singleton.update_pid_file()
                 # reprint headers to std file (we are now inside the daemon process)
                 print("Worker \"{}\" :".format(self.worker_id))
                 self._session.print_configuration()
@@ -2277,8 +2279,9 @@ class Worker(ServiceCommandSection):
             else:
                 worker_name = '{}:cpu'.format(worker_name)
 
-        self.worker_id, worker_slot = Singleton.register_instance(unique_worker_id=worker_id, worker_name=worker_name,
-                                                                  api_client=self._session.api_client)
+        self.worker_id, worker_slot = Singleton.register_instance(
+            unique_worker_id=worker_id, worker_name=worker_name, api_client=self._session.api_client)
+
         if self.worker_id is None:
             error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
             exit(1)
diff --git a/trains_agent/helper/singleton.py b/trains_agent/helper/singleton.py
index ec3a7a9..8507c9e 100644
--- a/trains_agent/helper/singleton.py
+++ b/trains_agent/helper/singleton.py
@@ -18,6 +18,35 @@ class Singleton(object):
     _pid_file = None
     _lock_file_name = sep+prefix+sep+'global.lock'
     _lock_timeout = 10
+    _pid = None
+
+    @classmethod
+    def update_pid_file(cls):
+        """
+        Rename the singleton lock file so its name embeds the current process id.
+        Needed after the pid changed (e.g. once the process was daemonized).
+        Does nothing if no lock file was created, or if the pid did not change.
+        """
+        new_pid = str(os.getpid())
+        if not cls._pid_file or cls._pid == new_pid:
+            return
+        old_name = cls._pid_file.name
+        parts = old_name.split(os.path.sep)
+        # the pid appears only in the file name, as <sep><pid><sep>
+        parts[-1] = parts[-1].replace(cls.sep + cls._pid + cls.sep, cls.sep + new_pid + cls.sep)
+        new_pid_file = os.path.sep.join(parts)
+        # we need to rename to match new pid
+        try:
+            os.rename(old_name, new_pid_file)
+        except OSError:
+            # best effort: the old file is still the one on disk, keep tracking it
+            return
+        cls._pid = new_pid
+        cls._pid_file.name = new_pid_file
+        # Python 3 unlinks on close using the name kept by the internal closer object
+        closer = getattr(cls._pid_file, '_closer', None)
+        if closer is not None:
+            closer.name = new_pid_file
 
     @classmethod
     def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
@@ -124,8 +153,9 @@ class Singleton(object):
             unique_worker_id = worker_name + cls.worker_name_sep + str(cls.instance_slot)
 
         # create lock
-        cls._pid_file = NamedTemporaryFile(dir=cls._get_temp_folder(),
-                                           prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep, suffix=cls.ext)
+        cls._pid = str(os.getpid())
+        cls._pid_file = NamedTemporaryFile(
+            dir=cls._get_temp_folder(), prefix=cls.prefix + cls.sep + cls._pid + cls.sep, suffix=cls.ext)
         cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
         cls._pid_file.flush()
         cls.worker_id = unique_worker_id