From b7358d7fef39fcd53619ad73fa386858db3a7526 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 5 Mar 2020 12:31:22 +0200 Subject: [PATCH] Add portalocker for inter-process lock --- trains/utilities/locks/__init__.py | 1 + trains/utilities/locks/constants.py | 41 ++++ trains/utilities/locks/exceptions.py | 19 ++ trains/utilities/locks/portalocker.py | 151 +++++++++++++ trains/utilities/locks/utils.py | 306 ++++++++++++++++++++++++++ 5 files changed, 518 insertions(+) create mode 100644 trains/utilities/locks/__init__.py create mode 100644 trains/utilities/locks/constants.py create mode 100644 trains/utilities/locks/exceptions.py create mode 100644 trains/utilities/locks/portalocker.py create mode 100644 trains/utilities/locks/utils.py diff --git a/trains/utilities/locks/__init__.py b/trains/utilities/locks/__init__.py new file mode 100644 index 00000000..15fc11e1 --- /dev/null +++ b/trains/utilities/locks/__init__.py @@ -0,0 +1 @@ +from .utils import RLock, Lock diff --git a/trains/utilities/locks/constants.py b/trains/utilities/locks/constants.py new file mode 100644 index 00000000..1039ebc5 --- /dev/null +++ b/trains/utilities/locks/constants.py @@ -0,0 +1,41 @@ +""" +Locking constants + +Lock types: + +- `LOCK_EX` exclusive lock +- `LOCK_SH` shared lock + +Lock flags: + +- `LOCK_NB` non-blocking + +Manually unlock, only needed internally + +- `LOCK_UN` unlock +""" +import os + +# The actual tests will execute the code anyhow so the following code can +# safely be ignored from the coverage tests +if os.name == 'nt': # pragma: no cover + import msvcrt + + LOCK_EX = 0x1 #: exclusive lock + LOCK_SH = 0x2 #: shared lock + LOCK_NB = 0x4 #: non-blocking + LOCK_UN = msvcrt.LK_UNLCK #: unlock + + LOCKFILE_FAIL_IMMEDIATELY = 1 + LOCKFILE_EXCLUSIVE_LOCK = 2 + +elif os.name == 'posix': # pragma: no cover + import fcntl + + LOCK_EX = fcntl.LOCK_EX #: exclusive lock + LOCK_SH = fcntl.LOCK_SH #: shared lock + LOCK_NB = fcntl.LOCK_NB #: non-blocking + LOCK_UN = fcntl.LOCK_UN #: unlock + +else: # pragma: no cover + raise RuntimeError('PortaLocker only defined for nt and posix platforms') diff --git a/trains/utilities/locks/exceptions.py b/trains/utilities/locks/exceptions.py new file mode 100644 index 00000000..bb2b35eb --- /dev/null +++ b/trains/utilities/locks/exceptions.py @@ -0,0 +1,19 @@ +class BaseLockException(Exception): + # Error codes: + LOCK_FAILED = 1 + + def __init__(self, *args, **kwargs): + self.fh = kwargs.pop('fh', None) + Exception.__init__(self, *args, **kwargs) + + +class LockException(BaseLockException): + pass + + +class AlreadyLocked(BaseLockException): + pass + + +class FileToLarge(BaseLockException): + pass diff --git a/trains/utilities/locks/portalocker.py b/trains/utilities/locks/portalocker.py new file mode 100644 index 00000000..1d095371 --- /dev/null +++ b/trains/utilities/locks/portalocker.py @@ -0,0 +1,151 @@ +import os +import sys +from . import exceptions +from . import constants + + +if os.name == 'nt': # pragma: no cover + import msvcrt + + if sys.version_info.major == 2: + lock_length = -1 + else: + lock_length = int(2**31 - 1) + + def lock(file_, flags): + if flags & constants.LOCK_SH: + import win32file + import pywintypes + import winerror + __overlapped = pywintypes.OVERLAPPED() + if sys.version_info.major == 2: + if flags & constants.LOCK_NB: + mode = constants.LOCKFILE_FAIL_IMMEDIATELY + else: + mode = 0 + + else: + if flags & constants.LOCK_NB: + mode = msvcrt.LK_NBRLCK + else: + mode = msvcrt.LK_RLCK + + # is there any reason not to reuse the following structure? + hfile = win32file._get_osfhandle(file_.fileno()) + try: + win32file.LockFileEx(hfile, mode, 0, -0x10000, __overlapped) + except pywintypes.error as exc_value: + # error: (33, 'LockFileEx', 'The process cannot access the file + # because another process has locked a portion of the file.') + if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION: + raise exceptions.LockException( + exceptions.LockException.LOCK_FAILED, + exc_value.strerror, + fh=file_) + else: + # Q: Are there exceptions/codes we should be dealing with + # here? + raise + else: + mode = constants.LOCKFILE_EXCLUSIVE_LOCK + if flags & constants.LOCK_NB: + mode |= constants.LOCKFILE_FAIL_IMMEDIATELY + + if flags & constants.LOCK_NB: + mode = msvcrt.LK_NBLCK + else: + mode = msvcrt.LK_LOCK + + # windows locks byte ranges, so make sure to lock from file start + try: + savepos = file_.tell() + if savepos: + # [ ] test exclusive lock fails on seek here + # [ ] test if shared lock passes this point + file_.seek(0) + # [x] check if 0 param locks entire file (not documented in + # Python) + # [x] fails with "IOError: [Errno 13] Permission denied", + # but -1 seems to do the trick + + try: + msvcrt.locking(file_.fileno(), mode, lock_length) + except IOError as exc_value: + # [ ] be more specific here + raise exceptions.LockException( + exceptions.LockException.LOCK_FAILED, + exc_value.strerror, + fh=file_) + finally: + if savepos: + file_.seek(savepos) + except IOError as exc_value: + raise exceptions.LockException( + exceptions.LockException.LOCK_FAILED, exc_value.strerror, + fh=file_) + + def unlock(file_): + try: + savepos = file_.tell() + if savepos: + file_.seek(0) + + try: + msvcrt.locking(file_.fileno(), constants.LOCK_UN, lock_length) + except IOError as exc_value: + if exc_value.strerror == 'Permission denied': + import pywintypes + import win32file + import winerror + __overlapped = pywintypes.OVERLAPPED() + hfile = win32file._get_osfhandle(file_.fileno()) + try: + win32file.UnlockFileEx( + hfile, 0, -0x10000, __overlapped) + except pywintypes.error as exc_value: + if exc_value.winerror == winerror.ERROR_NOT_LOCKED: + # error: (158, 'UnlockFileEx', + # 'The segment is already unlocked.') + # To match the 'posix' implementation, silently + # ignore this error + pass + else: + # Q: Are there exceptions/codes we should be + # dealing with here? + raise + else: + raise exceptions.LockException( + exceptions.LockException.LOCK_FAILED, + exc_value.strerror, + fh=file_) + finally: + if savepos: + file_.seek(savepos) + except IOError as exc_value: + raise exceptions.LockException( + exceptions.LockException.LOCK_FAILED, exc_value.strerror, + fh=file_) + +elif os.name == 'posix': # pragma: no cover + import fcntl + + def lock(file_, flags): + locking_exceptions = IOError, + try: # pragma: no cover + locking_exceptions += BlockingIOError, + except NameError: # pragma: no cover + pass + + try: + fcntl.flock(file_.fileno(), flags) + except locking_exceptions as exc_value: + # The exception code varies on different systems so we'll catch + # every IO error + raise exceptions.LockException(exc_value, fh=file_) + + def unlock(file_): + fcntl.flock(file_.fileno(), constants.LOCK_UN) + +else: # pragma: no cover + raise RuntimeError('PortaLocker only defined for nt and posix platforms') + diff --git a/trains/utilities/locks/utils.py b/trains/utilities/locks/utils.py new file mode 100644 index 00000000..8c869d38 --- /dev/null +++ b/trains/utilities/locks/utils.py @@ -0,0 +1,306 @@ +import os +import time +import atexit +import tempfile +import contextlib +from multiprocessing import RLock as ProcessRLock +from . import exceptions +from . import constants +from . import portalocker + +current_time = getattr(time, "monotonic", time.time) + +DEFAULT_TIMEOUT = 10 ** 8 +DEFAULT_CHECK_INTERVAL = 0.25 +LOCK_METHOD = constants.LOCK_EX | constants.LOCK_NB + +__all__ = [ + 'Lock', + 'RLock', + 'open_atomic', +] + + +@contextlib.contextmanager +def open_atomic(filename, binary=True): + '''Open a file for atomic writing. Instead of locking this method allows + you to write the entire file and move it to the actual location. Note that + this makes the assumption that a rename is atomic on your platform which + is generally the case but not a guarantee. + + http://docs.python.org/library/os.html#os.rename + + >>> filename = 'test_file.txt' + >>> if os.path.exists(filename): + ... os.remove(filename) + + >>> with open_atomic(filename) as fh: + ... written = fh.write(b'test') + >>> assert os.path.exists(filename) + >>> os.remove(filename) + + ''' + assert not os.path.exists(filename), '%r exists' % filename + path, name = os.path.split(filename) + + # Create the parent directory if it doesn't exist + if path and not os.path.isdir(path): # pragma: no cover + os.makedirs(path) + + temp_fh = tempfile.NamedTemporaryFile( + mode=binary and 'wb' or 'w', + dir=path, + delete=False, + ) + yield temp_fh + temp_fh.flush() + os.fsync(temp_fh.fileno()) + temp_fh.close() + try: + os.rename(temp_fh.name, filename) + finally: + try: + os.remove(temp_fh.name) + except Exception: + pass + + +class Lock(object): + + def __init__( + self, filename, mode='a', timeout=DEFAULT_TIMEOUT, + check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False, + flags=LOCK_METHOD, **file_open_kwargs): + '''Lock manager with build-in timeout + + filename -- filename + mode -- the open mode, 'a' or 'ab' should be used for writing + truncate -- use truncate to emulate 'w' mode, None is disabled, 0 is + truncate to 0 bytes + timeout -- timeout when trying to acquire a lock + check_interval -- check interval while waiting + fail_when_locked -- after the initial lock failed, return an error + or lock the file + **file_open_kwargs -- The kwargs for the `open(...)` call + + fail_when_locked is useful when multiple threads/processes can race + when creating a file. If set to true than the system will wait till + the lock was acquired and then return an AlreadyLocked exception. + + Note that the file is opened first and locked later. So using 'w' as + mode will result in truncate _BEFORE_ the lock is checked. + ''' + + if 'w' in mode: + truncate = True + mode = mode.replace('w', 'a') + else: + truncate = False + + self.fh = None + self.filename = filename + self.mode = mode + self.truncate = truncate + self.timeout = timeout + self.check_interval = check_interval + self.fail_when_locked = fail_when_locked + self.flags = flags + self.file_open_kwargs = file_open_kwargs + + def acquire( + self, timeout=None, check_interval=None, fail_when_locked=None): + '''Acquire the locked filehandle''' + if timeout is None: + timeout = self.timeout + if timeout is None: + timeout = 0 + + if check_interval is None: + check_interval = self.check_interval + + if fail_when_locked is None: + fail_when_locked = self.fail_when_locked + + # If we already have a filehandle, return it + fh = self.fh + if fh: + return fh + + # Get a new filehandler + fh = self._get_fh() + try: + # Try to lock + fh = self._get_lock(fh) + except exceptions.LockException as exception: + # Try till the timeout has passed + timeoutend = current_time() + timeout + while timeoutend > current_time(): + # Wait a bit + time.sleep(check_interval) + + # Try again + try: + + # We already tried to the get the lock + # If fail_when_locked is true, then stop trying + if fail_when_locked: + raise exceptions.AlreadyLocked(exception) + + else: # pragma: no cover + # We've got the lock + fh = self._get_lock(fh) + break + + except exceptions.LockException: + pass + + else: + # We got a timeout... reraising + raise exceptions.LockException(exception) + + # Prepare the filehandle (truncate if needed) + fh = self._prepare_fh(fh) + + self.fh = fh + return fh + + def release(self): + '''Releases the currently locked file handle''' + if self.fh: + try: + portalocker.unlock(self.fh) + except: + pass + try: + self.fh.close() + except: + pass + self.fh = None + + def _get_fh(self): + '''Get a new filehandle''' + return open(self.filename, self.mode, **self.file_open_kwargs) + + def _get_lock(self, fh): + ''' + Try to lock the given filehandle + + returns LockException if it fails''' + portalocker.lock(fh, self.flags) + return fh + + def _prepare_fh(self, fh): + ''' + Prepare the filehandle for usage + + If truncate is a number, the file will be truncated to that amount of + bytes + ''' + if self.truncate: + fh.seek(0) + fh.truncate(0) + + return fh + + def __enter__(self): + return self.acquire() + + def __exit__(self, type_, value, tb): + self.release() + + def __delete__(self, instance): # pragma: no cover + instance.release() + + +class RLock(Lock): + """ + A reentrant lock, functions in a similar way to threading.RLock in that it + can be acquired multiple times. When the corresponding number of release() + calls are made the lock will finally release the underlying file lock. + """ + def __init__( + self, filename, mode='a', timeout=DEFAULT_TIMEOUT, + check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=False, + flags=LOCK_METHOD): + super(RLock, self).__init__(filename, mode, timeout, check_interval, + fail_when_locked, flags) + self._acquire_count = 0 + self._lock = ProcessRLock() + self._pid = os.getpid() + + def acquire(self, timeout=None, check_interval=None, fail_when_locked=None): + if self._lock: + if not self._lock.acquire(timeout=timeout): + # We got a timeout... reraising + raise exceptions.LockException() + + # check if we need to recreate the file lock on another subprocess + if self._pid != os.getpid(): + self._pid = os.getpid() + self._acquire_count = 0 + if self.fh: + try: + portalocker.unlock(self.fh) + self.fh.close() + except: + pass + self.fh = None + + if self._acquire_count >= 1: + fh = self.fh + else: + fh = super(RLock, self).acquire(timeout, check_interval, + fail_when_locked) + + self._acquire_count += 1 + return fh + + def release(self): + if self._acquire_count == 0: + raise exceptions.LockException( + "Cannot release more times than acquired") + + if self._acquire_count == 1: + super(RLock, self).release() + + self._acquire_count -= 1 + if self._lock: + self._lock.release() + + def __del__(self): + self._lock = None + # try to remove the file when we are done + if not os.path.isfile(self.filename): + return + try: + self.acquire(timeout=0) + try: + os.unlink(self.filename) + removed = True + except Exception: + removed = False + self.release() + if not removed: + try: + os.unlink(self.filename) + except Exception: + pass + except Exception: + pass + + +class TemporaryFileLock(Lock): + + def __init__(self, filename='.lock', timeout=DEFAULT_TIMEOUT, + check_interval=DEFAULT_CHECK_INTERVAL, fail_when_locked=True, + flags=LOCK_METHOD): + + Lock.__init__(self, filename=filename, mode='w', timeout=timeout, + check_interval=check_interval, + fail_when_locked=fail_when_locked, flags=flags) + atexit.register(self.release) + + def release(self): + Lock.release(self) + if os.path.isfile(self.filename): # pragma: no branch + os.unlink(self.filename)