clearml-server/fileserver/redis_manager.py

104 lines
2.9 KiB
Python

from os import getenv
from boltons.iterutils import first
from redis import StrictRedis
from redis.cluster import RedisCluster
from config import config
log = config.logger(__file__)

# Environment variables that may override the configured redis endpoint.
# Within each tuple the names are tried in order and the first one holding
# a non-empty value is used.
OVERRIDE_HOST_ENV_KEY = (
    "CLEARML_REDIS_SERVICE_HOST",
    "TRAINS_REDIS_SERVICE_HOST",
    "REDIS_SERVICE_HOST",
)
OVERRIDE_PORT_ENV_KEY = (
    "CLEARML_REDIS_SERVICE_PORT",
    "TRAINS_REDIS_SERVICE_PORT",
    "REDIS_SERVICE_PORT",
)
OVERRIDE_PASSWORD_ENV_KEY = (
    "CLEARML_REDIS_SERVICE_PASSWORD",
    "TRAINS_REDIS_SERVICE_PASSWORD",
    "REDIS_SERVICE_PASSWORD",
)

# `first` yields the first truthy item (or None), so unset and empty
# variables are skipped without an explicit filter step.
OVERRIDE_HOST = first(getenv(env_name) for env_name in OVERRIDE_HOST_ENV_KEY)
OVERRIDE_PORT = first(getenv(env_name) for env_name in OVERRIDE_PORT_ENV_KEY)
OVERRIDE_PASSWORD = first(
    getenv(env_name) for env_name in OVERRIDE_PASSWORD_ENV_KEY
)

# Only host and port are reported; the password override is never logged.
if OVERRIDE_HOST:
    log.info(f"Using override redis host {OVERRIDE_HOST}")
if OVERRIDE_PORT:
    log.info(f"Using override redis port {OVERRIDE_PORT}")
class ConfigError(Exception):
    """Raised when a redis alias configuration lacks a host or a port."""
class GeneralError(Exception):
    """Raised when a redis connection is requested for an unknown alias."""
class RedisManager(object):
    """Registry of redis clients, keyed by configuration alias.

    Every alias found in the supplied configuration is turned into either a
    ``StrictRedis`` or a ``RedisCluster`` client. Host, port and password may
    be replaced by the module-level ``OVERRIDE_*`` values.
    """

    def __init__(self, redis_config_dict):
        """Build one client per alias in *redis_config_dict*.

        Args:
            redis_config_dict: Mapping of alias name to a config node that
                exposes ``as_plain_ordered_dict()``. The resulting plain dict
                is passed as keyword arguments to the client constructor.

        Raises:
            ConfigError: If an alias ends up with no host or no port.
        """
        self.aliases = {}
        for alias, alias_config in redis_config_dict.items():
            alias_config = alias_config.as_plain_ordered_dict()
            # The password always comes from the "secure" config section and
            # replaces any password stored in the host entry itself.
            alias_config["password"] = config.get(
                f"secure.redis.{alias}.password", None
            )

            # "cluster" only selects the client class and is not a client
            # constructor argument, so always strip it. Leaving an explicit
            # `cluster: false` in place would forward an unexpected keyword
            # to StrictRedis.
            is_cluster = alias_config.pop("cluster", False)

            host = OVERRIDE_HOST or alias_config.get("host", None)
            if host:
                alias_config["host"] = host

            port = OVERRIDE_PORT or alias_config.get("port", None)
            if port:
                alias_config["port"] = port

            password = OVERRIDE_PASSWORD or alias_config.get("password", None)
            if password:
                alias_config["password"] = password

            if not port or not host:
                raise ConfigError(
                    f"Redis configuration is invalid. missing port or host (alias={alias})"
                )

            if is_cluster:
                # "db" is not forwarded to the cluster client; tolerate
                # configurations that never defined it.
                alias_config.pop("db", None)
                self.aliases[alias] = RedisCluster(**alias_config)
            else:
                self.aliases[alias] = StrictRedis(**alias_config)

    def connection(self, alias) -> StrictRedis:
        """Return the client registered under *alias*.

        A ``GET health`` is issued before returning and its result is
        discarded; presumably this is a connectivity probe, so server errors
        surface here.

        Args:
            alias: Name of a configured redis alias.

        Raises:
            GeneralError: If no client is registered under *alias*.
        """
        obj = self.aliases.get(alias)
        if obj is None:
            raise GeneralError(f"Invalid Redis alias {alias}")

        obj.get("health")
        return obj

    def host(self, alias):
        """Return the host of the first pooled connection for *alias*.

        For a cluster client the default node's connection pool is inspected.

        Returns:
            The ``host`` attribute of the first available connection, or
            ``None`` when the pool has no available connections.

        Raises:
            GeneralError: If no client is registered under *alias*.
        """
        r = self.connection(alias)
        # NOTE: relies on the private `_available_connections` pool attribute.
        if isinstance(r, RedisCluster):
            pool = r.get_default_node().redis_connection.connection_pool
        else:
            pool = r.connection_pool

        connections = pool._available_connections
        if not connections:
            return None

        return connections[0].host
# Module-level manager instance, built at import time from the "hosts.redis"
# configuration section.
redman = RedisManager(redis_config_dict=config.get("hosts.redis"))