From 3b357746d8b5f0f24cceab56846073b62465a333 Mon Sep 17 00:00:00 2001 From: Jan Kessler Date: Mon, 3 Mar 2025 16:15:41 +0100 Subject: [PATCH 1/7] first part of adding Redis Sentinel support --- backend/open_webui/socket/main.py | 60 +++++++++++++++++++++++++++++- backend/open_webui/socket/utils.py | 13 +++++++ 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index 8f5a9568b..61c4bcd46 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -3,6 +3,7 @@ import socketio import logging import sys import time +from redis.sentinel import Sentinel from open_webui.models.users import Users, UserNameResponse from open_webui.models.channels import Channels @@ -13,15 +14,65 @@ from open_webui.env import ( WEBSOCKET_MANAGER, WEBSOCKET_REDIS_URL, WEBSOCKET_REDIS_LOCK_TIMEOUT, + WEBSOCKET_SENTINEL_PORT, + WEBSOCKET_SENTINEL_HOSTS, ) from open_webui.utils.auth import decode_token -from open_webui.socket.utils import RedisDict, RedisLock +from open_webui.socket.utils import RedisDict, RedisLock, parse_redis_sentinel_url from open_webui.env import ( GLOBAL_LOG_LEVEL, SRC_LOG_LEVELS, ) +class AsyncRedisSentinelManager(socketio.AsyncRedisManager): + def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service_name="mymaster", db=0, + username=None, password=None, channel='socketio', write_only=False, **kwargs): + """ + Initialize the Redis Sentinel Manager. + + :param sentinel_hosts: List of Sentinel hosts + :param sentinel_port: Sentinel Port + :param redis_port: Redis Port + :param service_name: Master service name in Sentinel + :param db: Redis database to use + :param username: Redis username (if any) + :param password: Redis password (if any) + :param channel: The Redis channel name + :param write_only: If set to True, only initialize the connection to send messages + :param kwargs: Additional connection arguments for Redis + """ + self.sentinel_addresses = [(host, sentinel_port) for host in sentinel_hosts] + self.redis_port=redis_port + self.service_name = service_name + self.db = db + self.username = username + self.password = password + self.channel = channel + self.write_only = write_only + self.redis_kwargs = kwargs + + # Skip parent's init but call grandparent's init + socketio.AsyncManager.__init__(self) + self._redis_connect() + + def _redis_connect(self): + """Establish connections to Redis through Sentinel.""" + sentinel = redis.sentinel.Sentinel( + self.sentinel_addresses, + port=self.redis_port, + db=self.db, + username=self.username, + password=self.password, + **self.redis_kwargs + ) + + # Get connections to the Redis master and slave + self.redis = sentinel.master_for(self.service_name) + if not self.write_only: + self.pubsub = sentinel.slave_for(self.service_name).pubsub() + self.pubsub.subscribe(self.channel) + logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL) log = logging.getLogger(__name__) @@ -29,7 +80,12 @@ log.setLevel(SRC_LOG_LEVELS["SOCKET"]) if WEBSOCKET_MANAGER == "redis": - mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL) + if WEBSOCKET_SENTINEL_HOSTS: + redis_config = parse_redis_sentinel_url(WEBSOCKET_REDIS_URL) + mgr = AsyncRedisSentinelManager(WEBSOCKET_SENTINEL_HOSTS.split(','), sentinel_port=int(WEBSOCKET_SENTINEL_PORT), redis_port=redis_config["port"], + service=redis_config["service"], db=redis_config["db"], username=redis_config["username"], password=redis_config["password"]) + else: + mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL) sio = socketio.AsyncServer( cors_allowed_origins=[], async_mode="asgi", diff --git a/backend/open_webui/socket/utils.py b/backend/open_webui/socket/utils.py index 46fafbb9e..f5628ee1e 100644 --- a/backend/open_webui/socket/utils.py +++ b/backend/open_webui/socket/utils.py @@ -1,7 +1,20 @@ import json import redis import uuid +from urllib.parse import urlparse +def parse_redis_sentinel_url(redis_url): + parsed_url = urlparse(redis_url) + if parsed_url.scheme != "redis": + raise ValueError("Invalid Redis URL scheme. Must be 'redis'.") + + return { + "username": parsed_url.username or None, + "password": parsed_url.password or None, + "service": parsed_url.hostname or 'mymaster', + "port": parsed_url.port or 6379, + "db": int(parsed_url.path.lstrip("/") or 0), + } class RedisLock: def __init__(self, redis_url, lock_name, timeout_secs): From 4370332e32730deb6db689f6d01456b0bd6e5076 Mon Sep 17 00:00:00 2001 From: Jan Kessler Date: Tue, 4 Mar 2025 08:42:37 +0100 Subject: [PATCH 2/7] second part of adding Redis Sentinel support --- backend/open_webui/socket/main.py | 10 +++++++--- backend/open_webui/socket/utils.py | 30 ++++++++++++++++++++++++++---- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index 61c4bcd46..4346d64e6 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -111,14 +111,18 @@ TIMEOUT_DURATION = 3 if WEBSOCKET_MANAGER == "redis": log.debug("Using Redis to manage websockets.") - SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL) - USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL) - USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL) + sentinel_hosts=WEBSOCKET_SENTINEL_HOSTS.split(',') + sentinel_port=int(WEBSOCKET_SENTINEL_PORT) + sentinels=[(host, sentinel_port) for host in sentinel_hosts] + SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels) + USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels) + USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels) clean_up_lock = RedisLock( redis_url=WEBSOCKET_REDIS_URL, lock_name="usage_cleanup_lock", timeout_secs=WEBSOCKET_REDIS_LOCK_TIMEOUT, + sentinels, ) aquire_func = clean_up_lock.aquire_lock renew_func = clean_up_lock.renew_lock diff --git a/backend/open_webui/socket/utils.py b/backend/open_webui/socket/utils.py index f5628ee1e..284dd3290 100644 --- a/backend/open_webui/socket/utils.py +++ b/backend/open_webui/socket/utils.py @@ -16,13 +16,35 @@ def parse_redis_sentinel_url(redis_url): "db": int(parsed_url.path.lstrip("/") or 0), } +def get_redis_connection(redis_url, sentinels, decode_responses=True): + """ + Creates a Redis connection from either a standard Redis URL or uses special + parsing to setup a Sentinel connection, if given an array of host/port tuples. + """ + if sentinels: + redis_config = parse_redis_sentinel_url(redis_url) + sentinel = redis.sentinel.Sentinel( + self.sentinels, + port=redis_config['port'], + db=redis_config['db'], + username=redis_config['username'], + password=redis_config['password'], + decode_responses=decode_responses + } + + # Get a master connection from Sentinel + return sentinel.master_for(redis_config['service']) + else: + # Standard Redis connection + return redis.Redis.from_url(redis_url, decode_responses=decode_responses) + class RedisLock: - def __init__(self, redis_url, lock_name, timeout_secs): + def __init__(self, redis_url, lock_name, timeout_secs, sentinels=[]): self.lock_name = lock_name self.lock_id = str(uuid.uuid4()) self.timeout_secs = timeout_secs self.lock_obtained = False - self.redis = redis.Redis.from_url(redis_url, decode_responses=True) + self.redis = get_redis_connection(redis_url, sentinels, decode_responses=True) def aquire_lock(self): # nx=True will only set this key if it _hasn't_ already been set @@ -44,9 +66,9 @@ class RedisLock: class RedisDict: - def __init__(self, name, redis_url): + def __init__(self, name, redis_url, sentinels=[]): self.name = name - self.redis = redis.Redis.from_url(redis_url, decode_responses=True) + self.redis = get_redis_connection(redis_url, sentinels, decode_responses=True) def __setitem__(self, key, value): serialized_value = json.dumps(value) From 9bf663934afb7e676938a6753dabf5863e53057f Mon Sep 17 00:00:00 2001 From: Jan Kessler Date: Tue, 4 Mar 2025 11:10:07 +0100 Subject: [PATCH 3/7] fixes & add envs for Sentinel implementation --- backend/open_webui/env.py | 4 ++ backend/open_webui/socket/main.py | 73 +++++++++++++++--------------- backend/open_webui/socket/utils.py | 4 +- 3 files changed, 43 insertions(+), 38 deletions(-) diff --git a/backend/open_webui/env.py b/backend/open_webui/env.py index 6c4d151b0..274cd9245 100644 --- a/backend/open_webui/env.py +++ b/backend/open_webui/env.py @@ -379,6 +379,10 @@ WEBSOCKET_MANAGER = os.environ.get("WEBSOCKET_MANAGER", "") WEBSOCKET_REDIS_URL = os.environ.get("WEBSOCKET_REDIS_URL", REDIS_URL) WEBSOCKET_REDIS_LOCK_TIMEOUT = os.environ.get("WEBSOCKET_REDIS_LOCK_TIMEOUT", 60) +WEBSOCKET_SENTINEL_HOSTS = os.environ.get("WEBSOCKET_SENTINEL_HOSTS", "") + +WEBSOCKET_SENTINEL_PORT = os.environ.get("WEBSOCKET_SENTINEL_PORT", "26379") + AIOHTTP_CLIENT_TIMEOUT = os.environ.get("AIOHTTP_CLIENT_TIMEOUT", "") if AIOHTTP_CLIENT_TIMEOUT == "": diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index 4346d64e6..f1f02956e 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -3,7 +3,7 @@ import socketio import logging import sys import time -from redis.sentinel import Sentinel +from redis import asyncio as aioredis from open_webui.models.users import Users, UserNameResponse from open_webui.models.channels import Channels @@ -26,52 +26,53 @@ from open_webui.env import ( ) class AsyncRedisSentinelManager(socketio.AsyncRedisManager): - def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service_name="mymaster", db=0, - username=None, password=None, channel='socketio', write_only=False, **kwargs): + def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0, + username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None): """ Initialize the Redis Sentinel Manager. + This implementation mostly replicates the __init__ of AsyncRedisManager and + overrides _redis_connect() with a version that uses Redis Sentinel :param sentinel_hosts: List of Sentinel hosts :param sentinel_port: Sentinel Port - :param redis_port: Redis Port - :param service_name: Master service name in Sentinel + :param redis_port: Redis Port (currently unsupported by aioredis!) + :param service: Master service name in Sentinel :param db: Redis database to use - :param username: Redis username (if any) + :param username: Redis username (if any) (currently unsupported by aioredis!) :param password: Redis password (if any) - :param channel: The Redis channel name - :param write_only: If set to True, only initialize the connection to send messages - :param kwargs: Additional connection arguments for Redis + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + :param write_only: If set to ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. + :param redis_options: additional keyword arguments to be passed to + ``aioredis.from_url()``. """ - self.sentinel_addresses = [(host, sentinel_port) for host in sentinel_hosts] - self.redis_port=redis_port - self.service_name = service_name - self.db = db - self.username = username - self.password = password - self.channel = channel - self.write_only = write_only - self.redis_kwargs = kwargs + self._sentinels = [(host, sentinel_port) for host in sentinel_hosts] + self._redis_port=redis_port + self._service = service + self._db = db + self._username = username + self._password = password + self._channel = channel + self.redis_options = redis_options or {} - # Skip parent's init but call grandparent's init - socketio.AsyncManager.__init__(self) + # connect and call grandparent constructor self._redis_connect() + super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger) def _redis_connect(self): """Establish connections to Redis through Sentinel.""" - sentinel = redis.sentinel.Sentinel( - self.sentinel_addresses, - port=self.redis_port, - db=self.db, - username=self.username, - password=self.password, - **self.redis_kwargs + sentinel = aioredis.sentinel.Sentinel( + self._sentinels, + port=self._redis_port, + db=self._db, + password=self._password, + **self.redis_options ) - # Get connections to the Redis master and slave - self.redis = sentinel.master_for(self.service_name) - if not self.write_only: - self.pubsub = sentinel.slave_for(self.service_name).pubsub() - self.pubsub.subscribe(self.channel) + self.redis = sentinel.master_for(self._service) + self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL) @@ -114,15 +115,15 @@ if WEBSOCKET_MANAGER == "redis": sentinel_hosts=WEBSOCKET_SENTINEL_HOSTS.split(',') sentinel_port=int(WEBSOCKET_SENTINEL_PORT) sentinels=[(host, sentinel_port) for host in sentinel_hosts] - SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels) - USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels) - USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels) + SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) + USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) + USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) clean_up_lock = RedisLock( redis_url=WEBSOCKET_REDIS_URL, lock_name="usage_cleanup_lock", timeout_secs=WEBSOCKET_REDIS_LOCK_TIMEOUT, - sentinels, + sentinels=sentinels, ) aquire_func = clean_up_lock.aquire_lock renew_func = clean_up_lock.renew_lock diff --git a/backend/open_webui/socket/utils.py b/backend/open_webui/socket/utils.py index 284dd3290..b20366c6a 100644 --- a/backend/open_webui/socket/utils.py +++ b/backend/open_webui/socket/utils.py @@ -24,13 +24,13 @@ def get_redis_connection(redis_url, sentinels, decode_responses=True): if sentinels: redis_config = parse_redis_sentinel_url(redis_url) sentinel = redis.sentinel.Sentinel( - self.sentinels, + sentinels, port=redis_config['port'], db=redis_config['db'], username=redis_config['username'], password=redis_config['password'], decode_responses=decode_responses - } + ) # Get a master connection from Sentinel return sentinel.master_for(redis_config['service']) From 9167a8bef0addc2f9613effa582b43acc43fe607 Mon Sep 17 00:00:00 2001 From: Jan Kessler Date: Tue, 18 Mar 2025 08:25:31 +0100 Subject: [PATCH 4/7] refac as prep for sentinel support in AppConfig --- backend/open_webui/socket/main.py | 52 +----------------- backend/open_webui/socket/utils.py | 38 +------------- backend/open_webui/utils/redis.py | 84 ++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 87 deletions(-) create mode 100644 backend/open_webui/utils/redis.py diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index f1f02956e..925e28fbe 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -8,6 +8,7 @@ from redis import asyncio as aioredis from open_webui.models.users import Users, UserNameResponse from open_webui.models.channels import Channels from open_webui.models.chats import Chats +from open_webui.utils.redis import parse_redis_sentinel_url, AsyncRedisSentinelManager from open_webui.env import ( ENABLE_WEBSOCKET_SUPPORT, @@ -18,62 +19,13 @@ from open_webui.env import ( WEBSOCKET_SENTINEL_HOSTS, ) from open_webui.utils.auth import decode_token -from open_webui.socket.utils import RedisDict, RedisLock, parse_redis_sentinel_url +from open_webui.socket.utils import RedisDict, RedisLock from open_webui.env import ( GLOBAL_LOG_LEVEL, SRC_LOG_LEVELS, ) -class AsyncRedisSentinelManager(socketio.AsyncRedisManager): - def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0, - username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None): - """ - Initialize the Redis Sentinel Manager. - This implementation mostly replicates the __init__ of AsyncRedisManager and - overrides _redis_connect() with a version that uses Redis Sentinel - - :param sentinel_hosts: List of Sentinel hosts - :param sentinel_port: Sentinel Port - :param redis_port: Redis Port (currently unsupported by aioredis!) - :param service: Master service name in Sentinel - :param db: Redis database to use - :param username: Redis username (if any) (currently unsupported by aioredis!) - :param password: Redis password (if any) - :param channel: The channel name on which the server sends and receives - notifications. Must be the same in all the servers. - :param write_only: If set to ``True``, only initialize to emit events. The - default of ``False`` initializes the class for emitting - and receiving. - :param redis_options: additional keyword arguments to be passed to - ``aioredis.from_url()``. - """ - self._sentinels = [(host, sentinel_port) for host in sentinel_hosts] - self._redis_port=redis_port - self._service = service - self._db = db - self._username = username - self._password = password - self._channel = channel - self.redis_options = redis_options or {} - - # connect and call grandparent constructor - self._redis_connect() - super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger) - - def _redis_connect(self): - """Establish connections to Redis through Sentinel.""" - sentinel = aioredis.sentinel.Sentinel( - self._sentinels, - port=self._redis_port, - db=self._db, - password=self._password, - **self.redis_options - ) - - self.redis = sentinel.master_for(self._service) - self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) - logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL) log = logging.getLogger(__name__) diff --git a/backend/open_webui/socket/utils.py b/backend/open_webui/socket/utils.py index b20366c6a..00aad57e2 100644 --- a/backend/open_webui/socket/utils.py +++ b/backend/open_webui/socket/utils.py @@ -1,42 +1,6 @@ import json -import redis import uuid -from urllib.parse import urlparse - -def parse_redis_sentinel_url(redis_url): - parsed_url = urlparse(redis_url) - if parsed_url.scheme != "redis": - raise ValueError("Invalid Redis URL scheme. Must be 'redis'.") - - return { - "username": parsed_url.username or None, - "password": parsed_url.password or None, - "service": parsed_url.hostname or 'mymaster', - "port": parsed_url.port or 6379, - "db": int(parsed_url.path.lstrip("/") or 0), - } - -def get_redis_connection(redis_url, sentinels, decode_responses=True): - """ - Creates a Redis connection from either a standard Redis URL or uses special - parsing to setup a Sentinel connection, if given an array of host/port tuples. - """ - if sentinels: - redis_config = parse_redis_sentinel_url(redis_url) - sentinel = redis.sentinel.Sentinel( - sentinels, - port=redis_config['port'], - db=redis_config['db'], - username=redis_config['username'], - password=redis_config['password'], - decode_responses=decode_responses - ) - - # Get a master connection from Sentinel - return sentinel.master_for(redis_config['service']) - else: - # Standard Redis connection - return redis.Redis.from_url(redis_url, decode_responses=decode_responses) +from open_webui.utils.redis import get_redis_connection class RedisLock: def __init__(self, redis_url, lock_name, timeout_secs, sentinels=[]): diff --git a/backend/open_webui/utils/redis.py b/backend/open_webui/utils/redis.py new file mode 100644 index 000000000..07512fe13 --- /dev/null +++ b/backend/open_webui/utils/redis.py @@ -0,0 +1,84 @@ +import socketio +import redis +from redis import asyncio as aioredis +from urllib.parse import urlparse + +def parse_redis_sentinel_url(redis_url): + parsed_url = urlparse(redis_url) + if parsed_url.scheme != "redis": + raise ValueError("Invalid Redis URL scheme. Must be 'redis'.") + + return { + "username": parsed_url.username or None, + "password": parsed_url.password or None, + "service": parsed_url.hostname or 'mymaster', + "port": parsed_url.port or 6379, + "db": int(parsed_url.path.lstrip("/") or 0), + } + +def get_redis_connection(redis_url, sentinels, decode_responses=True): + if sentinels: + redis_config = parse_redis_sentinel_url(redis_url) + sentinel = redis.sentinel.Sentinel( + sentinels, + port=redis_config['port'], + db=redis_config['db'], + username=redis_config['username'], + password=redis_config['password'], + decode_responses=decode_responses + ) + + # Get a master connection from Sentinel + return sentinel.master_for(redis_config['service']) + else: + # Standard Redis connection + return redis.Redis.from_url(redis_url, decode_responses=decode_responses) + +class AsyncRedisSentinelManager(socketio.AsyncRedisManager): + def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0, + username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None): + """ + Initialize the Redis Sentinel Manager. + This implementation mostly replicates the __init__ of AsyncRedisManager and + overrides _redis_connect() with a version that uses Redis Sentinel + + :param sentinel_hosts: List of Sentinel hosts + :param sentinel_port: Sentinel Port + :param redis_port: Redis Port (currently unsupported by aioredis!) + :param service: Master service name in Sentinel + :param db: Redis database to use + :param username: Redis username (if any) (currently unsupported by aioredis!) + :param password: Redis password (if any) + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + :param write_only: If set to ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. + :param redis_options: additional keyword arguments to be passed to + ``aioredis.from_url()``. + """ + self._sentinels = [(host, sentinel_port) for host in sentinel_hosts] + self._redis_port=redis_port + self._service = service + self._db = db + self._username = username + self._password = password + self._channel = channel + self.redis_options = redis_options or {} + + # connect and call grandparent constructor + self._redis_connect() + super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger) + + def _redis_connect(self): + """Establish connections to Redis through Sentinel.""" + sentinel = aioredis.sentinel.Sentinel( + self._sentinels, + port=self._redis_port, + db=self._db, + password=self._password, + **self.redis_options + ) + + self.redis = sentinel.master_for(self._service) + self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) \ No newline at end of file From e68cd9b6718374f243d94bc2fb62dc4b6db25b08 Mon Sep 17 00:00:00 2001 From: Jan Kessler Date: Tue, 18 Mar 2025 09:28:47 +0100 Subject: [PATCH 5/7] Redis Sentinel support for AppConfig --- backend/open_webui/config.py | 8 +++++--- backend/open_webui/env.py | 2 ++ backend/open_webui/main.py | 7 ++++++- backend/open_webui/socket/main.py | 6 ++---- backend/open_webui/utils/redis.py | 9 +++++++-- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/backend/open_webui/config.py b/backend/open_webui/config.py index d153c7dda..900645c83 100644 --- a/backend/open_webui/config.py +++ b/backend/open_webui/config.py @@ -19,6 +19,8 @@ from open_webui.env import ( DATABASE_URL, ENV, REDIS_URL, + SENTINEL_PORT, + SENTINEL_HOSTS, FRONTEND_BUILD_DIR, OFFLINE_MODE, OPEN_WEBUI_DIR, @@ -28,7 +30,7 @@ from open_webui.env import ( log, ) from open_webui.internal.db import Base, get_db - +from open_webui.utils.redis import get_redis_connection class EndpointFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: @@ -252,11 +254,11 @@ class AppConfig: _state: dict[str, PersistentConfig] _redis: Optional[redis.Redis] = None - def __init__(self, redis_url: Optional[str] = None): + def __init__(self, redis_url: Optional[str] = None, sentinels: Optional[list] = []): super().__setattr__("_state", {}) if redis_url: super().__setattr__( - "_redis", redis.Redis.from_url(redis_url, decode_responses=True) + "_redis", get_redis_connection(redis_url, sentinels, decode_responses=True) ) def __setattr__(self, key, value): diff --git a/backend/open_webui/env.py b/backend/open_webui/env.py index 274cd9245..1f7f58445 100644 --- a/backend/open_webui/env.py +++ b/backend/open_webui/env.py @@ -323,6 +323,8 @@ ENABLE_REALTIME_CHAT_SAVE = ( #################################### REDIS_URL = os.environ.get("REDIS_URL", "") +SENTINEL_HOSTS = os.environ.get("SENTINEL_HOSTS", "") +SENTINEL_PORT = os.environ.get("SENTINEL_PORT", "26379") #################################### # WEBUI_AUTH (Required for security) diff --git a/backend/open_webui/main.py b/backend/open_webui/main.py index 1ea79aa26..eeac2b3c7 100644 --- a/backend/open_webui/main.py +++ b/backend/open_webui/main.py @@ -315,6 +315,8 @@ from open_webui.env import ( AUDIT_LOG_LEVEL, CHANGELOG, REDIS_URL, + SENTINEL_HOSTS, + SENTINEL_PORT, GLOBAL_LOG_LEVEL, MAX_BODY_LOG_SIZE, SAFE_MODE, @@ -358,6 +360,9 @@ from open_webui.utils.security_headers import SecurityHeadersMiddleware from open_webui.tasks import stop_task, list_tasks # Import from tasks.py +from open_webui.utils.redis import get_sentinels_from_env + + if SAFE_MODE: print("SAFE MODE ENABLED") Functions.deactivate_all_functions() @@ -421,7 +426,7 @@ app = FastAPI( oauth_manager = OAuthManager(app) -app.state.config = AppConfig(redis_url=REDIS_URL) +app.state.config = AppConfig(redis_url=REDIS_URL, sentinels=get_sentinels_from_env(SENTINEL_HOSTS, SENTINEL_PORT)) app.state.WEBUI_NAME = WEBUI_NAME app.state.LICENSE_METADATA = None diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index 925e28fbe..b681d7e3b 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -8,7 +8,7 @@ from redis import asyncio as aioredis from open_webui.models.users import Users, UserNameResponse from open_webui.models.channels import Channels from open_webui.models.chats import Chats -from open_webui.utils.redis import parse_redis_sentinel_url, AsyncRedisSentinelManager +from open_webui.utils.redis import parse_redis_sentinel_url, get_sentinels_from_env, AsyncRedisSentinelManager from open_webui.env import ( ENABLE_WEBSOCKET_SUPPORT, @@ -64,9 +64,7 @@ TIMEOUT_DURATION = 3 if WEBSOCKET_MANAGER == "redis": log.debug("Using Redis to manage websockets.") - sentinel_hosts=WEBSOCKET_SENTINEL_HOSTS.split(',') - sentinel_port=int(WEBSOCKET_SENTINEL_PORT) - sentinels=[(host, sentinel_port) for host in sentinel_hosts] + sentinels=get_sentinels_from_env(WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT) SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) diff --git a/backend/open_webui/utils/redis.py b/backend/open_webui/utils/redis.py index 07512fe13..ff39f96ac 100644 --- a/backend/open_webui/utils/redis.py +++ b/backend/open_webui/utils/redis.py @@ -33,7 +33,12 @@ def get_redis_connection(redis_url, sentinels, decode_responses=True): else: # Standard Redis connection return redis.Redis.from_url(redis_url, decode_responses=decode_responses) - + +def get_sentinels_from_env(SENTINEL_HOSTS, SENTINEL_PORT): + sentinel_hosts=SENTINEL_HOSTS.split(',') + sentinel_port=int(SENTINEL_PORT) + return [(host, sentinel_port) for host in sentinel_hosts] + class AsyncRedisSentinelManager(socketio.AsyncRedisManager): def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0, username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None): @@ -81,4 +86,4 @@ class AsyncRedisSentinelManager(socketio.AsyncRedisManager): ) self.redis = sentinel.master_for(self._service) - self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) \ No newline at end of file + self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) From d0b13cf38814db23842e50200991f43d00bbf688 Mon Sep 17 00:00:00 2001 From: Jan Kessler Date: Thu, 27 Mar 2025 08:51:55 +0100 Subject: [PATCH 6/7] prefix sentinel envs with redis_ --- backend/open_webui/config.py | 8 ++++---- backend/open_webui/env.py | 4 ++-- backend/open_webui/main.py | 6 +++--- backend/open_webui/socket/main.py | 10 +++++----- backend/open_webui/socket/utils.py | 8 ++++---- backend/open_webui/utils/redis.py | 12 ++++++------ 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/backend/open_webui/config.py b/backend/open_webui/config.py index 900645c83..3827f4183 100644 --- a/backend/open_webui/config.py +++ b/backend/open_webui/config.py @@ -19,8 +19,8 @@ from open_webui.env import ( DATABASE_URL, ENV, REDIS_URL, - SENTINEL_PORT, - SENTINEL_HOSTS, + REDIS_SENTINEL_HOSTS, + REDIS_SENTINEL_PORT, FRONTEND_BUILD_DIR, OFFLINE_MODE, OPEN_WEBUI_DIR, @@ -254,11 +254,11 @@ class AppConfig: _state: dict[str, PersistentConfig] _redis: Optional[redis.Redis] = None - def __init__(self, redis_url: Optional[str] = None, sentinels: Optional[list] = []): + def __init__(self, redis_url: Optional[str] = None, redis_sentinels: Optional[list] = []): super().__setattr__("_state", {}) if redis_url: super().__setattr__( - "_redis", get_redis_connection(redis_url, sentinels, decode_responses=True) + "_redis", get_redis_connection(redis_url, redis_sentinels, decode_responses=True) ) def __setattr__(self, key, value): diff --git a/backend/open_webui/env.py b/backend/open_webui/env.py index 1f7f58445..0c1ee61a0 100644 --- a/backend/open_webui/env.py +++ b/backend/open_webui/env.py @@ -323,8 +323,8 @@ ENABLE_REALTIME_CHAT_SAVE = ( #################################### REDIS_URL = os.environ.get("REDIS_URL", "") -SENTINEL_HOSTS = os.environ.get("SENTINEL_HOSTS", "") -SENTINEL_PORT = os.environ.get("SENTINEL_PORT", "26379") +REDIS_SENTINEL_HOSTS = os.environ.get("REDIS_SENTINEL_HOSTS", "") +REDIS_SENTINEL_PORT = os.environ.get("REDIS_SENTINEL_PORT", "26379") #################################### # WEBUI_AUTH (Required for security) diff --git a/backend/open_webui/main.py b/backend/open_webui/main.py index eeac2b3c7..d2c435857 100644 --- a/backend/open_webui/main.py +++ b/backend/open_webui/main.py @@ -315,8 +315,8 @@ from open_webui.env import ( AUDIT_LOG_LEVEL, CHANGELOG, REDIS_URL, - SENTINEL_HOSTS, - SENTINEL_PORT, + REDIS_SENTINEL_HOSTS, + REDIS_SENTINEL_PORT, GLOBAL_LOG_LEVEL, MAX_BODY_LOG_SIZE, SAFE_MODE, @@ -426,7 +426,7 @@ app = FastAPI( oauth_manager = OAuthManager(app) -app.state.config = AppConfig(redis_url=REDIS_URL, sentinels=get_sentinels_from_env(SENTINEL_HOSTS, SENTINEL_PORT)) +app.state.config = AppConfig(redis_url=REDIS_URL, redis_sentinels=get_sentinels_from_env(REDIS_SENTINEL_HOSTS, REDIS_SENTINEL_PORT)) app.state.WEBUI_NAME = WEBUI_NAME app.state.LICENSE_METADATA = None diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index b681d7e3b..0d391c73d 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -64,16 +64,16 @@ TIMEOUT_DURATION = 3 if WEBSOCKET_MANAGER == "redis": log.debug("Using Redis to manage websockets.") - sentinels=get_sentinels_from_env(WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT) - SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) - USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) - USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL, sentinels=sentinels) + redis_sentinels=get_sentinels_from_env(WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT) + SESSION_POOL = RedisDict("open-webui:session_pool", redis_url=WEBSOCKET_REDIS_URL, redis_sentinels=redis_sentinels) + USER_POOL = RedisDict("open-webui:user_pool", redis_url=WEBSOCKET_REDIS_URL, redis_sentinels=redis_sentinels) + USAGE_POOL = RedisDict("open-webui:usage_pool", redis_url=WEBSOCKET_REDIS_URL, redis_sentinels=redis_sentinels) clean_up_lock = RedisLock( redis_url=WEBSOCKET_REDIS_URL, lock_name="usage_cleanup_lock", timeout_secs=WEBSOCKET_REDIS_LOCK_TIMEOUT, - sentinels=sentinels, + redis_sentinels=redis_sentinels, ) aquire_func = clean_up_lock.aquire_lock renew_func = clean_up_lock.renew_lock diff --git a/backend/open_webui/socket/utils.py b/backend/open_webui/socket/utils.py index 00aad57e2..a63815c02 100644 --- a/backend/open_webui/socket/utils.py +++ b/backend/open_webui/socket/utils.py @@ -3,12 +3,12 @@ import uuid from open_webui.utils.redis import get_redis_connection class RedisLock: - def __init__(self, redis_url, lock_name, timeout_secs, sentinels=[]): + def __init__(self, redis_url, lock_name, timeout_secs, redis_sentinels=[]): self.lock_name = lock_name self.lock_id = str(uuid.uuid4()) self.timeout_secs = timeout_secs self.lock_obtained = False - self.redis = get_redis_connection(redis_url, sentinels, decode_responses=True) + self.redis = get_redis_connection(redis_url, redis_sentinels, decode_responses=True) def aquire_lock(self): # nx=True will only set this key if it _hasn't_ already been set @@ -30,9 +30,9 @@ class RedisLock: class RedisDict: - def __init__(self, name, redis_url, sentinels=[]): + def __init__(self, name, redis_url, redis_sentinels=[]): self.name = name - self.redis = get_redis_connection(redis_url, sentinels, decode_responses=True) + self.redis = get_redis_connection(redis_url, redis_sentinels, decode_responses=True) def __setitem__(self, key, value): serialized_value = json.dumps(value) diff --git a/backend/open_webui/utils/redis.py b/backend/open_webui/utils/redis.py index ff39f96ac..3ad1f78be 100644 --- a/backend/open_webui/utils/redis.py +++ b/backend/open_webui/utils/redis.py @@ -16,11 +16,11 @@ def parse_redis_sentinel_url(redis_url): "db": int(parsed_url.path.lstrip("/") or 0), } -def get_redis_connection(redis_url, sentinels, decode_responses=True): - if sentinels: +def get_redis_connection(redis_url, redis_sentinels, decode_responses=True): + if redis_sentinels: redis_config = parse_redis_sentinel_url(redis_url) sentinel = redis.sentinel.Sentinel( - sentinels, + redis_sentinels, port=redis_config['port'], db=redis_config['db'], username=redis_config['username'], @@ -34,9 +34,9 @@ def get_redis_connection(redis_url, sentinels, decode_responses=True): # Standard Redis connection return redis.Redis.from_url(redis_url, decode_responses=decode_responses) -def get_sentinels_from_env(SENTINEL_HOSTS, SENTINEL_PORT): - sentinel_hosts=SENTINEL_HOSTS.split(',') - sentinel_port=int(SENTINEL_PORT) +def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env): + sentinel_hosts=sentinel_hosts_env.split(',') + sentinel_port=int(sentinel_port_env) return [(host, sentinel_port) for host in sentinel_hosts] class AsyncRedisSentinelManager(socketio.AsyncRedisManager): From 0615c11a53ca67def3a45bd2c6ca3256479bb14f Mon Sep 17 00:00:00 2001 From: Jan Kessler Date: Thu, 27 Mar 2025 10:22:49 +0100 Subject: [PATCH 7/7] fix sentinel connection being attempted for non-sentinel redis --- backend/open_webui/utils/redis.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/backend/open_webui/utils/redis.py b/backend/open_webui/utils/redis.py index 3ad1f78be..fa90a26db 100644 --- a/backend/open_webui/utils/redis.py +++ b/backend/open_webui/utils/redis.py @@ -35,9 +35,11 @@ def get_redis_connection(redis_url, redis_sentinels, decode_responses=True): return redis.Redis.from_url(redis_url, decode_responses=decode_responses) def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env): - sentinel_hosts=sentinel_hosts_env.split(',') - sentinel_port=int(sentinel_port_env) - return [(host, sentinel_port) for host in sentinel_hosts] + if sentinel_hosts_env: + sentinel_hosts=sentinel_hosts_env.split(',') + sentinel_port=int(sentinel_port_env) + return [(host, sentinel_port) for host in sentinel_hosts] + return [] class AsyncRedisSentinelManager(socketio.AsyncRedisManager): def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0,