clearml-agent/clearml_agent/helper/environment/converters.py
allegroai 0131db8b7d Add support for resource_applied() callback in k8s glue
Add support for sending log events with k8s-provided timestamps
Refactor env vars infrastructure
2023-11-01 15:10:08 +02:00

71 lines
1.7 KiB
Python

import base64
from distutils.util import strtobool
from typing import Union, Optional, Any, TypeVar, Callable, Tuple
import six
try:
from typing import Text
except ImportError:
# windows conda-less hack
Text = Any
ConverterType = TypeVar("ConverterType", bound=Callable[[Any], Any])
def base64_to_text(value):
# type: (Any) -> Text
return base64.b64decode(value).decode("utf-8")
def text_to_int(value, default=0):
# type: (Any, int) -> int
try:
return int(value)
except (ValueError, TypeError):
return default
def text_to_bool(value):
# type: (Text) -> bool
return bool(strtobool(value))
def safe_text_to_bool(value):
# type: (Text) -> bool
try:
return text_to_bool(value)
except ValueError:
return bool(value)
def any_to_bool(value):
# type: (Optional[Union[int, float, Text]]) -> bool
if isinstance(value, six.text_type):
return text_to_bool(value)
return bool(value)
# noinspection PyIncorrectDocstring
def or_(*converters, **kwargs):
# type: (ConverterType, Tuple[Exception, ...]) -> ConverterType
"""
Wrapper that implements an "optional converter" pattern. Allows specifying a converter
for which a set of exceptions is ignored (and the original value is returned)
:param converters: A converter callable
:param exceptions: A tuple of exception types to ignore
"""
# noinspection PyUnresolvedReferences
exceptions = kwargs.get("exceptions", (ValueError, TypeError))
def wrapper(value):
for converter in converters:
try:
return converter(value)
except exceptions:
pass
return value
return wrapper