Allow moving a task from offline to online mode in a specific manner

This commit is contained in:
Alex Burlacu 2023-03-23 18:58:15 +02:00
parent dc5be02328
commit 43e73c5f0a
3 changed files with 146 additions and 76 deletions

View File

@ -4,6 +4,7 @@ import logging
import os import os
import sys import sys
import types import types
import weakref
from socket import gethostname from socket import gethostname
from time import sleep from time import sleep
@ -74,6 +75,8 @@ class Session(TokenManager):
_ssl_error_count_verbosity = 2 _ssl_error_count_verbosity = 2
_offline_mode = ENV_OFFLINE_MODE.get() _offline_mode = ENV_OFFLINE_MODE.get()
_offline_default_version = '2.9' _offline_default_version = '2.9'
# we want to keep track of sessions, but we also want to allow them to be collected by the GC if they are not used anymore
_sessions_weakrefs = []
_client = [(__package__.partition(".")[0], __version__)] _client = [(__package__.partition(".")[0], __version__)]
@ -130,26 +133,58 @@ class Session(TokenManager):
http_retries_config=None, http_retries_config=None,
**kwargs **kwargs
): ):
self.__class__._sessions_weakrefs.append(weakref.ref(self))
self._verbose = verbose if verbose is not None else ENV_VERBOSE.get()
self._logger = logger
if self._verbose and not self._logger:
level = resolve_logging_level(ENV_VERBOSE.get(converter=str))
self._logger = get_logger(level=level, stream=sys.stderr if level is logging.DEBUG else None)
self.__worker = worker or self.get_worker_host_name()
self.client = ", ".join("{}-{}".format(*x) for x in self._client)
self.__init_api_key = api_key
self.__init_secret_key = secret_key
self.__init_host = host
self.__init_http_retries_config = http_retries_config
self.__token_manager_kwargs = kwargs
if config is not None: if config is not None:
self.config = config self.config = config
else: else:
from clearml.config import ConfigWrapper from clearml.config import ConfigWrapper
self.config = ConfigWrapper._init() self.config = ConfigWrapper._init()
self._connect()
def _connect(self):
if self._offline_mode:
return
self._ssl_error_count_verbosity = self.config.get(
"api.ssl_error_count_verbosity", self._ssl_error_count_verbosity)
self.__host = self.__init_host or self.get_api_server_host(config=self.config)
if not self.__host:
raise ValueError("ClearML host was not set, check your configuration file or environment variable")
self.__host = self.__host.strip("/")
self.__http_retries_config = self.__init_http_retries_config or self.config.get(
"api.http.retries", ConfigTree()).as_plain_ordered_dict()
self.__http_retries_config["status_forcelist"] = self._get_retry_codes()
self.__http_retries_config["config"] = self.config
self.__http_session = get_http_session_with_retry(**self.__http_retries_config)
self.__http_session.write_timeout = self._write_session_timeout
self.__http_session.request_size_threshold = self._write_session_data_size
self.__max_req_size = self.config.get("api.http.max_req_size", None)
if not self.__max_req_size:
raise ValueError("missing max request size")
token_expiration_threshold_sec = self.config.get( token_expiration_threshold_sec = self.config.get(
"auth.token_expiration_threshold_sec", 60 "auth.token_expiration_threshold_sec", 60
) )
req_token_expiration_sec = self.config.get("api.auth.req_token_expiration_sec", None) req_token_expiration_sec = self.config.get("api.auth.req_token_expiration_sec", None)
self._verbose = verbose if verbose is not None else ENV_VERBOSE.get()
self._logger = logger
if self._verbose and not self._logger:
level = resolve_logging_level(ENV_VERBOSE.get(converter=str))
self._logger = get_logger(level=level, stream=sys.stderr if level is logging.DEBUG else None)
self.__auth_token = None self.__auth_token = None
self._update_default_api_method() self._update_default_api_method()
if ENV_AUTH_TOKEN.get(): if ENV_AUTH_TOKEN.get():
@ -158,52 +193,20 @@ class Session(TokenManager):
# if we use a token we override make sure we are at least 3600 seconds (1 hour) # if we use a token we override make sure we are at least 3600 seconds (1 hour)
# away from the token expiration date, ask for a new one. # away from the token expiration date, ask for a new one.
token_expiration_threshold_sec = max(token_expiration_threshold_sec, 3600) token_expiration_threshold_sec = max(token_expiration_threshold_sec, 3600)
elif not self._offline_mode: else:
self.__access_key = api_key or ENV_ACCESS_KEY.get( self.__access_key = self.__init_api_key or ENV_ACCESS_KEY.get(
default=(self.config.get("api.credentials.access_key", None) or self.default_key) default=(self.config.get("api.credentials.access_key", None) or self.default_key)
) )
self.__secret_key = secret_key or ENV_SECRET_KEY.get( self.__secret_key = self.__init_secret_key or ENV_SECRET_KEY.get(
default=(self.config.get("api.credentials.secret_key", None) or self.default_secret) default=(self.config.get("api.credentials.secret_key", None) or self.default_secret)
) )
# init the token manager if not self.secret_key and not self.access_key and not self.__auth_token:
super(Session, self).__init__(
token_expiration_threshold_sec=token_expiration_threshold_sec,
req_token_expiration_sec=req_token_expiration_sec,
**kwargs
)
host = host or self.get_api_server_host(config=self.config)
if not host:
raise ValueError("ClearML host was not set, check your configuration file or environment variable")
if not self._offline_mode and (not self.secret_key and not self.access_key and not self.__auth_token):
raise MissingConfigError() raise MissingConfigError()
super(Session, self).__init__(
self._ssl_error_count_verbosity = self.config.get( **self.__token_manager_kwargs,
"api.ssl_error_count_verbosity", self._ssl_error_count_verbosity) req_token_expiration_sec=req_token_expiration_sec,
)
self.__host = host.strip("/")
http_retries_config = http_retries_config or self.config.get(
"api.http.retries", ConfigTree()).as_plain_ordered_dict()
http_retries_config["status_forcelist"] = self._get_retry_codes()
http_retries_config["config"] = self.config
self.__http_session = get_http_session_with_retry(**http_retries_config)
self.__http_session.write_timeout = self._write_session_timeout
self.__http_session.request_size_threshold = self._write_session_data_size
self.__worker = worker or self.get_worker_host_name()
self.__max_req_size = self.config.get("api.http.max_req_size", None)
if not self.__max_req_size:
raise ValueError("missing max request size")
self.client = ", ".join("{}-{}".format(*x) for x in self._client)
if self._offline_mode:
return
self.refresh_token() self.refresh_token()
local_logger = self._LocalLogger(self._logger) local_logger = self._LocalLogger(self._logger)
@ -227,7 +230,7 @@ class Session(TokenManager):
# now setup the session reporting, so one consecutive retries will show warning # now setup the session reporting, so one consecutive retries will show warning
# we do that here, so if we have problems authenticating, we see them immediately # we do that here, so if we have problems authenticating, we see them immediately
# notice: this is across the board warning omission # notice: this is across the board warning omission
urllib_log_warning_setup(total_retries=http_retries_config.get('total', 0), display_warning_after=3) urllib_log_warning_setup(total_retries=self.__http_retries_config.get('total', 0), display_warning_after=3)
if self.force_max_api_version and self.check_min_api_version(self.force_max_api_version): if self.force_max_api_version and self.check_min_api_version(self.force_max_api_version):
Session.max_api_version = Session.api_version = str(self.force_max_api_version) Session.max_api_version = Session.api_version = str(self.force_max_api_version)
@ -630,6 +633,24 @@ class Session(TokenManager):
return call_result return call_result
def _make_all_sessions_go_online(cls):
for active_session in cls._get_all_active_sessions():
# noinspection PyProtectedMember
active_session._connect()
@classmethod
def _get_all_active_sessions(cls):
active_sessions = []
new_sessions_weakrefs = []
for session_weakref in cls._sessions_weakrefs:
session = session_weakref()
if session:
active_sessions.append(session)
new_sessions_weakrefs.append(session_weakref)
cls._sessions_weakrefs = session_weakref
return active_sessions
@classmethod @classmethod
def get_api_server_host(cls, config=None): def get_api_server_host(cls, config=None):
if not config: if not config:

View File

@ -171,6 +171,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
self._initial_iteration_offset = 0 self._initial_iteration_offset = 0
self._reload_skip_flag = False self._reload_skip_flag = False
self._calling_filename = None self._calling_filename = None
self._offline_dir = None
if not task_id: if not task_id:
# generate a new task # generate a new task
@ -2583,9 +2584,12 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
Return the folder where all the task outputs and logs are stored in the offline session. Return the folder where all the task outputs and logs are stored in the offline session.
:return: Path object, local folder, later to be used with `report_offline_session()` :return: Path object, local folder, later to be used with `report_offline_session()`
""" """
if self._offline_dir:
return self._offline_dir
if not self._offline_mode: if not self._offline_mode:
return None return None
return get_offline_dir(task_id=self.task_id) self._offline_dir = get_offline_dir(task_id=self.task_id)
return self._offline_dir
@classmethod @classmethod
def _clone_task( def _clone_task(
@ -2887,30 +2891,6 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
PROC_MASTER_ID_ENV_VAR.get().split(':')[0] != str(os.getpid()) PROC_MASTER_ID_ENV_VAR.get().split(':')[0] != str(os.getpid())
return is_subprocess return is_subprocess
@classmethod
def set_offline(cls, offline_mode=False):
# type: (bool) -> None
"""
Set offline mode, where all data and logs are stored into local folder, for later transmission
:param offline_mode: If True, offline-mode is turned on, and no communication to the backend is enabled.
:return:
"""
if not running_remotely():
ENV_OFFLINE_MODE.set(offline_mode)
InterfaceBase._offline_mode = bool(offline_mode)
Session._offline_mode = bool(offline_mode)
@classmethod
def is_offline(cls):
# type: () -> bool
"""
Return offline-mode state, If in offline-mode, no communication to the backend is enabled.
:return: boolean offline-mode state
"""
return cls._offline_mode
@classmethod @classmethod
def _get_task_status(cls, task_id): def _get_task_status(cls, task_id):
# type: (str) -> (Optional[str], Optional[str]) # type: (str) -> (Optional[str], Optional[str])

View File

@ -40,9 +40,10 @@ from .backend_config.defs import get_active_config_file, get_config_file
from .backend_api.services import tasks, projects, events from .backend_api.services import tasks, projects, events
from .backend_api.session.session import ( from .backend_api.session.session import (
Session, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_HOST, ENV_WEB_HOST, ENV_FILES_HOST, ) Session, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_HOST, ENV_WEB_HOST, ENV_FILES_HOST, )
from .backend_api.session.defs import ENV_DEFERRED_TASK_INIT, ENV_IGNORE_MISSING_CONFIG, MissingConfigError from .backend_api.session.defs import ENV_DEFERRED_TASK_INIT, ENV_IGNORE_MISSING_CONFIG, ENV_OFFLINE_MODE, MissingConfigError
from .backend_interface.metrics import Metrics from .backend_interface.metrics import Metrics
from .backend_interface.model import Model as BackendModel from .backend_interface.model import Model as BackendModel
from .backend_interface.base import InterfaceBase
from .backend_interface.task import Task as _Task from .backend_interface.task import Task as _Task
from .backend_interface.task.log import TaskHandler from .backend_interface.task.log import TaskHandler
from .backend_interface.task.development.worker import DevWorker from .backend_interface.task.development.worker import DevWorker
@ -844,6 +845,8 @@ class Task(_Task):
:return: The newly created Task (experiment) :return: The newly created Task (experiment)
:rtype: Task :rtype: Task
""" """
if cls.is_offline():
raise UsageError("Creating task in offline mode. Use 'Task.init' instead.")
if not project_name and not base_task_id: if not project_name and not base_task_id:
if not cls.__main_task: if not cls.__main_task:
raise ValueError("Please provide project_name, no global task context found " raise ValueError("Please provide project_name, no global task context found "
@ -2867,6 +2870,72 @@ class Task(_Task):
return target_task return target_task
@classmethod
def set_offline(cls, offline_mode=False):
# type: (bool) -> None
"""
Set offline mode, where all data and logs are stored into local folder, for later transmission
.. note::
`Task.set_offline` can't move the same task from offline to online, nor can it be applied before `Task.create`.
See below an example of **incorect** usage of `Task.set_offline`:
.. code-block:: py
from clearml import Task
Task.set_offline(True)
task = Task.create(project_name='DEBUG', task_name="offline")
# ^^^ an error or warning is emitted, telling us that `Task.set_offline(True)`
# is supported only for `Task.init`
Task.set_offline(False)
# ^^^ an error or warning is emitted, telling us that running `Task.set_offline(False)`
# while the current task is not closed is not something we support
data = task.export_task()
imported_task = Task.import_task(task_data=data)
The correct way to use `Task.set_offline` can be seen in the following example:
.. code-block:: py
from clearml import Task
Task.set_offline(True)
task = Task.init(project_name='DEBUG', task_name="offline")
task.upload_artifact("large_artifact", "test_strign")
task.close()
Task.set_offline(False)
imported_task = Task.import_offline_session(task.get_offline_mode_folder())
:param offline_mode: If True, offline-mode is turned on, and no communication to the backend is enabled.
:return:
"""
if running_remotely() or bool(offline_mode) == InterfaceBase._offline_mode:
return
if (
cls.current_task()
and cls.current_task().status != cls.TaskStatusEnum.closed
and not offline_mode
):
raise UsageError(
"Switching from offline mode to online mode, but the current task has not been closed. Use `Task.close` to close it."
)
ENV_OFFLINE_MODE.set(offline_mode)
InterfaceBase._offline_mode = bool(offline_mode)
Session._offline_mode = bool(offline_mode)
if not offline_mode:
# noinspection PyProtectedMember
Session._make_all_sessions_go_online()
@classmethod
def is_offline(cls):
# type: () -> bool
"""
Return offline-mode state, If in offline-mode, no communication to the backend is enabled.
:return: boolean offline-mode state
"""
return cls._offline_mode
@classmethod @classmethod
def import_offline_session(cls, session_folder_zip, previous_task_id=None, iteration_offset=0): def import_offline_session(cls, session_folder_zip, previous_task_id=None, iteration_offset=0):
# type: (str, Optional[str], Optional[int]) -> (Optional[str]) # type: (str, Optional[str], Optional[int]) -> (Optional[str])