Improve venv cache handling:

- Add FileLock readonly mode, default is write mode (i.e. exclusive lock, preserving behavior)
- Add venv cache now uses readonly lock when copying folders from venv cache into target folder. This enables multiple read, single write operation
- Do not lock the cache folder if we do not need to delete old entries
This commit is contained in:
allegroai 2024-02-29 14:19:24 +02:00
parent 74edf6aa36
commit 01e8ffd854
6 changed files with 59 additions and 23 deletions

View File

@ -1680,7 +1680,8 @@ class Worker(ServiceCommandSection):
open_kwargs={
"buffering": self._session.config.get("agent.log_files_buffering", 1)
},
dir=(ENV_TEMP_STDOUT_FILE_DIR.get() or None)
dir=(ENV_TEMP_STDOUT_FILE_DIR.get() or None),
mode="a",
)
print(
"Running CLEARML-AGENT daemon in background mode, writing stdout/stderr to {}".format(
@ -1875,20 +1876,27 @@ class Worker(ServiceCommandSection):
):
# type: (...) -> Tuple[Optional[int], Optional[TaskStopReason]]
def _print_file(file_path, prev_pos=0):
with open(file_path, "ab+") as f:
f.seek(prev_pos)
binary_text = f.read()
if not self._truncate_task_output_files:
# non-buffered behavior
mode = "rb+" if self._truncate_task_output_files else "rb"
# noinspection PyBroadException
try:
with open(file_path, mode) as f:
f.seek(prev_pos)
binary_text = f.read()
pos = f.tell()
else:
# buffered - read everything and truncate
# noinspection PyBroadException
try:
f.truncate(0)
if self._truncate_task_output_files:
# buffered - read everything and truncate
# noinspection PyBroadException
try:
# we must seek to the beginning otherwise truncate will add \00
f.seek(0)
# os level truncate because f.truncate will push \00 at the end of the file
os.ftruncate(f.fileno(), 0)
os.fsync(f.fileno())
except Exception:
pass
pos = 0
except Exception:
pos = f.tell()
except Exception:
return [], prev_pos
# skip the previously printed lines,
blines = binary_text.split(b'\n') if binary_text else []
@ -1901,8 +1909,13 @@ class Worker(ServiceCommandSection):
pos
)
stdout = open(stdout_path, "wt")
stderr = open(stderr_path, "wt") if stderr_path else stdout
safe_remove_file(stdout_path)
stdout = open(stdout_path, "at")
if stderr_path:
safe_remove_file(stderr_path)
stderr = open(stderr_path, "at")
else:
stderr = stdout
stdout_line_count, stdout_pos_count, stdout_last_lines = 0, 0, []
stderr_line_count, stderr_pos_count, stderr_last_lines = 0, 0, []
lines_buffer = defaultdict(list)

View File

@ -420,6 +420,7 @@ def mkstemp(
open_kwargs=None, # type: Optional[Dict[Text, Any]]
text=True, # type: bool
name_only=False, # type: bool
mode=None, # type: str
*args,
**kwargs):
# type: (...) -> Union[(IO[AnyStr], Text), Text]
@ -429,12 +430,14 @@ def mkstemp(
:param open_kwargs: keyword arguments for ``io.open``
:param text: open in text mode
:param name_only: close the file and return its name
:param mode: open file mode
:param args: tempfile.mkstemp args
:param kwargs: tempfile.mkstemp kwargs
"""
fd, name = tempfile.mkstemp(text=text, *args, **kwargs)
mode = 'w+'
if not text:
if not mode:
mode = 'w+'
if not text and 'b' not in mode:
mode += 'b'
if name_only:
os.close(fd)

View File

@ -47,9 +47,11 @@ class FolderCache(object):
# lock so we make sure no one deletes it before we copy it
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
self._lock.acquire(timeout=self._lock_timeout_seconds, readonly=True)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
import traceback
warning('DEBUG: Exception {}: {}'.format(ex, traceback.format_exc()))
return None
src = None
@ -116,6 +118,8 @@ class FolderCache(object):
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
import traceback
warning('DEBUG: Exception {}: {}'.format(ex, traceback.format_exc()))
# failed locking do nothing
return True
keys = sorted(list(set(keys) | set(cached_keys)))
@ -195,16 +199,23 @@ class FolderCache(object):
if cache_folder.is_dir() and not cache_folder.name.startswith(self._temp_entry_prefix)]
folder_entries = sorted(folder_entries, key=lambda x: x[1], reverse=True)
number_of_entries_to_keep = self._max_cache_entries - 1 \
if max_cache_entries is None else max(0, int(max_cache_entries))
# if nothing to do, leave
if not folder_entries[number_of_entries_to_keep:]:
return
# lock so we make sure no one deletes it before we copy it
# noinspection PyBroadException
try:
self._lock.acquire(timeout=self._lock_timeout_seconds)
except BaseException as ex:
warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex))
import traceback
warning('DEBUG: Exception {}: {}'.format(ex, traceback.format_exc()))
return
number_of_entries_to_keep = self._max_cache_entries - 1 \
if max_cache_entries is None else max(0, int(max_cache_entries))
for folder, ts in folder_entries[number_of_entries_to_keep:]:
try:
shutil.rmtree(folder.as_posix(), ignore_errors=True)

View File

@ -20,6 +20,9 @@ class exceptions:
class FileToLarge(BaseLockException):
pass
class LockTimeout(BaseLockException):
pass
class constants:
# The actual tests will execute the code anyhow so the following code can
@ -185,6 +188,10 @@ elif os.name == 'posix': # pragma: no cover
# The exception code varies on different systems so we'll catch
# every IO error
raise exceptions.LockException(exc_value, fh=file_)
except BaseException as ex:
# DEBUG
print("Uncaught [{}] Exception [{}] in portalock: {}".format(locking_exceptions, type(ex), ex))
raise
def unlock(file_):
fcntl.flock(file_.fileno(), constants.LOCK_UN)

View File

@ -28,6 +28,7 @@ class PackageManager(object):
_config_cache_folder = 'agent.venvs_cache.path'
_config_cache_max_entries = 'agent.venvs_cache.max_entries'
_config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb'
_config_cache_lock_timeout = 'agent.venvs_cache.lock_timeout'
def __init__(self):
self._cache_manager = None
@ -302,7 +303,9 @@ class PackageManager(object):
max_entries = int(self.session.config.get(self._config_cache_max_entries, 10))
free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0))
self._cache_manager = FolderCache(
cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold)
cache_folder, max_cache_entries=max_entries,
min_free_space_gb=free_space_threshold,
lock_timeout_seconds=self.session.config.get(self._config_cache_lock_timeout, None))
except Exception as ex:
print("WARNING: Failed accessing venvs cache at {}: {}".format(cache_folder, ex))
print("WARNING: Skipping venv cache - folder not accessible!")

View File

@ -6,7 +6,6 @@ import stat
import subprocess
import sys
import tempfile
from distutils.spawn import find_executable
from hashlib import md5
from os import environ
from random import random
@ -30,7 +29,7 @@ from clearml_agent.helper.base import (
create_file_if_not_exists, safe_remove_file,
)
from clearml_agent.helper.os.locks import FileLock
from clearml_agent.helper.process import DEVNULL, Argv, PathLike, COMMAND_SUCCESS
from clearml_agent.helper.process import DEVNULL, Argv, PathLike, COMMAND_SUCCESS, find_executable
from clearml_agent.session import Session