diff --git a/clearml/backend_api/session/session.py b/clearml/backend_api/session/session.py index 1d3376f9..91bfb671 100644 --- a/clearml/backend_api/session/session.py +++ b/clearml/backend_api/session/session.py @@ -4,6 +4,7 @@ import logging import os import sys import types +import weakref from socket import gethostname from time import sleep @@ -74,6 +75,8 @@ class Session(TokenManager): _ssl_error_count_verbosity = 2 _offline_mode = ENV_OFFLINE_MODE.get() _offline_default_version = '2.9' + # we want to keep track of sessions, but we also want to allow them to be collected by the GC if they are not used anymore + _sessions_weakrefs = [] _client = [(__package__.partition(".")[0], __version__)] @@ -130,26 +133,58 @@ class Session(TokenManager): http_retries_config=None, **kwargs ): + self.__class__._sessions_weakrefs.append(weakref.ref(self)) + self._verbose = verbose if verbose is not None else ENV_VERBOSE.get() + self._logger = logger + if self._verbose and not self._logger: + level = resolve_logging_level(ENV_VERBOSE.get(converter=str)) + self._logger = get_logger(level=level, stream=sys.stderr if level is logging.DEBUG else None) + self.__worker = worker or self.get_worker_host_name() + self.client = ", ".join("{}-{}".format(*x) for x in self._client) + + self.__init_api_key = api_key + self.__init_secret_key = secret_key + self.__init_host = host + self.__init_http_retries_config = http_retries_config + self.__token_manager_kwargs = kwargs + if config is not None: self.config = config else: from clearml.config import ConfigWrapper self.config = ConfigWrapper._init() + self._connect() + + def _connect(self): + if self._offline_mode: + return + + self._ssl_error_count_verbosity = self.config.get( + "api.ssl_error_count_verbosity", self._ssl_error_count_verbosity) + self.__host = self.__init_host or self.get_api_server_host(config=self.config) + + if not self.__host: + raise ValueError("ClearML host was not set, check your configuration file or environment 
variable") + + self.__host = self.__host.strip("/") + self.__http_retries_config = self.__init_http_retries_config or self.config.get( + "api.http.retries", ConfigTree()).as_plain_ordered_dict() + self.__http_retries_config["status_forcelist"] = self._get_retry_codes() + self.__http_retries_config["config"] = self.config + self.__http_session = get_http_session_with_retry(**self.__http_retries_config) + self.__http_session.write_timeout = self._write_session_timeout + self.__http_session.request_size_threshold = self._write_session_data_size + self.__max_req_size = self.config.get("api.http.max_req_size", None) + + if not self.__max_req_size: + raise ValueError("missing max request size") + token_expiration_threshold_sec = self.config.get( "auth.token_expiration_threshold_sec", 60 ) - req_token_expiration_sec = self.config.get("api.auth.req_token_expiration_sec", None) - - self._verbose = verbose if verbose is not None else ENV_VERBOSE.get() - self._logger = logger - if self._verbose and not self._logger: - level = resolve_logging_level(ENV_VERBOSE.get(converter=str)) - self._logger = get_logger(level=level, stream=sys.stderr if level is logging.DEBUG else None) - self.__auth_token = None - self._update_default_api_method() if ENV_AUTH_TOKEN.get(): @@ -158,52 +193,20 @@ class Session(TokenManager): # if we use a token we override make sure we are at least 3600 seconds (1 hour) # away from the token expiration date, ask for a new one. 
token_expiration_threshold_sec = max(token_expiration_threshold_sec, 3600) - elif not self._offline_mode: - self.__access_key = api_key or ENV_ACCESS_KEY.get( + else: + self.__access_key = self.__init_api_key or ENV_ACCESS_KEY.get( default=(self.config.get("api.credentials.access_key", None) or self.default_key) ) - self.__secret_key = secret_key or ENV_SECRET_KEY.get( + self.__secret_key = self.__init_secret_key or ENV_SECRET_KEY.get( default=(self.config.get("api.credentials.secret_key", None) or self.default_secret) ) - # init the token manager - super(Session, self).__init__( - token_expiration_threshold_sec=token_expiration_threshold_sec, - req_token_expiration_sec=req_token_expiration_sec, - **kwargs - ) - - host = host or self.get_api_server_host(config=self.config) - if not host: - raise ValueError("ClearML host was not set, check your configuration file or environment variable") - - if not self._offline_mode and (not self.secret_key and not self.access_key and not self.__auth_token): + if not self.secret_key and not self.access_key and not self.__auth_token: raise MissingConfigError() - - self._ssl_error_count_verbosity = self.config.get( - "api.ssl_error_count_verbosity", self._ssl_error_count_verbosity) - - self.__host = host.strip("/") - http_retries_config = http_retries_config or self.config.get( - "api.http.retries", ConfigTree()).as_plain_ordered_dict() - - http_retries_config["status_forcelist"] = self._get_retry_codes() - http_retries_config["config"] = self.config - self.__http_session = get_http_session_with_retry(**http_retries_config) - self.__http_session.write_timeout = self._write_session_timeout - self.__http_session.request_size_threshold = self._write_session_data_size - - self.__worker = worker or self.get_worker_host_name() - - self.__max_req_size = self.config.get("api.http.max_req_size", None) - if not self.__max_req_size: - raise ValueError("missing max request size") - - self.client = ", ".join("{}-{}".format(*x) for x in 
self._client)
-
-        if self._offline_mode:
-            return
-
+        super(Session, self).__init__(
+            token_expiration_threshold_sec=token_expiration_threshold_sec,
+            req_token_expiration_sec=self.config.get("api.auth.req_token_expiration_sec", None),
+            **self.__token_manager_kwargs)
         self.refresh_token()
 
         local_logger = self._LocalLogger(self._logger)
@@ -227,7 +230,7 @@ class Session(TokenManager):
         # now setup the session reporting, so one consecutive retries will show warning
         # we do that here, so if we have problems authenticating, we see them immediately
         # notice: this is across the board warning omission
-        urllib_log_warning_setup(total_retries=http_retries_config.get('total', 0), display_warning_after=3)
+        urllib_log_warning_setup(total_retries=self.__http_retries_config.get('total', 0), display_warning_after=3)
 
         if self.force_max_api_version and self.check_min_api_version(self.force_max_api_version):
             Session.max_api_version = Session.api_version = str(self.force_max_api_version)
@@ -630,6 +633,33 @@ class Session(TokenManager):
 
         return call_result
 
+    @classmethod
+    def _make_all_sessions_go_online(cls):
+        """Call `_connect()` on every session object that is still alive."""
+        for active_session in cls._get_all_active_sessions():
+            # noinspection PyProtectedMember
+            active_session._connect()
+
+    @classmethod
+    def _get_all_active_sessions(cls):
+        """
+        Return the tracked sessions that have not been garbage collected.
+
+        Dead weak references are dropped from `_sessions_weakrefs` as a side effect.
+        """
+        active_sessions = []
+        new_sessions_weakrefs = []
+        for session_weakref in cls._sessions_weakrefs:
+            session = session_weakref()
+            if session is not None:
+                active_sessions.append(session)
+                new_sessions_weakrefs.append(session_weakref)
+
+        # store the pruned list, not the loop variable: the registry must stay a list
+        # of weakrefs, and the loop variable is unbound when the registry is empty
+        cls._sessions_weakrefs = new_sessions_weakrefs
+        return active_sessions
+
     @classmethod
     def get_api_server_host(cls, config=None):
         if not config:
diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py
index ae23291b..47128aac 100644
--- a/clearml/backend_interface/task/task.py
+++ b/clearml/backend_interface/task/task.py
@@ -171,6 +171,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
         self._initial_iteration_offset = 0
         self._reload_skip_flag = False
         self._calling_filename = None
+        self._offline_dir = None
 
         if not task_id:
             # generate a new task
@@ 
-2583,9 +2584,12 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): Return the folder where all the task outputs and logs are stored in the offline session. :return: Path object, local folder, later to be used with `report_offline_session()` """ + if self._offline_dir: + return self._offline_dir if not self._offline_mode: return None - return get_offline_dir(task_id=self.task_id) + self._offline_dir = get_offline_dir(task_id=self.task_id) + return self._offline_dir @classmethod def _clone_task( @@ -2887,30 +2891,6 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): PROC_MASTER_ID_ENV_VAR.get().split(':')[0] != str(os.getpid()) return is_subprocess - @classmethod - def set_offline(cls, offline_mode=False): - # type: (bool) -> None - """ - Set offline mode, where all data and logs are stored into local folder, for later transmission - - :param offline_mode: If True, offline-mode is turned on, and no communication to the backend is enabled. - :return: - """ - if not running_remotely(): - ENV_OFFLINE_MODE.set(offline_mode) - InterfaceBase._offline_mode = bool(offline_mode) - Session._offline_mode = bool(offline_mode) - - @classmethod - def is_offline(cls): - # type: () -> bool - """ - Return offline-mode state, If in offline-mode, no communication to the backend is enabled. 
- - :return: boolean offline-mode state - """ - return cls._offline_mode - @classmethod def _get_task_status(cls, task_id): # type: (str) -> (Optional[str], Optional[str]) diff --git a/clearml/task.py b/clearml/task.py index 47c90937..356af1a1 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -40,9 +40,10 @@ from .backend_config.defs import get_active_config_file, get_config_file from .backend_api.services import tasks, projects, events from .backend_api.session.session import ( Session, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_HOST, ENV_WEB_HOST, ENV_FILES_HOST, ) -from .backend_api.session.defs import ENV_DEFERRED_TASK_INIT, ENV_IGNORE_MISSING_CONFIG, MissingConfigError +from .backend_api.session.defs import ENV_DEFERRED_TASK_INIT, ENV_IGNORE_MISSING_CONFIG, ENV_OFFLINE_MODE, MissingConfigError from .backend_interface.metrics import Metrics from .backend_interface.model import Model as BackendModel +from .backend_interface.base import InterfaceBase from .backend_interface.task import Task as _Task from .backend_interface.task.log import TaskHandler from .backend_interface.task.development.worker import DevWorker @@ -844,6 +845,8 @@ class Task(_Task): :return: The newly created Task (experiment) :rtype: Task """ + if cls.is_offline(): + raise UsageError("Creating task in offline mode. Use 'Task.init' instead.") if not project_name and not base_task_id: if not cls.__main_task: raise ValueError("Please provide project_name, no global task context found " @@ -2867,6 +2870,72 @@ class Task(_Task): return target_task + @classmethod + def set_offline(cls, offline_mode=False): + # type: (bool) -> None + """ + Set offline mode, where all data and logs are stored into local folder, for later transmission + + .. note:: + + `Task.set_offline` can't move the same task from offline to online, nor can it be applied before `Task.create`. + See below an example of **incorect** usage of `Task.set_offline`: + + .. 
code-block:: py + + from clearml import Task + Task.set_offline(True) + task = Task.create(project_name='DEBUG', task_name="offline") + # ^^^ an error or warning is emitted, telling us that `Task.set_offline(True)` + # is supported only for `Task.init` + Task.set_offline(False) + # ^^^ an error or warning is emitted, telling us that running `Task.set_offline(False)` + # while the current task is not closed is not something we support + data = task.export_task() + imported_task = Task.import_task(task_data=data) + + The correct way to use `Task.set_offline` can be seen in the following example: + + .. code-block:: py + + from clearml import Task + Task.set_offline(True) + task = Task.init(project_name='DEBUG', task_name="offline") + task.upload_artifact("large_artifact", "test_strign") + task.close() + Task.set_offline(False) + imported_task = Task.import_offline_session(task.get_offline_mode_folder()) + + :param offline_mode: If True, offline-mode is turned on, and no communication to the backend is enabled. + + :return: + """ + if running_remotely() or bool(offline_mode) == InterfaceBase._offline_mode: + return + if ( + cls.current_task() + and cls.current_task().status != cls.TaskStatusEnum.closed + and not offline_mode + ): + raise UsageError( + "Switching from offline mode to online mode, but the current task has not been closed. Use `Task.close` to close it." + ) + ENV_OFFLINE_MODE.set(offline_mode) + InterfaceBase._offline_mode = bool(offline_mode) + Session._offline_mode = bool(offline_mode) + if not offline_mode: + # noinspection PyProtectedMember + Session._make_all_sessions_go_online() + + @classmethod + def is_offline(cls): + # type: () -> bool + """ + Return offline-mode state, If in offline-mode, no communication to the backend is enabled. 
+ :return: boolean offline-mode state + """ + return cls._offline_mode + @classmethod def import_offline_session(cls, session_folder_zip, previous_task_id=None, iteration_offset=0): # type: (str, Optional[str], Optional[int]) -> (Optional[str])