Switch the main Lock guard back to a multiprocessing Lock

allegroai 2022-07-31 19:32:29 +03:00
parent b661f13a49
commit fcad50b626
2 changed files with 14 additions and 60 deletions

clearml/task.py

@@ -83,7 +83,7 @@ from .utilities.proxy_object import (
 from .utilities.resource_monitor import ResourceMonitor
 from .utilities.seed import make_deterministic
 from .utilities.lowlevel.threads import get_current_thread_id
-from .utilities.process.mp import BackgroundMonitor, leave_process, fork_safe_objects_cleanup_process_lock
+from .utilities.process.mp import BackgroundMonitor, leave_process
 from .utilities.matching import matches_any_wildcard
 from .utilities.parallel import FutureTaskCaller
 # noinspection PyProtectedMember
@@ -3606,10 +3606,6 @@ class Task(_Task):
         if not is_sub_process and BackgroundMonitor.is_subprocess_enabled():
             BackgroundMonitor.wait_for_sub_process(self)
-        # finally clean the proces locks
-        if not is_sub_process:
-            fork_safe_objects_cleanup_process_lock(tid=self.id, pid=os.getpid())
     @classmethod
     def __register_at_exit(cls, exit_callback, only_remove_signal_and_exception_hooks=False):
         class ExitHooks(object):
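
Note: on the Task side this commit only drops the explicit lock cleanup from the shutdown path. Once the process guard is a plain multiprocessing Lock held in memory, there is no ".clearml_<task>_<pid>.lock" file under the temp directory left to unlink when the task closes. A minimal sketch of that idea (hypothetical names, not ClearML's code):

import sys
from multiprocessing import get_context

# the guard is an OS-level semaphore created in memory; nothing is written to disk
_GUARD = get_context("fork" if sys.platform == "linux" else "spawn").Lock()

def do_protected_work():
    # acquire/release happen purely in memory, so a shutdown hook needs no
    # extra cleanup step for the guard
    with _GUARD:
        pass

if __name__ == "__main__":
    do_protected_work()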

clearml/utilities/process/mp.py

@@ -4,7 +4,6 @@ import struct
 import sys
 from functools import partial
 from multiprocessing import Process, Semaphore, Event as ProcessEvent
-from tempfile import gettempdir
 from threading import Thread, Event as TrEvent, RLock as ThreadRLock
 from time import sleep, time
 from typing import List, Dict, Optional
@@ -13,7 +12,6 @@ import psutil
 from six.moves.queue import Empty, Queue as TrQueue
 from ..py3_interop import AbstractContextManager
-from ...utilities.locks.utils import Lock as FileLock
 try:
     from multiprocessing import SimpleQueue
@@ -36,61 +34,25 @@ except ImportError:
 class _ForkSafeThreadSyncObject(object):
-    __process_lock = None
-    __process_lock_pid = None
-    @classmethod
-    def __get_lock_file(cls, tid=None, pid=None):
-        from ... import Task
-        # noinspection PyProtectedMember
-        tid = tid or Task._Task__get_master_id_task_id()
-        # noinspection PyProtectedMember
-        pid = pid or Task._Task__get_master_process_id()
-        if not pid or not tid:
-            return None
-        uid_file = os.path.join(gettempdir(), ".clearml_{}_{}.lock".format(tid, pid))
-        return uid_file
-    @classmethod
-    def _setup_lock(cls):
-        if cls.__process_lock_pid != os.getpid():
-            uid_file = cls.__get_lock_file()
-            if uid_file is None:
-                cls.__process_lock_pid = None
-                return None
-            cls.__process_lock = FileLock(uid_file)
-            cls.__process_lock_pid = os.getpid()
-        return cls.__process_lock
+    __process_lock = get_context("fork" if sys.platform == "linux" else "spawn").Lock()
     @classmethod
     def _inner_lock(cls):
-        if cls._setup_lock():
-            try:
-                cls._setup_lock().acquire()
-            except: # noqa
-                pass
+        try:
+            # let's agree that 90sec should be enough to get a lock on such a short protected piece of code.
+            # if we failed, we have to assume some deadlock happened
+            # (Python is not very safe with regrades to Process Locks)
+            cls.__process_lock.acquire(block=True, timeout=90)
+        except: # noqa
+            pass
     @classmethod
     def _inner_unlock(cls):
-        if cls._setup_lock():
-            try:
-                cls._setup_lock().release()
-            except: # noqa
-                pass
-    @classmethod
-    def cleanup_process_lock(cls, tid=None, pid=None):
-        if cls.__process_lock:
-            cls.__process_lock.delete_lock_file()
-        else:
-            uid_file = cls.__get_lock_file(tid=tid, pid=pid)
-            if uid_file:
-                try:
-                    os.unlink(uid_file)
-                except: # noqa
-                    pass
+        try:
+            cls.__process_lock.release()
+        except: # noqa
+            # if we fail to release we might not have locked it in the first place (see timeout)
+            pass
     def __init__(self, functor):
         self._sync = None
@@ -886,7 +848,3 @@ def leave_process(status=0):
     # ipython/jupyter notebook will not allow to call sys.exit
     # we have to call the low level function
     os._exit(status or 0) # noqa
-def fork_safe_objects_cleanup_process_lock(tid=None, pid=None):
-    _ForkSafeThreadSyncObject.cleanup_process_lock(tid=tid, pid=pid)
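
For readers skimming the diff: the class switches back to a single multiprocessing Lock bound to the "fork" context on Linux (or "spawn" elsewhere), a bounded 90-second acquire so a leaked or deadlocked lock degrades into a timeout instead of hanging the process, and a release that tolerates not holding the lock, which is exactly what happens after a timed-out acquire. A rough self-contained sketch of that guard pattern (assumed names, not ClearML's actual class):

import sys
from multiprocessing import get_context

class _GuardedObject(object):
    # one process-level lock shared by every instance of the class
    _lock = get_context("fork" if sys.platform == "linux" else "spawn").Lock()

    def _inner_lock(self):
        try:
            # bounded wait: if the lock cannot be taken within 90s, assume a
            # deadlock and carry on rather than hang forever
            self._lock.acquire(block=True, timeout=90)
        except Exception:
            pass

    def _inner_unlock(self):
        try:
            self._lock.release()
        except Exception:
            # releasing an unheld lock raises; that is expected when the
            # timed acquire above gave up
            pass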