diff --git a/backend/open_webui/env.py b/backend/open_webui/env.py index 6c4d151b0..274cd9245 100644 --- a/backend/open_webui/env.py +++ b/backend/open_webui/env.py @@ -379,6 +379,10 @@ WEBSOCKET_MANAGER = os.environ.get("WEBSOCKET_MANAGER", "") WEBSOCKET_REDIS_URL = os.environ.get("WEBSOCKET_REDIS_URL", REDIS_URL) WEBSOCKET_REDIS_LOCK_TIMEOUT = os.environ.get("WEBSOCKET_REDIS_LOCK_TIMEOUT", 60) +WEBSOCKET_SENTINEL_HOSTS = os.environ.get("WEBSOCKET_SENTINEL_HOSTS", "") + +WEBSOCKET_SENTINEL_PORT = os.environ.get("WEBSOCKET_SENTINEL_PORT", "26379") + AIOHTTP_CLIENT_TIMEOUT = os.environ.get("AIOHTTP_CLIENT_TIMEOUT", "") if AIOHTTP_CLIENT_TIMEOUT == "": diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index 4346d64e6..f1f02956e 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -3,7 +3,7 @@ import socketio import logging import sys import time -from redis.sentinel import Sentinel +from redis import asyncio as aioredis from open_webui.models.users import Users, UserNameResponse from open_webui.models.channels import Channels @@ -26,52 +26,53 @@ from open_webui.env import ( ) class AsyncRedisSentinelManager(socketio.AsyncRedisManager): - def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service_name="mymaster", db=0, - username=None, password=None, channel='socketio', write_only=False, **kwargs): + def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0, + username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None): """ Initialize the Redis Sentinel Manager. + This implementation mostly replicates the __init__ of AsyncRedisManager and + overrides _redis_connect() with a version that uses Redis Sentinel :param sentinel_hosts: List of Sentinel hosts :param sentinel_port: Sentinel Port - :param redis_port: Redis Port - :param service_name: Master service name in Sentinel + :param redis_port: Redis Port (currently unsupported by aioredis!) + :param service: Master service name in Sentinel :param db: Redis database to use - :param username: Redis username (if any) + :param username: Redis username (if any) (currently unsupported by aioredis!) :param password: Redis password (if any) - :param channel: The Redis channel name - :param write_only: If set to True, only initialize the connection to send messages - :param kwargs: Additional connection arguments for Redis + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + :param write_only: If set to ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. + :param redis_options: additional keyword arguments to be passed to + ``aioredis.from_url()``. """ - self.sentinel_addresses = [(host, sentinel_port) for host in sentinel_hosts] - self.redis_port=redis_port - self.service_name = service_name - self.db = db - self.username = username - self.password = password - self.channel = channel - self.write_only = write_only - self.redis_kwargs = kwargs + self._sentinels = [(host, sentinel_port) for host in sentinel_hosts] + self._redis_port=redis_port + self._service = service + self._db = db + self._username = username + self._password = password + self._channel = channel + self.redis_options = redis_options or {} - # Skip parent's init but call grandparent's init - socketio.AsyncManager.__init__(self) + # connect and call grandparent constructor self._redis_connect() + super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger) def _redis_connect(self): """Establish connections to Redis through Sentinel.""" - sentinel = redis.sentinel.Sentinel( - self.sentinel_addresses, - port=self.redis_port, - db=self.db, - username=self.username, - password=self.password, - **self.redis_kwargs + sentinel = aioredis.sentinel.Sentinel( + self._sentinels, + port=self._redis_port, + db=self._db, + password=self._password, + **self.redis_options ) - # Get connections to the Redis master and slave - self.redis = sentinel.master_for(self.service_name) - if not self.write_only: - self.pubsub = sentinel.slave_for(self.service_name).pubsub() - self.pubsub.subscribe(self.channel) + self.redis = sentinel.master_for(self._service) + self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL) @@ -114,15 +115,15 @@ if WEBSOCKET_MANAGER == "redis": sentinel_hosts=WEBSOCKET_SENTINEL_HOSTS.split(',') sentinel_port=int(WEBSOCKET_SENTINEL_PORT) sentinels=[(host, sentinel_port) for host in sentinel_hosts] - SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels) - USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels) - USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels) + SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) + USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) + USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) clean_up_lock = RedisLock( redis_url=WEBSOCKET_REDIS_URL, lock_name="usage_cleanup_lock", timeout_secs=WEBSOCKET_REDIS_LOCK_TIMEOUT, - sentinels, + sentinels=sentinels, ) aquire_func = clean_up_lock.aquire_lock renew_func = clean_up_lock.renew_lock diff --git a/backend/open_webui/socket/utils.py b/backend/open_webui/socket/utils.py index 284dd3290..b20366c6a 100644 --- a/backend/open_webui/socket/utils.py +++ b/backend/open_webui/socket/utils.py @@ -24,13 +24,13 @@ def get_redis_connection(redis_url, sentinels, decode_responses=True): if sentinels: redis_config = parse_redis_sentinel_url(redis_url) sentinel = redis.sentinel.Sentinel( - self.sentinels, + sentinels, port=redis_config['port'], db=redis_config['db'], username=redis_config['username'], password=redis_config['password'], decode_responses=decode_responses - } + ) # Get a master connection from Sentinel return sentinel.master_for(redis_config['service'])