Add Task.debug_simulate_remote_task() to simulate task execution by ClearML Agent

allegroai 2021-01-10 12:50:43 +02:00
parent 2b9a28f908
commit 9da307730c
6 changed files with 54 additions and 11 deletions
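For orientation, a minimal usage sketch of the call this commit adds. The project name, task name, and task ID are placeholders, and it is assumed the call is made from a locally executed script before Task.init():

from clearml import Task

# Placeholder ID of an existing, previously logged Task on the server.
TASK_ID = "replace-with-a-real-task-id"

# Must run locally (not under an agent); switches the SDK into "as if remote" mode
# and, with reset_task=True, clears the target Task first.
Task.debug_simulate_remote_task(task_id=TASK_ID, reset_task=True)

# From here on, configuration flows from the backend copy of the Task into the code,
# instead of the code's values being recorded by the backend.
task = Task.init(project_name="examples", task_name="simulate remote")
params = task.connect({"batch_size": 32, "lr": 0.001})  # values come back from the stored Task
print(params)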

View File

@@ -4,7 +4,7 @@ import threading
from time import time
from ..binding.frameworks import _patched_call # noqa
-from ..config import running_remotely, config
+from ..config import running_remotely, config, DEBUG_SIMULATE_REMOTE_TASK
class StdStreamPatch(object):
@@ -15,7 +15,8 @@ class StdStreamPatch(object):
@staticmethod
def patch_std_streams(a_logger, connect_stdout=True, connect_stderr=True):
-if (connect_stdout or connect_stderr) and not PrintPatchLogger.patched and not running_remotely():
+if (connect_stdout or connect_stderr) and not PrintPatchLogger.patched and \
+        (not running_remotely() or DEBUG_SIMULATE_REMOTE_TASK.get()):
StdStreamPatch._stdout_proxy = PrintPatchLogger(sys.stdout, a_logger, level=logging.INFO) \
if connect_stdout else None
StdStreamPatch._stderr_proxy = PrintPatchLogger(sys.stderr, a_logger, level=logging.ERROR) \

View File

@@ -46,7 +46,7 @@ from ..util import (
exact_match_regex, mutually_exclusive, )
from ...config import (
get_config_for_bucket, get_remote_task_id, TASK_ID_ENV_VAR,
-running_remotely, get_cache_dir, DOCKER_IMAGE_ENV_VAR, get_offline_dir)
+running_remotely, get_cache_dir, DOCKER_IMAGE_ENV_VAR, get_offline_dir, get_log_to_backend, )
from ...debugging import get_logger
from ...storage.helper import StorageHelper, StorageError
from .access import AccessMixin
@@ -174,7 +174,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
if running_remotely() or DevWorker.report_stdout:
log_to_backend = False
-self._log_to_backend = log_to_backend
+self._log_to_backend = get_log_to_backend(default=log_to_backend)
self._artifacts_manager = Artifacts(self)
self._hyper_params_manager = HyperParams(self)

View File

@@ -17,6 +17,7 @@ DEV_WORKER_NAME = EnvEntry("CLEARML_WORKER_NAME", "TRAINS_WORKER_NAME")
DEV_TASK_NO_REUSE = EnvEntry("CLEARML_TASK_NO_REUSE", "TRAINS_TASK_NO_REUSE", type=bool)
TASK_LOG_ENVIRONMENT = EnvEntry("CLEARML_LOG_ENVIRONMENT", "TRAINS_LOG_ENVIRONMENT", type=str)
TRAINS_CACHE_DIR = EnvEntry("CLEARML_CACHE_DIR", "TRAINS_CACHE_DIR")
+DEBUG_SIMULATE_REMOTE_TASK = EnvEntry("CLEARML_SIMULATE_REMOTE_TASK", type=bool)
LOG_LEVEL_ENV_VAR = EnvEntry("CLEARML_LOG_LEVEL", "TRAINS_LOG_LEVEL", converter=or_(int, str))
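Since the new entry is backed by an environment variable, the flag can presumably also be raised from the environment before clearml is imported. A small sketch; the exact truthy strings accepted by the boolean EnvEntry are an assumption, and this only flips the flag itself rather than running the full debug_simulate_remote_task() flow:

import os

# Assumption: "1" is parsed as boolean True by EnvEntry(type=bool).
os.environ["CLEARML_SIMULATE_REMOTE_TASK"] = "1"

from clearml.config import DEBUG_SIMULATE_REMOTE_TASK  # re-exported, as imported in the Task module below
print(DEBUG_SIMULATE_REMOTE_TASK.get())  # expected: True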

View File

@@ -15,3 +15,6 @@ def override_current_task_id(task_id):
global running_remotely_task_id
running_remotely_task_id = task_id
+# make sure we change the cached value as well.
+import clearml
+clearml.config._running_remotely_task_id = task_id
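The two added lines matter because clearml.config keeps its own cached copy of the remote task ID, as the comment above notes. An illustrative sketch of the pattern, using standalone toy code rather than ClearML internals:

import types

# Toy stand-ins for two modules: "source" owns the value, "consumer" cached it earlier
# (e.g. at import time), mirroring running_remotely_task_id vs. the copy in clearml.config.
source = types.SimpleNamespace(task_id=None)
consumer = types.SimpleNamespace(cached_task_id=source.task_id)  # snapshot taken once

def override_task_id(new_id):
    # Updating only the source would leave the consumer's snapshot stale,
    # so the override refreshes both, just like the change above.
    source.task_id = new_id
    consumer.cached_task_id = new_id

override_task_id("aabbccdd")
assert source.task_id == consumer.cached_task_id == "aabbccdd"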

View File

@@ -20,7 +20,7 @@ from .backend_interface.task import Task as _Task
from .backend_interface.task.development.worker import DevWorker
from .backend_interface.task.log import TaskHandler
from .backend_interface.util import mutually_exclusive
-from .config import running_remotely, get_cache_dir, config
+from .config import running_remotely, get_cache_dir, config, DEBUG_SIMULATE_REMOTE_TASK
from .errors import UsageError
from .storage.helper import StorageHelper
from .utilities.plotly_reporter import SeriesInfo
@@ -1085,7 +1085,7 @@ class Logger(object):
specify ``None`` or ``0``.
"""
if self._task.is_main_task() and self._task_handler and DevWorker.report_period and \
-        not running_remotely() and period is not None:
+        not self._skip_console_log() and period is not None:
period = min(period or DevWorker.report_period, DevWorker.report_period)
if not period:
@@ -1199,7 +1199,7 @@ class Logger(object):
level = logging.INFO
# noinspection PyProtectedMember
-if not running_remotely() or not self._task._is_remote_main_task():
+if not self._skip_console_log() or not self._task._is_remote_main_task():
if self._task_handler:
# noinspection PyBroadException
try:
@@ -1223,7 +1223,7 @@ class Logger(object):
if not omit_console:
# if we are here and we grabbed the stdout, we need to print the real thing
-if self._connect_std_streams and not running_remotely():
+if self._connect_std_streams and not self._skip_console_log():
# noinspection PyBroadException
try:
# make sure we are writing to the original stdout
@@ -1404,3 +1404,7 @@ class Logger(object):
default is False: Tensorboard scalar series will be grouped according to their title
"""
return cls._tensorboard_single_series_per_graph
+@classmethod
+def _skip_console_log(cls):
+    return bool(running_remotely() and not DEBUG_SIMULATE_REMOTE_TASK.get())
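A standalone re-statement of the helper's logic, to make the three modes explicit (toy function, not the Logger method itself):

def skip_console_log(is_remote, simulate_remote):
    # Mirrors: running_remotely() and not DEBUG_SIMULATE_REMOTE_TASK.get()
    return bool(is_remote and not simulate_remote)

assert skip_console_log(is_remote=False, simulate_remote=False) is False  # local run: the SDK reports the console
assert skip_console_log(is_remote=True, simulate_remote=False) is True    # real agent run: the daemon reports it
assert skip_console_log(is_remote=True, simulate_remote=True) is False    # simulated remote run: the SDK keeps reporting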

View File

@@ -43,7 +43,7 @@ from .binding.frameworks.xgboost_bind import PatchXGBoostModelIO
from .binding.joblib_bind import PatchedJoblib
from .binding.matplotlib_bind import PatchedMatplotlib
from .binding.hydra_bind import PatchHydra
-from .config import config, DEV_TASK_NO_REUSE, get_is_master_node
+from .config import config, DEV_TASK_NO_REUSE, get_is_master_node, DEBUG_SIMULATE_REMOTE_TASK
from .config import running_remotely, get_remote_task_id
from .config.cache import SessionCache
from .debugging.log import LoggerRoot
@@ -1219,7 +1219,7 @@ class Task(_Task):
:return: The Logger for the Task (experiment).
"""
-return self._get_logger()
+return self._get_logger(auto_connect_streams=self._log_to_backend)
def mark_started(self, force=False):
# type: (bool) -> ()
@@ -2166,6 +2166,39 @@ class Task(_Task):
Session.default_web = web_host or ''
Session.default_files = files_host or ''
+@classmethod
+def debug_simulate_remote_task(cls, task_id, reset_task=False):
+    # type: (str, bool) -> ()
+    """
+    Simulate remote execution of a specified Task.
+    This call simulates the behaviour of your Task as if it were executed by the ClearML Agent:
+    configuration comes from the backend server into the code, the opposite of a manual execution,
+    where the backend records the arguments supplied by the code.
+    Use with care.
+    :param task_id: The ID of the Task to simulate. Note that all configuration is taken from the
+        specified Task, regardless of the initial values in the code, exactly as if the Task were
+        executed by a ClearML agent.
+    :param reset_task: If True, the target Task is automatically cleared / reset.
+    """
+    # if we are already running remotely, do nothing
+    if running_remotely():
+        return
+    # verify that the Task ID exists
+    task = Task.get_task(task_id=task_id)
+    if not task:
+        raise ValueError("Task ID '{}' could not be found".format(task_id))
+    if reset_task:
+        task.reset(set_started_on_success=False, force=True)
+    from .config.remote import override_current_task_id
+    from .config.defs import LOG_TO_BACKEND_ENV_VAR
+    override_current_task_id(task_id)
+    LOG_TO_BACKEND_ENV_VAR.set(True)
+    DEBUG_SIMULATE_REMOTE_TASK.set(True)
@classmethod
def _create(cls, project_name=None, task_name=None, task_type=TaskTypes.training):
# type: (Optional[str], Optional[str], Task.TaskTypes) -> Task
@@ -2771,7 +2804,8 @@ class Task(_Task):
# if we are running remotely, the daemon will take care of it
task_status = None
wait_for_std_log = True
-if not running_remotely() and self.is_main_task() and not is_sub_process:
+if (not running_remotely() or DEBUG_SIMULATE_REMOTE_TASK.get()) \
+        and self.is_main_task() and not is_sub_process:
# check if we crashed or the signal is not an interrupt (manual break)
task_status = ('stopped', )
if self.__exit_hook: