diff --git a/clearml/task.py b/clearml/task.py index 4a09af2c..3f4da773 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -83,7 +83,7 @@ from .utilities.proxy_object import ( from .utilities.resource_monitor import ResourceMonitor from .utilities.seed import make_deterministic from .utilities.lowlevel.threads import get_current_thread_id -from .utilities.process.mp import BackgroundMonitor, leave_process, fork_safe_objects_cleanup_process_lock +from .utilities.process.mp import BackgroundMonitor, leave_process from .utilities.matching import matches_any_wildcard from .utilities.parallel import FutureTaskCaller # noinspection PyProtectedMember @@ -3606,10 +3606,6 @@ class Task(_Task): if not is_sub_process and BackgroundMonitor.is_subprocess_enabled(): BackgroundMonitor.wait_for_sub_process(self) - # finally clean the proces locks - if not is_sub_process: - fork_safe_objects_cleanup_process_lock(tid=self.id, pid=os.getpid()) - @classmethod def __register_at_exit(cls, exit_callback, only_remove_signal_and_exception_hooks=False): class ExitHooks(object): diff --git a/clearml/utilities/process/mp.py b/clearml/utilities/process/mp.py index 35180550..0bb0dd02 100644 --- a/clearml/utilities/process/mp.py +++ b/clearml/utilities/process/mp.py @@ -4,7 +4,6 @@ import struct import sys from functools import partial from multiprocessing import Process, Semaphore, Event as ProcessEvent -from tempfile import gettempdir from threading import Thread, Event as TrEvent, RLock as ThreadRLock from time import sleep, time from typing import List, Dict, Optional @@ -13,7 +12,6 @@ import psutil from six.moves.queue import Empty, Queue as TrQueue from ..py3_interop import AbstractContextManager -from ...utilities.locks.utils import Lock as FileLock try: from multiprocessing import SimpleQueue @@ -36,61 +34,25 @@ except ImportError: class _ForkSafeThreadSyncObject(object): - __process_lock = None - __process_lock_pid = None - - @classmethod - def __get_lock_file(cls, tid=None, 
pid=None): - from ... import Task - # noinspection PyProtectedMember - tid = tid or Task._Task__get_master_id_task_id() - # noinspection PyProtectedMember - pid = pid or Task._Task__get_master_process_id() - if not pid or not tid: - return None - - uid_file = os.path.join(gettempdir(), ".clearml_{}_{}.lock".format(tid, pid)) - return uid_file - - @classmethod - def _setup_lock(cls): - if cls.__process_lock_pid != os.getpid(): - uid_file = cls.__get_lock_file() - if uid_file is None: - cls.__process_lock_pid = None - return None - - cls.__process_lock = FileLock(uid_file) - cls.__process_lock_pid = os.getpid() - return cls.__process_lock + __process_lock = get_context("fork" if sys.platform == "linux" else "spawn").Lock() @classmethod def _inner_lock(cls): - if cls._setup_lock(): - try: - cls._setup_lock().acquire() - except: # noqa - pass + try: + # let's agree that 90sec should be enough to get a lock on such a short protected piece of code. + # if we failed, we have to assume some deadlock happened + # (Python is not very safe with regards to Process Locks) + cls.__process_lock.acquire(block=True, timeout=90) + except: # noqa + pass @classmethod def _inner_unlock(cls): - if cls._setup_lock(): - try: - cls._setup_lock().release() - except: # noqa - pass - - @classmethod - def cleanup_process_lock(cls, tid=None, pid=None): - if cls.__process_lock: - cls.__process_lock.delete_lock_file() - else: - uid_file = cls.__get_lock_file(tid=tid, pid=pid) - if uid_file: - try: - os.unlink(uid_file) - except: # noqa - pass + try: + cls.__process_lock.release() + except: # noqa + # if we fail to release we might not have locked it in the first place (see timeout) + pass def __init__(self, functor): self._sync = None @@ -886,7 +848,3 @@ def leave_process(status=0): # ipython/jupyter notebook will not allow to call sys.exit # we have to call the low level function os._exit(status or 0) # noqa - - -def fork_safe_objects_cleanup_process_lock(tid=None, pid=None): - 
_ForkSafeThreadSyncObject.cleanup_process_lock(tid=tid, pid=pid)