mirror of
https://github.com/clearml/clearml-server
synced 2025-01-31 02:46:53 +00:00
173 lines
6.0 KiB
Python
173 lines
6.0 KiB
Python
|
import threading
|
||
|
from time import sleep
|
||
|
|
||
|
from redis import StrictRedis
|
||
|
from redis.sentinel import Sentinel, SentinelConnectionPool
|
||
|
|
||
|
from apierrors.errors.server_error import ConfigError, GeneralError
|
||
|
from config import config
|
||
|
|
||
|
# Module-level logger for this file, obtained from the project's config helper.
log = config.logger(__file__)
|
||
|
|
||
|
|
||
|
class MyPubSubWorkerThread(threading.Thread):
    """Background thread that listens on a single sentinel's pub/sub connection.

    The thread subscribes to the ``+switch-master`` channel of the given
    sentinel client and registers ``on_new_master`` as the handler for that
    channel. Errors while subscribing or polling are logged and followed by
    another subscription attempt.
    """

    def __init__(self, sentinel, on_new_master, msg_sleep_time, daemon=True):
        """
        :param sentinel: client connected to one sentinel; must expose
            ``pubsub()`` and ``connection_pool.connection_kwargs["host"]``
        :param on_new_master: callable registered as the ``+switch-master`` handler
        :param msg_sleep_time: value passed as ``timeout`` to every ``get_message`` poll
        :param daemon: value assigned to the thread's ``daemon`` flag
        """
        super(MyPubSubWorkerThread, self).__init__()
        self.daemon = daemon
        self.sentinel = sentinel
        self.on_new_master = on_new_master
        self.sentinel_host = sentinel.connection_pool.connection_kwargs["host"]
        self.msg_sleep_time = msg_sleep_time
        self._running = False
        # created lazily by subscribe(); None until the thread has subscribed
        self.pubsub = None

    def subscribe(self):
        """Drop any existing subscription and subscribe again.

        Retries every 3 seconds until the pub/sub object reports that it is
        subscribed.
        """
        if self.pubsub:
            try:
                self.pubsub.unsubscribe()
                self.pubsub.punsubscribe()
            except Exception:
                # best effort: the old connection may already be broken
                pass
            finally:
                self.pubsub = None

        subscriptions = {"+switch-master": self.on_new_master}

        while not self.pubsub or not self.pubsub.subscribed:
            try:
                self.pubsub = self.sentinel.pubsub()
                self.pubsub.subscribe(**subscriptions)
            except Exception as ex:
                # format the exception itself: ex.args may be empty, and an
                # IndexError raised here would abort the retry loop
                log.warn(
                    f"Error while subscribing to sentinel at {self.sentinel_host} ({ex}) Sleeping and retrying"
                )
                sleep(3)
        log.info(f"Subscribed to sentinel {self.sentinel_host}")

    def run(self):
        """Subscribe, then poll for messages until the subscription ends."""
        if self._running:
            return
        self._running = True

        self.subscribe()

        while self.pubsub.subscribed:
            try:
                self.pubsub.get_message(
                    ignore_subscribe_messages=True, timeout=self.msg_sleep_time
                )
            except Exception as ex:
                # see subscribe(): never index ex.args inside the handler
                log.warn(
                    f"Error while getting message from sentinel {self.sentinel_host} ({ex}) Resubscribing"
                )
                self.subscribe()

        self.pubsub.close()
        self._running = False

    def stop(self):
        """Unsubscribe from all channels and patterns.

        Does nothing when the thread has not created a pub/sub object yet.
        """
        # stopping simply unsubscribes from all channels and patterns.
        # the unsubscribe responses that are generated will short circuit
        # the loop in run(), calling pubsub.close() to clean up the connection
        pubsub = self.pubsub
        if pubsub is None:
            # stop() was called before run() subscribed; nothing to undo
            return
        pubsub.unsubscribe()
        pubsub.punsubscribe()
|
||
|
|
||
|
|
||
|
# TODO (future): support multi-master clusters?
|
||
|
class RedisCluster(object):
    """Tracks the current master of a sentinel-monitored Redis service.

    On construction the master is discovered through the sentinels, and one
    ``MyPubSubWorkerThread`` per sentinel is started so that the master is
    re-discovered whenever a ``+switch-master`` handler call arrives.
    """

    def __init__(self, sentinel_hosts, service_name, **connection_kwargs):
        """
        :param sentinel_hosts: sentinel addresses, passed as-is to ``Sentinel``
        :param service_name: name of the monitored service to resolve
        :param connection_kwargs: extra keyword arguments forwarded to ``Sentinel``
        """
        self.service_name = service_name
        self.sentinel = Sentinel(sentinel_hosts, **connection_kwargs)
        # both stay None if the initial discovery fails
        self.master = None
        self.master_host_port = None
        self.reconfigure()
        # sentinel host -> listener thread
        self.sentinel_threads = {}
        self.listen()

    def reconfigure(self):
        """Re-discover the master and refresh ``master`` / ``master_host_port``.

        Failures are logged and leave the previously known values untouched.
        """
        try:
            master_host_port = self.sentinel.discover_master(self.service_name)
            master = self.sentinel.master_for(self.service_name)
        except Exception as ex:
            # format the exception itself: ex.args may be empty, and indexing
            # it here would raise from inside the handler
            log.error(f"Error while reconfiguring. {ex}")
            return

        # publish both values only after both lookups succeeded, so that the
        # address and the client never describe different masters
        self.master_host_port = master_host_port
        self.master = master
        log.info(f"Reconfigured master to {self.master_host_port}")

    def listen(self):
        """Start one daemon listener thread per configured sentinel."""

        def on_new_master(message):
            # the argument passed to the handler is not needed; any call on
            # this channel triggers a fresh discovery
            self.reconfigure()

        for sentinel in self.sentinel.sentinels:
            sentinel_host = sentinel.connection_pool.connection_kwargs["host"]
            worker = MyPubSubWorkerThread(
                sentinel, on_new_master, msg_sleep_time=0.001, daemon=True
            )
            self.sentinel_threads[sentinel_host] = worker
            worker.start()
|
||
|
|
||
|
|
||
|
class RedisManager(object):
    """Registry of Redis connections keyed by configuration alias.

    Each alias is either a plain connection (``host`` / ``port``) or a
    sentinel-managed one (``cluster`` flag with ``sentinels`` and
    ``service_name``).
    """

    # Keys consumed by RedisManager itself; they select the connection mode
    # and must not be forwarded as connection keyword arguments.
    _mode_keys = ("cluster", "sentinels", "service_name")

    def __init__(self, redis_config_dict):
        """
        :param redis_config_dict: mapping of alias -> alias configuration
        :raises ConfigError: when an alias mixes regular and cluster settings
            or lacks the settings its mode requires
        """
        self.aliases = {}
        for alias, alias_config in redis_config_dict.items():
            is_cluster = alias_config.get("cluster", False)
            host = alias_config.get("host", None)
            port = alias_config.get("port", None)
            db = alias_config.get("db", 0)
            sentinels = alias_config.get("sentinels", None)
            service_name = alias_config.get("service_name", None)

            if not is_cluster and sentinels:
                raise ConfigError(
                    "Redis configuration is invalid. mixed regular and cluster mode",
                    alias=alias,
                )
            if is_cluster and (not sentinels or not service_name):
                raise ConfigError(
                    "Redis configuration is invalid. missing sentinels or service_name",
                    alias=alias,
                )
            if not is_cluster and (not port or not host):
                raise ConfigError(
                    "Redis configuration is invalid. missing port or host", alias=alias
                )

            if is_cluster:
                # todo support all redis connection args via sentinel's connection_kwargs
                connection_kwargs = alias_config.as_plain_ordered_dict()
                for key in self._mode_keys:
                    connection_kwargs.pop(key, None)
                self.aliases[alias] = RedisCluster(
                    sentinels, service_name, **connection_kwargs
                )
            else:
                # an explicit "cluster: false" (or an empty sentinels /
                # service_name entry) passes validation above but is not a
                # connection argument, so strip the mode keys here as well
                connection_kwargs = {
                    key: alias_config[key]
                    for key in alias_config.keys()
                    if key not in self._mode_keys
                }
                self.aliases[alias] = StrictRedis(**connection_kwargs)

    def connection(self, alias):
        """Return the client for ``alias`` after issuing a ``GET health`` on it.

        For a sentinel-managed alias the current master client is returned.

        :raises GeneralError: when the alias is unknown, or when no master
            is known for a sentinel-managed alias even after a new discovery
        """
        obj = self.aliases.get(alias)
        if obj is None:
            raise GeneralError(f"Invalid Redis alias {alias}")

        if isinstance(obj, RedisCluster):
            if obj.master is None:
                # the initial discovery failed; try once more before giving up
                obj.reconfigure()
            # read once so the probed client is the one that gets returned
            master = obj.master
            if master is None:
                raise GeneralError(f"No Redis master available for alias {alias}")
            master.get("health")
            return master

        obj.get("health")
        return obj

    def host(self, alias):
        """Return the host of the first available pooled connection for ``alias``.

        Returns ``None`` when the pool currently has no available connection.
        NOTE: this reads the pool's private ``_available_connections`` list.
        """
        r = self.connection(alias)
        pool = r.connection_pool
        if isinstance(pool, SentinelConnectionPool):
            connections = pool.connection_kwargs[
                "connection_pool"
            ]._available_connections
        else:
            connections = pool._available_connections

        if connections:
            return connections[0].host
        return None
|
||
|
|
||
|
|
||
|
# Module-level RedisManager instance, built at import time from the
# "hosts.redis" configuration section.
redman = RedisManager(config.get("hosts.redis"))
|