Use a file lock instead of a process lock to avoid future deadlocks, since the Python process lock is not process-safe (killing a process that holds the lock will not release it)

This commit is contained in:
allegroai 2022-07-28 18:49:18 +03:00
parent 136b0c33e7
commit 44a4dc99b3
2 changed files with 68 additions and 5 deletions

View File

@ -83,7 +83,7 @@ from .utilities.proxy_object import (
from .utilities.resource_monitor import ResourceMonitor
from .utilities.seed import make_deterministic
from .utilities.lowlevel.threads import get_current_thread_id
from .utilities.process.mp import BackgroundMonitor, leave_process
from .utilities.process.mp import BackgroundMonitor, leave_process, fork_safe_objects_cleanup_process_lock
from .utilities.matching import matches_any_wildcard
from .utilities.parallel import FutureTaskCaller
# noinspection PyProtectedMember
@ -3606,6 +3606,10 @@ class Task(_Task):
if not is_sub_process and BackgroundMonitor.is_subprocess_enabled():
BackgroundMonitor.wait_for_sub_process(self)
# finally clean the process locks
if not is_sub_process:
fork_safe_objects_cleanup_process_lock(tid=self.id, pid=os.getpid())
@classmethod
def __register_at_exit(cls, exit_callback, only_remove_signal_and_exception_hooks=False):
class ExitHooks(object):

View File

@ -4,6 +4,7 @@ import struct
import sys
from functools import partial
from multiprocessing import Process, Semaphore, Event as ProcessEvent
from tempfile import gettempdir
from threading import Thread, Event as TrEvent, RLock as ThreadRLock
from time import sleep, time
from typing import List, Dict, Optional
@ -12,6 +13,7 @@ import psutil
from six.moves.queue import Empty, Queue as TrQueue
from ..py3_interop import AbstractContextManager
from ...utilities.locks.utils import Lock as FileLock
try:
from multiprocessing import SimpleQueue
@ -34,7 +36,61 @@ except ImportError:
class _ForkSafeThreadSyncObject(object):
__process_lock = get_context("fork" if sys.platform == 'linux' else "spawn").RLock()
__process_lock = None
__process_lock_pid = None
@classmethod
def __get_lock_file(cls, tid=None, pid=None):
    """
    Build the path of the lock file identified by a task ID and a process ID.

    :param tid: Task ID. When falsy, the value returned by
        ``Task._Task__get_master_id_task_id()`` is used instead.
    :param pid: Process ID. When falsy, the value returned by
        ``Task._Task__get_master_process_id()`` is used instead.
    :return: Path of ``.clearml_<tid>_<pid>.lock`` inside the system temp
        folder, or None when either identifier could not be resolved.
    """
    # NOTE(review): imported locally, presumably to avoid a circular import - confirm
    from ... import Task

    if not tid:
        # noinspection PyProtectedMember
        tid = Task._Task__get_master_id_task_id()
    if not pid:
        # noinspection PyProtectedMember
        pid = Task._Task__get_master_process_id()

    if pid and tid:
        return os.path.join(gettempdir(), ".clearml_{}_{}.lock".format(tid, pid))
    return None
@classmethod
def _setup_lock(cls):
    """
    Return the class-level file lock, creating a new one when the current
    process ID differs from the one the lock was created in.

    :return: The ``FileLock`` object stored on the class, or None when no
        lock file path could be resolved (the stored pid is then reset so
        the next call tries again).
    """
    current_pid = os.getpid()
    # fast path: the stored lock was created by this very process
    if cls.__process_lock_pid == current_pid:
        return cls.__process_lock

    lock_path = cls.__get_lock_file()
    if lock_path is None:
        cls.__process_lock_pid = None
        return None

    # Create the lock first and only then record the pid it belongs to
    cls.__process_lock = FileLock(lock_path)
    cls.__process_lock_pid = current_pid
    return cls.__process_lock
@classmethod
def _inner_lock(cls):
    """
    Best-effort acquire of the class-level file lock.

    Does nothing when ``_setup_lock()`` returns a falsy value (no lock
    available). Ordinary errors raised while acquiring are swallowed, so the
    caller continues without the lock instead of failing.
    """
    # Resolve the lock once, so the object we tested is the one we acquire
    lock = cls._setup_lock()
    if not lock:
        return
    try:
        lock.acquire()
    except Exception:  # noqa
        # Best effort only; KeyboardInterrupt / SystemExit are no longer swallowed
        pass
@classmethod
def _inner_unlock(cls):
    """
    Best-effort release of the class-level file lock.

    Does nothing when ``_setup_lock()`` returns a falsy value (no lock
    available). Ordinary errors raised while releasing are swallowed.
    """
    # Resolve the lock once, so the object we tested is the one we release
    lock = cls._setup_lock()
    if not lock:
        return
    try:
        lock.release()
    except Exception:  # noqa
        # Best effort only; KeyboardInterrupt / SystemExit are no longer swallowed
        pass
@classmethod
def cleanup_process_lock(cls, tid=None, pid=None):
    """
    Remove the lock file used by the class-level process lock.

    When a lock object is already stored on the class, its own
    ``delete_lock_file()`` is used. Otherwise the lock file path is rebuilt
    from ``tid`` / ``pid`` and the file is unlinked on a best-effort basis.

    :param tid: Task ID used to rebuild the lock file path (only used when
        no lock object is stored on the class).
    :param pid: Process ID used to rebuild the lock file path (only used when
        no lock object is stored on the class).
    """
    if cls.__process_lock:
        cls.__process_lock.delete_lock_file()
        return

    uid_file = cls.__get_lock_file(tid=tid, pid=pid)
    if not uid_file:
        return
    try:
        os.unlink(uid_file)
    except OSError:
        # Best effort: the file may not exist or may not be removable
        pass
def __init__(self, functor):
self._sync = None
@ -52,15 +108,14 @@ class _ForkSafeThreadSyncObject(object):
# Notice the order! We first create the object and THEN update the pid,
# so that whatever happens we never try to use the old (pre-forked copy) of the synchronization object
try:
while not self.__process_lock.acquire(block=True, timeout=1.0):
sleep(0.1)
self._inner_lock()
# we have to check again inside the protected locked area
if self._instance_pid != os.getpid() or not self._sync:
self._sync = self._functor()
self._instance_pid = os.getpid()
finally:
self.__process_lock.release()
self._inner_unlock()
class ForkSafeRLock(_ForkSafeThreadSyncObject):
@ -827,3 +882,7 @@ def leave_process(status=0):
# ipython/jupyter notebook will not allow to call sys.exit
# we have to call the low level function
os._exit(status or 0) # noqa
def fork_safe_objects_cleanup_process_lock(tid=None, pid=None):
    """
    Module-level entry point that forwards to
    ``_ForkSafeThreadSyncObject.cleanup_process_lock``.

    :param tid: Task ID, passed through unchanged.
    :param pid: Process ID, passed through unchanged.
    """
    cleanup = _ForkSafeThreadSyncObject.cleanup_process_lock
    cleanup(tid=tid, pid=pid)