mirror of
https://github.com/clearml/clearml
synced 2025-04-09 23:24:31 +00:00
Add portalocker for inter-process lock
This commit is contained in:
parent
2e3820603a
commit
b7358d7fef
1
trains/utilities/locks/__init__.py
Normal file
1
trains/utilities/locks/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .utils import RLock, Lock
|
41
trains/utilities/locks/constants.py
Normal file
41
trains/utilities/locks/constants.py
Normal file
@ -0,0 +1,41 @@
|
||||
"""
|
||||
Locking constants
|
||||
|
||||
Lock types:
|
||||
|
||||
- `LOCK_EX` exclusive lock
|
||||
- `LOCK_SH` shared lock
|
||||
|
||||
Lock flags:
|
||||
|
||||
- `LOCK_NB` non-blocking
|
||||
|
||||
Manually unlock, only needed internally
|
||||
|
||||
- `LOCK_UN` unlock
|
||||
"""
|
||||
import os
|
||||
|
||||
# The actual tests will execute the code anyhow so the following code can
|
||||
# safely be ignored from the coverage tests
|
||||
if os.name == 'nt': # pragma: no cover
|
||||
import msvcrt
|
||||
|
||||
LOCK_EX = 0x1 #: exclusive lock
|
||||
LOCK_SH = 0x2 #: shared lock
|
||||
LOCK_NB = 0x4 #: non-blocking
|
||||
LOCK_UN = msvcrt.LK_UNLCK #: unlock
|
||||
|
||||
LOCKFILE_FAIL_IMMEDIATELY = 1
|
||||
LOCKFILE_EXCLUSIVE_LOCK = 2
|
||||
|
||||
elif os.name == 'posix': # pragma: no cover
|
||||
import fcntl
|
||||
|
||||
LOCK_EX = fcntl.LOCK_EX #: exclusive lock
|
||||
LOCK_SH = fcntl.LOCK_SH #: shared lock
|
||||
LOCK_NB = fcntl.LOCK_NB #: non-blocking
|
||||
LOCK_UN = fcntl.LOCK_UN #: unlock
|
||||
|
||||
else: # pragma: no cover
|
||||
raise RuntimeError('PortaLocker only defined for nt and posix platforms')
|
19
trains/utilities/locks/exceptions.py
Normal file
19
trains/utilities/locks/exceptions.py
Normal file
@ -0,0 +1,19 @@
|
||||
class BaseLockException(Exception):
|
||||
# Error codes:
|
||||
LOCK_FAILED = 1
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.fh = kwargs.pop('fh', None)
|
||||
Exception.__init__(self, *args, **kwargs)
|
||||
|
||||
|
||||
class LockException(BaseLockException):
|
||||
pass
|
||||
|
||||
|
||||
class AlreadyLocked(BaseLockException):
|
||||
pass
|
||||
|
||||
|
||||
class FileToLarge(BaseLockException):
|
||||
pass
|
151
trains/utilities/locks/portalocker.py
Normal file
151
trains/utilities/locks/portalocker.py
Normal file
@ -0,0 +1,151 @@
|
||||
import os
|
||||
import sys
|
||||
from . import exceptions
|
||||
from . import constants
|
||||
|
||||
|
||||
if os.name == 'nt': # pragma: no cover
|
||||
import msvcrt
|
||||
|
||||
if sys.version_info.major == 2:
|
||||
lock_length = -1
|
||||
else:
|
||||
lock_length = int(2**31 - 1)
|
||||
|
||||
def lock(file_, flags):
|
||||
if flags & constants.LOCK_SH:
|
||||
import win32file
|
||||
import pywintypes
|
||||
import winerror
|
||||
__overlapped = pywintypes.OVERLAPPED()
|
||||
if sys.version_info.major == 2:
|
||||
if flags & constants.LOCK_NB:
|
||||
mode = constants.LOCKFILE_FAIL_IMMEDIATELY
|
||||
else:
|
||||
mode = 0
|
||||
|
||||
else:
|
||||
if flags & constants.LOCK_NB:
|
||||
mode = msvcrt.LK_NBRLCK
|
||||
else:
|
||||
mode = msvcrt.LK_RLCK
|
||||
|
||||
# is there any reason not to reuse the following structure?
|
||||
hfile = win32file._get_osfhandle(file_.fileno())
|
||||
try:
|
||||
win32file.LockFileEx(hfile, mode, 0, -0x10000, __overlapped)
|
||||
except pywintypes.error as exc_value:
|
||||
# error: (33, 'LockFileEx', 'The process cannot access the file
|
||||
# because another process has locked a portion of the file.')
|
||||
if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
|
||||
raise exceptions.LockException(
|
||||
exceptions.LockException.LOCK_FAILED,
|
||||
exc_value.strerror,
|
||||
fh=file_)
|
||||
else:
|
||||
# Q: Are there exceptions/codes we should be dealing with
|
||||
# here?
|
||||
raise
|
||||
else:
|
||||
mode = constants.LOCKFILE_EXCLUSIVE_LOCK
|
||||
if flags & constants.LOCK_NB:
|
||||
mode |= constants.LOCKFILE_FAIL_IMMEDIATELY
|
||||
|
||||
if flags & constants.LOCK_NB:
|
||||
mode = msvcrt.LK_NBLCK
|
||||
else:
|
||||
mode = msvcrt.LK_LOCK
|
||||
|
||||
# windows locks byte ranges, so make sure to lock from file start
|
||||
try:
|
||||
savepos = file_.tell()
|
||||
if savepos:
|
||||
# [ ] test exclusive lock fails on seek here
|
||||
# [ ] test if shared lock passes this point
|
||||
file_.seek(0)
|
||||
# [x] check if 0 param locks entire file (not documented in
|
||||
# Python)
|
||||
# [x] fails with "IOError: [Errno 13] Permission denied",
|
||||
# but -1 seems to do the trick
|
||||
|
||||
try:
|
||||
msvcrt.locking(file_.fileno(), mode, lock_length)
|
||||
except IOError as exc_value:
|
||||
# [ ] be more specific here
|
||||
raise exceptions.LockException(
|
||||
exceptions.LockException.LOCK_FAILED,
|
||||
exc_value.strerror,
|
||||
fh=file_)
|
||||
finally:
|
||||
if savepos:
|
||||
file_.seek(savepos)
|
||||
except IOError as exc_value:
|
||||
raise exceptions.LockException(
|
||||
exceptions.LockException.LOCK_FAILED, exc_value.strerror,
|
||||
fh=file_)
|
||||
|
||||
def unlock(file_):
|
||||
try:
|
||||
savepos = file_.tell()
|
||||
if savepos:
|
||||
file_.seek(0)
|
||||
|
||||
try:
|
||||
msvcrt.locking(file_.fileno(), constants.LOCK_UN, lock_length)
|
||||
except IOError as exc_value:
|
||||
if exc_value.strerror == 'Permission denied':
|
||||
import pywintypes
|
||||
import win32file
|
||||
import winerror
|
||||
__overlapped = pywintypes.OVERLAPPED()
|
||||
hfile = win32file._get_osfhandle(file_.fileno())
|
||||
try:
|
||||
win32file.UnlockFileEx(
|
||||
hfile, 0, -0x10000, __overlapped)
|
||||
except pywintypes.error as exc_value:
|
||||
if exc_value.winerror == winerror.ERROR_NOT_LOCKED:
|
||||
# error: (158, 'UnlockFileEx',
|
||||
# 'The segment is already unlocked.')
|
||||
# To match the 'posix' implementation, silently
|
||||
# ignore this error
|
||||
pass
|
||||
else:
|
||||
# Q: Are there exceptions/codes we should be
|
||||
# dealing with here?
|
||||
raise
|
||||
else:
|
||||
raise exceptions.LockException(
|
||||
exceptions.LockException.LOCK_FAILED,
|
||||
exc_value.strerror,
|
||||
fh=file_)
|
||||
finally:
|
||||
if savepos:
|
||||
file_.seek(savepos)
|
||||
except IOError as exc_value:
|
||||
raise exceptions.LockException(
|
||||
exceptions.LockException.LOCK_FAILED, exc_value.strerror,
|
||||
fh=file_)
|
||||
|
||||
elif os.name == 'posix': # pragma: no cover
|
||||
import fcntl
|
||||
|
||||
def lock(file_, flags):
|
||||
locking_exceptions = IOError,
|
||||
try: # pragma: no cover
|
||||
locking_exceptions += BlockingIOError,
|
||||
except NameError: # pragma: no cover
|
||||
pass
|
||||
|
||||
try:
|
||||
fcntl.flock(file_.fileno(), flags)
|
||||
except locking_exceptions as exc_value:
|
||||
# The exception code varies on different systems so we'll catch
|
||||
# every IO error
|
||||
raise exceptions.LockException(exc_value, fh=file_)
|
||||
|
||||
def unlock(file_):
|
||||
fcntl.flock(file_.fileno(), constants.LOCK_UN)
|
||||
|
||||
else: # pragma: no cover
|
||||
raise RuntimeError('PortaLocker only defined for nt and posix platforms')
|
||||
|
306
trains/utilities/locks/utils.py
Normal file
306
trains/utilities/locks/utils.py
Normal file
@ -0,0 +1,306 @@
|
||||
import os
|
||||
import time
|
||||
import atexit
|
||||
import tempfile
|
||||
import contextlib
|
||||
from multiprocessing import RLock as ProcessRLock
|
||||
from . import exceptions
|
||||
from . import constants
|
||||
from . import portalocker
|
||||
|
||||
current_time = getattr(time, "monotonic", time.time)
|
||||
|
||||
DEFAULT_TIMEOUT = 10 ** 8
|
||||
DEFAULT_CHECK_INTERVAL = 0.25
|
||||
LOCK_METHOD = constants.LOCK_EX | constants.LOCK_NB
|
||||
|
||||
__all__ = [
|
||||
'Lock',
|
||||
'RLock',
|
||||
'open_atomic',
|
||||
]
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def open_atomic(filename, binary=True):
|
||||
'''Open a file for atomic writing. Instead of locking this method allows
|
||||
you to write the entire file and move it to the actual location. Note that
|
||||
this makes the assumption that a rename is atomic on your platform which
|
||||
is generally the case but not a guarantee.
|
||||
|
||||
http://docs.python.org/library/os.html#os.rename
|
||||
|
||||
>>> filename = 'test_file.txt'
|
||||
>>> if os.path.exists(filename):
|
||||
... os.remove(filename)
|
||||
|
||||
>>> with open_atomic(filename) as fh:
|
||||
... written = fh.write(b'test')
|
||||
>>> assert os.path.exists(filename)
|
||||
>>> os.remove(filename)
|
||||
|
||||
'''
|
||||
assert not os.path.exists(filename), '%r exists' % filename
|
||||
path, name = os.path.split(filename)
|
||||
|
||||
# Create the parent directory if it doesn't exist
|
||||
if path and not os.path.isdir(path): # pragma: no cover
|
||||
os.makedirs(path)
|
||||
|
||||
temp_fh = tempfile.NamedTemporaryFile(
|
||||
mode=binary and 'wb' or 'w',
|
||||
dir=path,
|
||||
delete=False,
|
||||
)
|
||||
yield temp_fh
|
||||
temp_fh.flush()
|
||||
os.fsync(temp_fh.fileno())
|
||||
temp_fh.close()
|
||||
try:
|
||||
os.rename(temp_fh.name, filename)
|
||||
finally:
|
||||
try:
|
||||
os.remove(temp_fh.name)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class Lock(object):
|
||||
|
||||
def __init__(
|
||||
self, filename, mode='a', timeout=DEFAULT_TIMEOUT,
|
||||
check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False,
|
||||
flags=LOCK_METHOD, **file_open_kwargs):
|
||||
'''Lock manager with build-in timeout
|
||||
|
||||
filename -- filename
|
||||
mode -- the open mode, 'a' or 'ab' should be used for writing
|
||||
truncate -- use truncate to emulate 'w' mode, None is disabled, 0 is
|
||||
truncate to 0 bytes
|
||||
timeout -- timeout when trying to acquire a lock
|
||||
check_interval -- check interval while waiting
|
||||
fail_when_locked -- after the initial lock failed, return an error
|
||||
or lock the file
|
||||
**file_open_kwargs -- The kwargs for the `open(...)` call
|
||||
|
||||
fail_when_locked is useful when multiple threads/processes can race
|
||||
when creating a file. If set to true than the system will wait till
|
||||
the lock was acquired and then return an AlreadyLocked exception.
|
||||
|
||||
Note that the file is opened first and locked later. So using 'w' as
|
||||
mode will result in truncate _BEFORE_ the lock is checked.
|
||||
'''
|
||||
|
||||
if 'w' in mode:
|
||||
truncate = True
|
||||
mode = mode.replace('w', 'a')
|
||||
else:
|
||||
truncate = False
|
||||
|
||||
self.fh = None
|
||||
self.filename = filename
|
||||
self.mode = mode
|
||||
self.truncate = truncate
|
||||
self.timeout = timeout
|
||||
self.check_interval = check_interval
|
||||
self.fail_when_locked = fail_when_locked
|
||||
self.flags = flags
|
||||
self.file_open_kwargs = file_open_kwargs
|
||||
|
||||
def acquire(
|
||||
self, timeout=None, check_interval=None, fail_when_locked=None):
|
||||
'''Acquire the locked filehandle'''
|
||||
if timeout is None:
|
||||
timeout = self.timeout
|
||||
if timeout is None:
|
||||
timeout = 0
|
||||
|
||||
if check_interval is None:
|
||||
check_interval = self.check_interval
|
||||
|
||||
if fail_when_locked is None:
|
||||
fail_when_locked = self.fail_when_locked
|
||||
|
||||
# If we already have a filehandle, return it
|
||||
fh = self.fh
|
||||
if fh:
|
||||
return fh
|
||||
|
||||
# Get a new filehandler
|
||||
fh = self._get_fh()
|
||||
try:
|
||||
# Try to lock
|
||||
fh = self._get_lock(fh)
|
||||
except exceptions.LockException as exception:
|
||||
# Try till the timeout has passed
|
||||
timeoutend = current_time() + timeout
|
||||
while timeoutend > current_time():
|
||||
# Wait a bit
|
||||
time.sleep(check_interval)
|
||||
|
||||
# Try again
|
||||
try:
|
||||
|
||||
# We already tried to the get the lock
|
||||
# If fail_when_locked is true, then stop trying
|
||||
if fail_when_locked:
|
||||
raise exceptions.AlreadyLocked(exception)
|
||||
|
||||
else: # pragma: no cover
|
||||
# We've got the lock
|
||||
fh = self._get_lock(fh)
|
||||
break
|
||||
|
||||
except exceptions.LockException:
|
||||
pass
|
||||
|
||||
else:
|
||||
# We got a timeout... reraising
|
||||
raise exceptions.LockException(exception)
|
||||
|
||||
# Prepare the filehandle (truncate if needed)
|
||||
fh = self._prepare_fh(fh)
|
||||
|
||||
self.fh = fh
|
||||
return fh
|
||||
|
||||
def release(self):
|
||||
'''Releases the currently locked file handle'''
|
||||
if self.fh:
|
||||
try:
|
||||
portalocker.unlock(self.fh)
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
self.fh.close()
|
||||
except:
|
||||
pass
|
||||
self.fh = None
|
||||
|
||||
def _get_fh(self):
|
||||
'''Get a new filehandle'''
|
||||
return open(self.filename, self.mode, **self.file_open_kwargs)
|
||||
|
||||
def _get_lock(self, fh):
|
||||
'''
|
||||
Try to lock the given filehandle
|
||||
|
||||
returns LockException if it fails'''
|
||||
portalocker.lock(fh, self.flags)
|
||||
return fh
|
||||
|
||||
def _prepare_fh(self, fh):
|
||||
'''
|
||||
Prepare the filehandle for usage
|
||||
|
||||
If truncate is a number, the file will be truncated to that amount of
|
||||
bytes
|
||||
'''
|
||||
if self.truncate:
|
||||
fh.seek(0)
|
||||
fh.truncate(0)
|
||||
|
||||
return fh
|
||||
|
||||
def __enter__(self):
|
||||
return self.acquire()
|
||||
|
||||
def __exit__(self, type_, value, tb):
|
||||
self.release()
|
||||
|
||||
def __delete__(self, instance): # pragma: no cover
|
||||
instance.release()
|
||||
|
||||
|
||||
class RLock(Lock):
|
||||
"""
|
||||
A reentrant lock, functions in a similar way to threading.RLock in that it
|
||||
can be acquired multiple times. When the corresponding number of release()
|
||||
calls are made the lock will finally release the underlying file lock.
|
||||
"""
|
||||
def __init__(
|
||||
self, filename, mode='a', timeout=DEFAULT_TIMEOUT,
|
||||
check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False,
|
||||
flags=LOCK_METHOD):
|
||||
super(RLock, self).__init__(filename, mode, timeout, check_interval,
|
||||
fail_when_locked, flags)
|
||||
self._acquire_count = 0
|
||||
self._lock = ProcessRLock()
|
||||
self._pid = os.getpid()
|
||||
|
||||
def acquire(self, timeout=None, check_interval=None, fail_when_locked=None):
|
||||
if self._lock:
|
||||
if not self._lock.acquire(timeout=timeout):
|
||||
# We got a timeout... reraising
|
||||
raise exceptions.LockException()
|
||||
|
||||
# check if we need to recreate the file lock on another subprocess
|
||||
if self._pid != os.getpid():
|
||||
self._pid = os.getpid()
|
||||
self._acquire_count = 0
|
||||
if self.fh:
|
||||
try:
|
||||
portalocker.unlock(self.fh)
|
||||
self.fh.close()
|
||||
except:
|
||||
pass
|
||||
self.fh = None
|
||||
|
||||
if self._acquire_count >= 1:
|
||||
fh = self.fh
|
||||
else:
|
||||
fh = super(RLock, self).acquire(timeout, check_interval,
|
||||
fail_when_locked)
|
||||
|
||||
self._acquire_count += 1
|
||||
return fh
|
||||
|
||||
def release(self):
|
||||
if self._acquire_count == 0:
|
||||
raise exceptions.LockException(
|
||||
"Cannot release more times than acquired")
|
||||
|
||||
if self._acquire_count == 1:
|
||||
super(RLock, self).release()
|
||||
|
||||
self._acquire_count -= 1
|
||||
if self._lock:
|
||||
self._lock.release()
|
||||
|
||||
def __del__(self):
|
||||
self._lock = None
|
||||
# try to remove the file when we are done
|
||||
if not os.path.isfile(self.filename):
|
||||
return
|
||||
try:
|
||||
self.acquire(timeout=0)
|
||||
try:
|
||||
os.unlink(self.filename)
|
||||
removed = True
|
||||
except Exception:
|
||||
removed = False
|
||||
self.release()
|
||||
if not removed:
|
||||
try:
|
||||
os.unlink(self.filename)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class TemporaryFileLock(Lock):
|
||||
|
||||
def __init__(self, filename='.lock', timeout=DEFAULT_TIMEOUT,
|
||||
check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=True,
|
||||
flags=LOCK_METHOD):
|
||||
|
||||
Lock.__init__(self, filename=filename, mode='w', timeout=timeout,
|
||||
check_interval=check_interval,
|
||||
fail_when_locked=fail_when_locked, flags=flags)
|
||||
atexit.register(self.release)
|
||||
|
||||
def release(self):
|
||||
Lock.release(self)
|
||||
if os.path.isfile(self.filename): # pragma: no branch
|
||||
os.unlink(self.filename)
|
Loading…
Reference in New Issue
Block a user