diff --git a/clearml_agent/commands/worker.py b/clearml_agent/commands/worker.py index af0dd8b..9b3589a 100644 --- a/clearml_agent/commands/worker.py +++ b/clearml_agent/commands/worker.py @@ -1680,7 +1680,8 @@ class Worker(ServiceCommandSection): open_kwargs={ "buffering": self._session.config.get("agent.log_files_buffering", 1) }, - dir=(ENV_TEMP_STDOUT_FILE_DIR.get() or None) + dir=(ENV_TEMP_STDOUT_FILE_DIR.get() or None), + mode="a", ) print( "Running CLEARML-AGENT daemon in background mode, writing stdout/stderr to {}".format( @@ -1875,20 +1876,27 @@ class Worker(ServiceCommandSection): ): # type: (...) -> Tuple[Optional[int], Optional[TaskStopReason]] def _print_file(file_path, prev_pos=0): - with open(file_path, "ab+") as f: - f.seek(prev_pos) - binary_text = f.read() - if not self._truncate_task_output_files: - # non-buffered behavior + mode = "rb+" if self._truncate_task_output_files else "rb" + # noinspection PyBroadException + try: + with open(file_path, mode) as f: + f.seek(prev_pos) + binary_text = f.read() pos = f.tell() - else: - # buffered - read everything and truncate - # noinspection PyBroadException - try: - f.truncate(0) + if self._truncate_task_output_files: + # buffered - read everything and truncate + # noinspection PyBroadException + try: + # we must seek to the beginning otherwise truncate will add \00 + f.seek(0) + # os level truncate because f.truncate will push \00 at the end of the file + os.ftruncate(f.fileno(), 0) + os.fsync(f.fileno()) + except Exception: + pass pos = 0 - except Exception: - pos = f.tell() + except Exception: + return [], prev_pos # skip the previously printed lines, blines = binary_text.split(b'\n') if binary_text else [] @@ -1901,8 +1909,13 @@ class Worker(ServiceCommandSection): pos ) - stdout = open(stdout_path, "wt") - stderr = open(stderr_path, "wt") if stderr_path else stdout + safe_remove_file(stdout_path) + stdout = open(stdout_path, "at") + if stderr_path: + safe_remove_file(stderr_path) + stderr = open(stderr_path, "at") + else: + stderr = stdout stdout_line_count, stdout_pos_count, stdout_last_lines = 0, 0, [] stderr_line_count, stderr_pos_count, stderr_last_lines = 0, 0, [] lines_buffer = defaultdict(list) diff --git a/clearml_agent/helper/base.py b/clearml_agent/helper/base.py index bee1cb3..ac9c671 100644 --- a/clearml_agent/helper/base.py +++ b/clearml_agent/helper/base.py @@ -420,6 +420,7 @@ def mkstemp( open_kwargs=None, # type: Optional[Dict[Text, Any]] text=True, # type: bool name_only=False, # type: bool + mode=None, # type: str *args, **kwargs): # type: (...) -> Union[(IO[AnyStr], Text), Text] @@ -429,12 +430,14 @@ def mkstemp( :param open_kwargs: keyword arguments for ``io.open`` :param text: open in text mode :param name_only: close the file and return its name + :param mode: open file mode :param args: tempfile.mkstemp args :param kwargs: tempfile.mkstemp kwargs """ fd, name = tempfile.mkstemp(text=text, *args, **kwargs) - mode = 'w+' - if not text: + if not mode: + mode = 'w+' + if not text and 'b' not in mode: mode += 'b' if name_only: os.close(fd) diff --git a/clearml_agent/helper/os/folder_cache.py b/clearml_agent/helper/os/folder_cache.py index 15355d3..e192615 100644 --- a/clearml_agent/helper/os/folder_cache.py +++ b/clearml_agent/helper/os/folder_cache.py @@ -47,9 +47,11 @@ class FolderCache(object): # lock so we make sure no one deletes it before we copy it # noinspection PyBroadException try: - self._lock.acquire(timeout=self._lock_timeout_seconds) + self._lock.acquire(timeout=self._lock_timeout_seconds, readonly=True) except BaseException as ex: warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex)) + import traceback + warning('DEBUG: Exception {}: {}'.format(ex, traceback.format_exc())) return None src = None @@ -116,6 +118,8 @@ class FolderCache(object): self._lock.acquire(timeout=self._lock_timeout_seconds) except BaseException as ex: warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex)) + import traceback + warning('DEBUG: Exception {}: {}'.format(ex, traceback.format_exc())) # failed locking do nothing return True keys = sorted(list(set(keys) | set(cached_keys))) @@ -195,16 +199,23 @@ class FolderCache(object): if cache_folder.is_dir() and not cache_folder.name.startswith(self._temp_entry_prefix)] folder_entries = sorted(folder_entries, key=lambda x: x[1], reverse=True) + number_of_entries_to_keep = self._max_cache_entries - 1 \ + if max_cache_entries is None else max(0, int(max_cache_entries)) + + # if nothing to do, leave + if not folder_entries[number_of_entries_to_keep:]: + return + # lock so we make sure no one deletes it before we copy it # noinspection PyBroadException try: self._lock.acquire(timeout=self._lock_timeout_seconds) except BaseException as ex: warning('Could not lock cache folder {}: {}'.format(self._cache_folder, ex)) + import traceback + warning('DEBUG: Exception {}: {}'.format(ex, traceback.format_exc())) return - number_of_entries_to_keep = self._max_cache_entries - 1 \ - if max_cache_entries is None else max(0, int(max_cache_entries)) for folder, ts in folder_entries[number_of_entries_to_keep:]: try: shutil.rmtree(folder.as_posix(), ignore_errors=True) diff --git a/clearml_agent/helper/os/portalocker.py b/clearml_agent/helper/os/portalocker.py index 634bf58..81a1544 100644 --- a/clearml_agent/helper/os/portalocker.py +++ b/clearml_agent/helper/os/portalocker.py @@ -20,6 +20,9 @@ class exceptions: class FileToLarge(BaseLockException): pass + class LockTimeout(BaseLockException): + pass + class constants: # The actual tests will execute the code anyhow so the following code can @@ -185,6 +188,10 @@ elif os.name == 'posix': # pragma: no cover # The exception code varies on different systems so we'll catch # every IO error raise exceptions.LockException(exc_value, fh=file_) + except BaseException as ex: + # DEBUG + print("Uncaught [{}] Exception [{}] in portalock: {}".format(locking_exceptions, type(ex), ex)) + raise def unlock(file_): fcntl.flock(file_.fileno(), constants.LOCK_UN) diff --git a/clearml_agent/helper/package/base.py b/clearml_agent/helper/package/base.py index 8fea888..09d8147 100644 --- a/clearml_agent/helper/package/base.py +++ b/clearml_agent/helper/package/base.py @@ -28,6 +28,7 @@ class PackageManager(object): _config_cache_folder = 'agent.venvs_cache.path' _config_cache_max_entries = 'agent.venvs_cache.max_entries' _config_cache_free_space_threshold = 'agent.venvs_cache.free_space_threshold_gb' + _config_cache_lock_timeout = 'agent.venvs_cache.lock_timeout' def __init__(self): self._cache_manager = None @@ -302,7 +303,9 @@ class PackageManager(object): max_entries = int(self.session.config.get(self._config_cache_max_entries, 10)) free_space_threshold = float(self.session.config.get(self._config_cache_free_space_threshold, 0)) self._cache_manager = FolderCache( - cache_folder, max_cache_entries=max_entries, min_free_space_gb=free_space_threshold) + cache_folder, max_cache_entries=max_entries, + min_free_space_gb=free_space_threshold, + lock_timeout_seconds=self.session.config.get(self._config_cache_lock_timeout, None)) except Exception as ex: print("WARNING: Failed accessing venvs cache at {}: {}".format(cache_folder, ex)) print("WARNING: Skipping venv cache - folder not accessible!") diff --git a/clearml_agent/helper/repo.py b/clearml_agent/helper/repo.py index 6f08e0e..c191093 100644 --- a/clearml_agent/helper/repo.py +++ b/clearml_agent/helper/repo.py @@ -6,7 +6,6 @@ import stat import subprocess import sys import tempfile -from distutils.spawn import find_executable from hashlib import md5 from os import environ from random import random @@ -30,7 +29,7 @@ from clearml_agent.helper.base import ( create_file_if_not_exists, safe_remove_file, ) from clearml_agent.helper.os.locks import FileLock -from clearml_agent.helper.process import DEVNULL, Argv, PathLike, COMMAND_SUCCESS +from clearml_agent.helper.process import DEVNULL, Argv, PathLike, COMMAND_SUCCESS, find_executable from clearml_agent.session import Session