Add support for resource_applied() callback in k8s glue

Add support for sending log events with k8s-provided timestamps
Refactor env vars infrastructure
This commit is contained in:
allegroai 2023-11-01 15:10:08 +02:00
parent d2384a9a95
commit 0131db8b7d
11 changed files with 354 additions and 242 deletions

View File

@ -1,5 +1,5 @@
from ...backend_config.converters import safe_text_to_bool
from ...backend_config.environment import EnvEntry
from clearml_agent.helper.environment import EnvEntry
from clearml_agent.helper.environment.converters import safe_text_to_bool
ENV_HOST = EnvEntry("CLEARML_API_HOST", "TRAINS_API_HOST")

View File

@ -1,69 +1,8 @@
import base64
from distutils.util import strtobool
from typing import Union, Optional, Any, TypeVar, Callable, Tuple
import six
try:
from typing import Text
except ImportError:
# windows conda-less hack
Text = Any
ConverterType = TypeVar("ConverterType", bound=Callable[[Any], Any])
def text_to_int(value, default=0):
# type: (Any, int) -> int
try:
return int(value)
except (ValueError, TypeError):
return default
def base64_to_text(value):
# type: (Any) -> Text
return base64.b64decode(value).decode("utf-8")
def text_to_bool(value):
# type: (Text) -> bool
return bool(strtobool(value))
def safe_text_to_bool(value):
# type: (Text) -> bool
try:
return text_to_bool(value)
except ValueError:
return bool(value)
def any_to_bool(value):
# type: (Optional[Union[int, float, Text]]) -> bool
if isinstance(value, six.text_type):
return text_to_bool(value)
return bool(value)
def or_(*converters, **kwargs):
# type: (ConverterType, Tuple[Exception, ...]) -> ConverterType
"""
Wrapper that implements an "optional converter" pattern. Allows specifying a converter
for which a set of exceptions is ignored (and the original value is returned)
:param converters: A converter callable
:param exceptions: A tuple of exception types to ignore
"""
# noinspection PyUnresolvedReferences
exceptions = kwargs.get("exceptions", (ValueError, TypeError))
def wrapper(value):
for converter in converters:
try:
return converter(value)
except exceptions:
pass
return value
return wrapper
from clearml_agent.helper.environment.converters import (
base64_to_text,
text_to_bool,
text_to_int,
safe_text_to_bool,
any_to_bool,
or_,
)

View File

@ -1,111 +1,6 @@
import abc
from typing import Optional, Any, Tuple, Callable, Dict
from clearml_agent.helper.environment import Entry, NotSet
import six
from .converters import any_to_bool
try:
from typing import Text
except ImportError:
# windows conda-less hack
Text = Any
NotSet = object()
Converter = Callable[[Any], Any]
@six.add_metaclass(abc.ABCMeta)
class Entry(object):
"""
Configuration entry definition
"""
@classmethod
def default_conversions(cls):
# type: () -> Dict[Any, Converter]
return {
bool: any_to_bool,
six.text_type: lambda s: six.text_type(s).strip(),
}
def __init__(self, key, *more_keys, **kwargs):
# type: (Text, Text, Any) -> None
"""
:param key: Entry's key (at least one).
:param more_keys: More alternate keys for this entry.
:param type: Value type. If provided, will be used choosing a default conversion or
(if none exists) for casting the environment value.
:param converter: Value converter. If provided, will be used to convert the environment value.
:param default: Default value. If provided, will be used as the default value on calls to get() and get_pair()
in case no value is found for any key and no specific default value was provided in the call.
Default value is None.
:param help: Help text describing this entry
"""
self.keys = (key,) + more_keys
self.type = kwargs.pop("type", six.text_type)
self.converter = kwargs.pop("converter", None)
self.default = kwargs.pop("default", None)
self.help = kwargs.pop("help", None)
def __str__(self):
return str(self.key)
@property
def key(self):
return self.keys[0]
def convert(self, value, converter=None):
# type: (Any, Converter) -> Optional[Any]
converter = converter or self.converter
if not converter:
converter = self.default_conversions().get(self.type, self.type)
return converter(value)
def get_pair(self, default=NotSet, converter=None, value_cb=None):
# type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Tuple[Text, Any]]
for key in self.keys:
value = self._get(key)
if value is NotSet:
continue
try:
value = self.convert(value, converter)
except Exception as ex:
self.error("invalid value {key}={value}: {ex}".format(**locals()))
break
# noinspection PyBroadException
try:
if value_cb:
value_cb(key, value)
except Exception:
pass
return key, value
result = self.default if default is NotSet else default
return self.key, result
def get(self, default=NotSet, converter=None, value_cb=None):
# type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Any]
return self.get_pair(default=default, converter=converter, value_cb=value_cb)[1]
def set(self, value):
# type: (Any, Any) -> (Text, Any)
# key, _ = self.get_pair(default=None, converter=None)
for k in self.keys:
self._set(k, str(value))
def _set(self, key, value):
# type: (Text, Text) -> None
pass
@abc.abstractmethod
def _get(self, key):
# type: (Text) -> Any
pass
@abc.abstractmethod
def error(self, message):
# type: (Text) -> None
pass
__all__ = [
"Entry",
"NotSet"
]

View File

@ -1,32 +1,6 @@
from os import getenv, environ
from os import environ
from .converters import text_to_bool
from .entry import Entry, NotSet
class EnvEntry(Entry):
@classmethod
def default_conversions(cls):
conversions = super(EnvEntry, cls).default_conversions().copy()
conversions[bool] = text_to_bool
return conversions
def pop(self):
for k in self.keys:
environ.pop(k, None)
def _get(self, key):
value = getenv(key, "").strip()
return value or NotSet
def _set(self, key, value):
environ[key] = value
def __str__(self):
return "env:{}".format(super(EnvEntry, self).__str__())
def error(self, message):
print("Environment configuration: {}".format(message))
from clearml_agent.helper.environment import EnvEntry
def backward_compatibility_support():
@ -34,6 +8,7 @@ def backward_compatibility_support():
if ENVIRONMENT_BACKWARD_COMPATIBLE.get():
# Add TRAINS_ prefix on every CLEARML_ os environment we support
for k, v in ENVIRONMENT_CONFIG.items():
# noinspection PyBroadException
try:
trains_vars = [var for var in v.vars if var.startswith('CLEARML_')]
if not trains_vars:
@ -44,6 +19,7 @@ def backward_compatibility_support():
except:
continue
for k, v in ENVIRONMENT_SDK_PARAMS.items():
# noinspection PyBroadException
try:
trains_vars = [var for var in v if var.startswith('CLEARML_')]
if not trains_vars:
@ -62,3 +38,9 @@ def backward_compatibility_support():
backwards_k = k.replace('CLEARML_', 'TRAINS_', 1)
if backwards_k not in keys:
environ[backwards_k] = environ[k]
__all__ = [
"EnvEntry",
"backward_compatibility_support"
]

View File

@ -2,6 +2,7 @@ from __future__ import print_function
import json
import time
from typing import List, Tuple
from clearml_agent.commands.base import ServiceCommandSection
from clearml_agent.helper.base import return_list
@ -57,6 +58,42 @@ class Events(ServiceCommandSection):
# print('Sending events done: %d / %d events sent' % (sent_events, len(list_events)))
return sent_events
def send_log_events_with_timestamps(
self, worker_id, task_id, lines_with_ts: List[Tuple[str, str]], level="DEBUG", session=None
):
log_events = []
# break log lines into event packets
for ts, line in return_list(lines_with_ts):
# HACK ignore terminal reset ANSI code
if line == '\x1b[0m':
continue
while line:
if len(line) <= self.max_event_size:
msg = line
line = None
else:
msg = line[:self.max_event_size]
line = line[self.max_event_size:]
log_events.append(
{
"type": "log",
"level": level,
"task": task_id,
"worker": worker_id,
"msg": msg,
"timestamp": ts,
}
)
if line and ts is not None:
# advance timestamp in case we break a line to more than one part
ts += 1
# now send the events
return self.send_events(list_events=log_events, session=session)
def send_log_events(self, worker_id, task_id, lines, level='DEBUG', session=None):
log_events = []
base_timestamp = int(time.time() * 1000)

View File

@ -1,7 +1,11 @@
from clearml_agent.definitions import EnvironmentConfig
from clearml_agent.helper.environment import EnvEntry
ENV_START_AGENT_SCRIPT_PATH = EnvironmentConfig('CLEARML_K8S_GLUE_START_AGENT_SCRIPT_PATH')
ENV_START_AGENT_SCRIPT_PATH = EnvEntry("CLEARML_K8S_GLUE_START_AGENT_SCRIPT_PATH", default="~/__start_agent__.sh")
"""
Script path to use when creating the bash script to run the agent inside the scheduled pod's docker container.
Script will be appended to the specified file.
"""
ENV_DEFAULT_EXECUTION_AGENT_ARGS = EnvEntry("K8S_GLUE_DEF_EXEC_AGENT_ARGS", default="--full-monitoring --require-queue")
ENV_POD_AGENT_INSTALL_ARGS = EnvEntry("K8S_GLUE_POD_AGENT_INSTALL_ARGS", default="", lstrip=False)
ENV_POD_MONITOR_LOG_BATCH_SIZE = EnvEntry("K8S_GLUE_POD_MONITOR_LOG_BATCH_SIZE", default=5, converter=int)

View File

@ -18,7 +18,6 @@ from typing import Text, List, Callable, Any, Collection, Optional, Union, Itera
import yaml
from clearml_agent.backend_api.session import Request
from clearml_agent.commands.events import Events
from clearml_agent.commands.worker import Worker, get_task_container, set_task_container, get_next_task
from clearml_agent.definitions import (
@ -28,7 +27,6 @@ from clearml_agent.definitions import (
ENV_FORCE_SYSTEM_SITE_PACKAGES,
)
from clearml_agent.errors import APIError, UsageError
from clearml_agent.glue.definitions import ENV_START_AGENT_SCRIPT_PATH
from clearml_agent.glue.errors import GetPodCountError
from clearml_agent.glue.utilities import get_path, get_bash_output
from clearml_agent.glue.pending_pods_daemon import PendingPodsDaemon
@ -37,12 +35,17 @@ from clearml_agent.helper.dicts import merge_dicts
from clearml_agent.helper.process import get_bash_output, stringify_bash_output
from clearml_agent.helper.resource_monitor import ResourceMonitor
from clearml_agent.interface.base import ObjectID
from clearml_agent.backend_api.session import Request
from clearml_agent.glue.definitions import (
ENV_START_AGENT_SCRIPT_PATH,
ENV_DEFAULT_EXECUTION_AGENT_ARGS,
ENV_POD_AGENT_INSTALL_ARGS,
)
class K8sIntegration(Worker):
SUPPORTED_KIND = ("pod", "job")
K8S_PENDING_QUEUE = "k8s_scheduler"
K8S_DEFAULT_NAMESPACE = "clearml"
AGENT_LABEL = "CLEARML=agent"
QUEUE_LABEL = "clearml-agent-queue"
@ -64,9 +67,6 @@ class K8sIntegration(Worker):
'echo "ldconfig" >> /etc/profile',
"/usr/sbin/sshd -p {port}"]
DEFAULT_EXECUTION_AGENT_ARGS = os.getenv("K8S_GLUE_DEF_EXEC_AGENT_ARGS", "--full-monitoring --require-queue")
POD_AGENT_INSTALL_ARGS = os.getenv("K8S_GLUE_POD_AGENT_INSTALL_ARGS", "")
CONTAINER_BASH_SCRIPT = [
"export DEBIAN_FRONTEND='noninteractive'",
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean",
@ -181,7 +181,7 @@ class K8sIntegration(Worker):
self._agent_label = None
self._pending_pods_daemon = self._create_pending_pods_daemon(
self._pending_pods_daemon = self._create_daemon_instance(
cls_=PendingPodsDaemon,
polling_interval=self._polling_interval
)
@ -190,7 +190,7 @@ class K8sIntegration(Worker):
self._min_cleanup_interval_per_ns_sec = 1.0
self._last_pod_cleanup_per_ns = defaultdict(lambda: 0.)
def _create_pending_pods_daemon(self, cls_, **kwargs):
def _create_daemon_instance(self, cls_, **kwargs):
return cls_(agent=self, **kwargs)
def _load_overrides_yaml(self, overrides_yaml):
@ -417,6 +417,10 @@ class K8sIntegration(Worker):
)
raise GetPodCountError()
def resource_applied(self, resource_name: str, namespace: str, task_id: str, session):
""" Called when a resource (pod/job) was applied """
pass
def run_one_task(self, queue: Text, task_id: Text, worker_args=None, task_session=None, **_):
print('Pulling task {} launching on kubernetes cluster'.format(task_id))
session = task_session or self._session
@ -573,25 +577,34 @@ class K8sIntegration(Worker):
except (KeyError, TypeError, AttributeError):
namespace = self.namespace
if template:
output, error = self._kubectl_apply(
template=template,
pod_number=pod_number,
clearml_conf_create_script=clearml_conf_create_script,
labels=labels,
docker_image=container['image'],
docker_args=container['arguments'],
docker_bash=container.get('setup_shell_script'),
task_id=task_id,
queue=queue,
namespace=namespace,
)
if not template:
print("ERROR: no template for task {}, skipping".format(task_id))
return
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
output, error, pod_name = self._kubectl_apply(
template=template,
pod_number=pod_number,
clearml_conf_create_script=clearml_conf_create_script,
labels=labels,
docker_image=container['image'],
docker_args=container['arguments'],
docker_bash=container.get('setup_shell_script'),
task_id=task_id,
queue=queue,
namespace=namespace,
)
print('kubectl output:\n{}\n{}'.format(error, output))
if error:
send_log = "Running kubectl encountered an error: {}".format(error)
self.log.error(send_log)
self.send_logs(task_id, send_log.splitlines())
return
if pod_name:
self.resource_applied(
resource_name=pod_name, namespace=namespace, task_id=task_id, session=session
)
user_props = {"k8s-queue": str(queue_name)}
if self.ports_mode:
@ -675,8 +688,8 @@ class K8sIntegration(Worker):
[line.format(extra_bash_init_cmd=self.extra_bash_init_script or '',
task_id=task_id,
extra_docker_bash_script=extra_docker_bash_script,
default_execution_agent_args=self.DEFAULT_EXECUTION_AGENT_ARGS,
agent_install_args=self.POD_AGENT_INSTALL_ARGS)
default_execution_agent_args=ENV_DEFAULT_EXECUTION_AGENT_ARGS.get(),
agent_install_args=ENV_POD_AGENT_INSTALL_ARGS.get())
for line in container_bash_script])
extra_bash_commands = list(clearml_conf_create_script or [])
@ -718,7 +731,9 @@ class K8sIntegration(Worker):
if "kind" in template:
if template["kind"].lower() != self.kind:
return (
"", f"Template kind {template['kind']} does not maych kind {self.kind.capitalize()} set for agent"
"",
f"Template kind {template['kind']} does not maych kind {self.kind.capitalize()} set for agent",
None
)
else:
template["kind"] = self.kind.capitalize()
@ -789,11 +804,11 @@ class K8sIntegration(Worker):
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
except Exception as ex:
return None, str(ex)
return None, str(ex), None
finally:
safe_remove_file(yaml_file)
return stringify_bash_output(output), stringify_bash_output(error)
return stringify_bash_output(output), stringify_bash_output(error), name
def _process_bash_lines_response(self, bash_cmd: str, raise_error=True):
res = get_bash_output(bash_cmd, raise_error=raise_error)

View File

@ -0,0 +1,8 @@
from .entry import Entry, NotSet
from .environment import EnvEntry
__all__ = [
'Entry',
'NotSet',
'EnvEntry',
]

View File

@ -0,0 +1,70 @@
import base64
from distutils.util import strtobool
from typing import Union, Optional, Any, TypeVar, Callable, Tuple
import six
try:
from typing import Text
except ImportError:
# windows conda-less hack
Text = Any
ConverterType = TypeVar("ConverterType", bound=Callable[[Any], Any])
def base64_to_text(value):
# type: (Any) -> Text
return base64.b64decode(value).decode("utf-8")
def text_to_int(value, default=0):
# type: (Any, int) -> int
try:
return int(value)
except (ValueError, TypeError):
return default
def text_to_bool(value):
# type: (Text) -> bool
return bool(strtobool(value))
def safe_text_to_bool(value):
# type: (Text) -> bool
try:
return text_to_bool(value)
except ValueError:
return bool(value)
def any_to_bool(value):
# type: (Optional[Union[int, float, Text]]) -> bool
if isinstance(value, six.text_type):
return text_to_bool(value)
return bool(value)
# noinspection PyIncorrectDocstring
def or_(*converters, **kwargs):
# type: (ConverterType, Tuple[Exception, ...]) -> ConverterType
"""
Wrapper that implements an "optional converter" pattern. Allows specifying a converter
for which a set of exceptions is ignored (and the original value is returned)
:param converters: A converter callable
:param exceptions: A tuple of exception types to ignore
"""
# noinspection PyUnresolvedReferences
exceptions = kwargs.get("exceptions", (ValueError, TypeError))
def wrapper(value):
for converter in converters:
try:
return converter(value)
except exceptions:
pass
return value
return wrapper

View File

@ -0,0 +1,134 @@
import abc
from typing import Optional, Any, Tuple, Callable, Dict
import six
from .converters import any_to_bool
try:
from typing import Text
except ImportError:
# windows conda-less hack
Text = Any
NotSet = object()
Converter = Callable[[Any], Any]
@six.add_metaclass(abc.ABCMeta)
class Entry(object):
"""
Configuration entry definition
"""
def default_conversions(self):
# type: () -> Dict[Any, Converter]
if self.lstrip and self.rstrip:
def str_convert(s):
return six.text_type(s).strip()
elif self.lstrip:
def str_convert(s):
return six.text_type(s).lstrip()
elif self.rstrip:
def str_convert(s):
return six.text_type(s).rstrip()
else:
def str_convert(s):
return six.text_type(s)
return {
bool: lambda x: any_to_bool(x.strip()),
six.text_type: str_convert,
}
def __init__(self, key, *more_keys, **kwargs):
# type: (Text, Text, Any) -> None
"""
:rtype: object
:param key: Entry's key (at least one).
:param more_keys: More alternate keys for this entry.
:param type: Value type. If provided, will be used choosing a default conversion or
(if none exists) for casting the environment value.
:param converter: Value converter. If provided, will be used to convert the environment value.
:param default: Default value. If provided, will be used as the default value on calls to get() and get_pair()
in case no value is found for any key and no specific default value was provided in the call.
Default value is None.
:param help: Help text describing this entry
"""
self.keys = (key,) + more_keys
self.type = kwargs.pop("type", six.text_type)
self.converter = kwargs.pop("converter", None)
self.default = kwargs.pop("default", None)
self.help = kwargs.pop("help", None)
self.lstrip = kwargs.pop("lstrip", True)
self.rstrip = kwargs.pop("rstrip", True)
def __str__(self):
return str(self.key)
@property
def key(self):
return self.keys[0]
def convert(self, value, converter=None):
# type: (Any, Converter) -> Optional[Any]
converter = converter or self.converter
if not converter:
converter = self.default_conversions().get(self.type, self.type)
return converter(value)
def get_pair(self, default=NotSet, converter=None, value_cb=None):
# type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Tuple[Text, Any]]
for key in self.keys:
value = self._get(key)
if value is NotSet:
continue
try:
value = self.convert(value, converter)
except Exception as ex:
self.error("invalid value {key}={value}: {ex}".format(**locals()))
break
# noinspection PyBroadException
try:
if value_cb:
value_cb(key, value)
except Exception:
pass
return key, value
result = self.default if default is NotSet else default
return self.key, result
def get(self, default=NotSet, converter=None, value_cb=None):
# type: (Any, Converter, Callable[[str, Any], None]) -> Optional[Any]
return self.get_pair(default=default, converter=converter, value_cb=value_cb)[1]
def set(self, value):
# type: (Any, Any) -> (Text, Any)
# key, _ = self.get_pair(default=None, converter=None)
for k in self.keys:
self._set(k, str(value))
def _set(self, key, value):
# type: (Text, Text) -> None
pass
@abc.abstractmethod
def _get(self, key):
# type: (Text) -> Any
pass
@abc.abstractmethod
def error(self, message):
# type: (Text) -> None
pass

View File

@ -0,0 +1,28 @@
from os import getenv, environ
from .converters import text_to_bool
from .entry import Entry, NotSet
class EnvEntry(Entry):
def default_conversions(self):
conversions = super(EnvEntry, self).default_conversions().copy()
conversions[bool] = lambda x: text_to_bool(x.strip())
return conversions
def pop(self):
for k in self.keys:
environ.pop(k, None)
def _get(self, key):
value = getenv(key, "")
return value or NotSet
def _set(self, key, value):
environ[key] = value
def __str__(self):
return "env:{}".format(super(EnvEntry, self).__str__())
def error(self, message):
print("Environment configuration: {}".format(message))