mirror of
https://github.com/clearml/clearml-server
synced 2025-01-31 19:06:55 +00:00
196 lines
6.7 KiB
Python
196 lines
6.7 KiB
Python
import threading
|
|
from os import getenv
|
|
from time import sleep
|
|
|
|
from boltons.iterutils import first
|
|
from redis import StrictRedis
|
|
from redis.sentinel import Sentinel, SentinelConnectionPool
|
|
|
|
from apiserver.apierrors.errors.server_error import ConfigError, GeneralError
|
|
from apiserver.config_repo import config
|
|
|
|
# Module logger, obtained from the server configuration helper.
log = config.logger(__file__)

# Environment variables that may override the configured redis endpoint,
# listed in lookup order: the first one holding a non-empty value wins.
OVERRIDE_HOST_ENV_KEY = ("TRAINS_REDIS_SERVICE_HOST", "REDIS_SERVICE_HOST")
OVERRIDE_PORT_ENV_KEY = ("TRAINS_REDIS_SERVICE_PORT", "REDIS_SERVICE_PORT")

# `first` returns the first truthy item of the iterable (None when there is
# none), so unset and empty variables are skipped.
OVERRIDE_HOST = first(getenv(env_key) for env_key in OVERRIDE_HOST_ENV_KEY)
if OVERRIDE_HOST:
    log.info(f"Using override redis host {OVERRIDE_HOST}")

OVERRIDE_PORT = first(getenv(env_key) for env_key in OVERRIDE_PORT_ENV_KEY)
if OVERRIDE_PORT:
    log.info(f"Using override redis port {OVERRIDE_PORT}")
|
|
|
|
|
|
class MyPubSubWorkerThread(threading.Thread):
    """Background thread that listens to a single sentinel for master switches.

    The thread subscribes to the sentinel's ``+switch-master`` channel with
    ``on_new_master`` registered as the channel handler, then polls the
    subscription for messages. Errors while subscribing or polling are logged
    and answered by re-subscribing, so the thread keeps running until
    :meth:`stop` unsubscribes it.

    :param sentinel: client for one sentinel; must provide ``pubsub()`` and
        ``connection_pool.connection_kwargs["host"]``.
    :param on_new_master: callable registered as the ``+switch-master`` handler.
    :param msg_sleep_time: timeout passed to every ``get_message`` poll.
    :param daemon: value assigned to the thread's ``daemon`` flag.
    """

    # Seconds to wait before retrying a failed subscription attempt
    RESUBSCRIBE_DELAY_SEC = 3

    def __init__(self, sentinel, on_new_master, msg_sleep_time, daemon=True):
        super().__init__()
        self.daemon = daemon
        self.sentinel = sentinel
        self.on_new_master = on_new_master
        # Kept only for log messages
        self.sentinel_host = sentinel.connection_pool.connection_kwargs["host"]
        self.msg_sleep_time = msg_sleep_time
        self._running = False
        self.pubsub = None

    def subscribe(self):
        """Drop any existing subscription and subscribe again.

        Blocks, retrying every ``RESUBSCRIBE_DELAY_SEC`` seconds, until the
        subscription succeeds.
        """
        if self.pubsub:
            try:
                self.pubsub.unsubscribe()
                self.pubsub.punsubscribe()
            except Exception:
                # Best effort: the old connection is most likely already broken
                pass
            finally:
                self.pubsub = None

        subscriptions = {"+switch-master": self.on_new_master}

        while not self.pubsub or not self.pubsub.subscribed:
            try:
                self.pubsub = self.sentinel.pubsub()
                self.pubsub.subscribe(**subscriptions)
            except Exception as ex:
                # Format the exception itself: ``ex.args`` may be empty, and an
                # IndexError raised here would terminate the listener thread
                log.warning(
                    f"Error while subscribing to sentinel at {self.sentinel_host} ({ex}) Sleeping and retrying"
                )
                sleep(self.RESUBSCRIBE_DELAY_SEC)
        log.info(f"Subscribed to sentinel {self.sentinel_host}")

    def run(self):
        """Thread body: subscribe, then poll until the subscription ends."""
        if self._running:
            return
        self._running = True

        self.subscribe()

        while self.pubsub.subscribed:
            try:
                # The return value is not used here; the channel handler
                # registered in subscribe() is what reacts to messages
                self.pubsub.get_message(
                    ignore_subscribe_messages=True, timeout=self.msg_sleep_time
                )
            except Exception as ex:
                log.warning(
                    f"Error while getting message from sentinel {self.sentinel_host} ({ex}) Resubscribing"
                )
                self.subscribe()

        self.pubsub.close()
        self._running = False

    def stop(self):
        """Ask the thread to finish.

        Does nothing when no subscription exists yet (e.g. the thread was
        never started).
        """
        # stopping simply unsubscribes from all channels and patterns.
        # the unsubscribe responses that are generated will short circuit
        # the loop in run(), calling pubsub.close() to clean up the connection
        pubsub = self.pubsub
        if pubsub is None:
            return
        pubsub.unsubscribe()
        pubsub.punsubscribe()
|
|
|
|
|
|
# todo,future - multi master clusters?
|
|
class RedisCluster(object):
    """Redis master resolved through sentinels and refreshed on master switch.

    On construction the current master is looked up once, and one
    ``MyPubSubWorkerThread`` is started per sentinel; each thread calls
    :meth:`reconfigure` when it is notified of a new master.

    :param sentinel_hosts: sentinel addresses, passed to ``Sentinel``.
    :param service_name: name of the monitored redis service.
    :param connection_kwargs: extra keyword arguments passed to ``Sentinel``.
    """

    def __init__(self, sentinel_hosts, service_name, **connection_kwargs):
        self.service_name = service_name
        self.sentinel = Sentinel(sentinel_hosts, **connection_kwargs)
        # Both stay None until a master has been discovered successfully
        self.master = None
        self.master_host_port = None
        # Listener threads, keyed by sentinel host
        self.sentinel_threads = {}
        self.reconfigure()
        self.listen()

    def reconfigure(self):
        """Look up the current master and replace ``self.master`` with it.

        Failures are logged and leave the previously known master untouched.
        """
        try:
            master_host_port = self.sentinel.discover_master(self.service_name)
            master = self.sentinel.master_for(self.service_name)
        except Exception as ex:
            # Format the exception itself: ``ex.args`` may be empty
            log.error(f"Error while reconfiguring. {ex}")
            return

        # Assign only after both lookups succeeded, so the address and the
        # client always describe the same master
        self.master_host_port = master_host_port
        self.master = master
        log.info(f"Reconfigured master to {self.master_host_port}")

    def listen(self):
        """Start one listener thread for every sentinel known to ``Sentinel``."""

        def on_new_master(message):
            # The message content is not needed; the master is re-discovered
            self.reconfigure()

        for sentinel in self.sentinel.sentinels:
            sentinel_host = sentinel.connection_pool.connection_kwargs["host"]
            # NOTE(review): msg_sleep_time is the worker's poll timeout, so
            # 0.001 looks like a very tight polling loop - confirm it is intended
            worker = MyPubSubWorkerThread(
                sentinel, on_new_master, msg_sleep_time=0.001, daemon=True
            )
            self.sentinel_threads[sentinel_host] = worker
            worker.start()
|
|
|
|
|
|
class RedisManager(object):
    """Registry of redis clients, one per alias of the redis configuration.

    :param redis_config_dict: mapping of alias to a configuration object that
        provides ``as_plain_ordered_dict()``. An alias is either a regular
        connection (``host`` and ``port`` required) or, when ``cluster`` is
        set, a sentinel-managed one (``sentinels`` and ``service_name``
        required). All remaining settings are passed to the redis client.
    :raises ConfigError: when an alias configuration is invalid.
    """

    def __init__(self, redis_config_dict):
        self.aliases = {}
        for alias, alias_config in redis_config_dict.items():
            alias_config = alias_config.as_plain_ordered_dict()

            # These settings are consumed here and are not redis connection
            # arguments. Pop them in both modes, so that e.g. an explicit
            # "cluster: false" is not passed on to StrictRedis
            is_cluster = alias_config.pop("cluster", False)
            sentinels = alias_config.pop("sentinels", None)
            service_name = alias_config.pop("service_name", None)

            # Environment overrides take precedence over the configured values
            host = OVERRIDE_HOST or alias_config.get("host", None)
            if host:
                alias_config["host"] = host

            port = OVERRIDE_PORT or alias_config.get("port", None)
            if port:
                alias_config["port"] = port

            if not is_cluster and sentinels:
                raise ConfigError(
                    "Redis configuration is invalid. mixed regular and cluster mode",
                    alias=alias,
                )
            if is_cluster and (not sentinels or not service_name):
                raise ConfigError(
                    "Redis configuration is invalid. missing sentinels or service_name",
                    alias=alias,
                )
            if not is_cluster and (not port or not host):
                raise ConfigError(
                    "Redis configuration is invalid. missing port or host", alias=alias
                )

            if is_cluster:
                # todo support all redis connection args via sentinel's connection_kwargs
                self.aliases[alias] = RedisCluster(
                    sentinels, service_name, **alias_config
                )
            else:
                self.aliases[alias] = StrictRedis(**alias_config)

    def connection(self, alias):
        """Return the redis client registered for ``alias``.

        For a sentinel-managed alias this is the current master client.

        :raises GeneralError: when the alias is unknown, or when no master
            has been discovered yet for a sentinel-managed alias.
        """
        obj = self.aliases.get(alias)
        if obj is None:
            raise GeneralError(f"Invalid Redis alias {alias}")

        if isinstance(obj, RedisCluster):
            # Read the attribute once: listener threads may replace it, and
            # the client that is checked must be the one that is returned
            client = obj.master
            if client is None:
                raise GeneralError(f"Redis master for alias {alias} is not available")
        else:
            client = obj

        # Issue a command before handing the client out; the result is
        # ignored - presumably a liveness probe, so failures surface here
        client.get("health")
        return client

    def host(self, alias):
        """Return the host of an available pooled connection of ``alias``.

        :return: host of the first available connection in the client's
            connection pool, or None when the pool has none.
        """
        pool = self.connection(alias).connection_pool
        if isinstance(pool, SentinelConnectionPool):
            pool = pool.connection_kwargs["connection_pool"]

        connections = pool._available_connections
        return connections[0].host if connections else None
|
|
|
|
|
|
# Module-level RedisManager instance, built from the "hosts.redis"
# configuration section when this module is imported.
redman = RedisManager(config.get("hosts.redis"))
|