diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index f1f02956e..925e28fbe 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -8,6 +8,7 @@ from redis import asyncio as aioredis from open_webui.models.users import Users, UserNameResponse from open_webui.models.channels import Channels from open_webui.models.chats import Chats +from open_webui.utils.redis import parse_redis_sentinel_url, AsyncRedisSentinelManager from open_webui.env import ( ENABLE_WEBSOCKET_SUPPORT, @@ -18,62 +19,13 @@ from open_webui.env import ( WEBSOCKET_SENTINEL_HOSTS, ) from open_webui.utils.auth import decode_token -from open_webui.socket.utils import RedisDict, RedisLock, parse_redis_sentinel_url +from open_webui.socket.utils import RedisDict, RedisLock from open_webui.env import ( GLOBAL_LOG_LEVEL, SRC_LOG_LEVELS, ) -class AsyncRedisSentinelManager(socketio.AsyncRedisManager): - def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0, - username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None): - """ - Initialize the Redis Sentinel Manager. - This implementation mostly replicates the __init__ of AsyncRedisManager and - overrides _redis_connect() with a version that uses Redis Sentinel - - :param sentinel_hosts: List of Sentinel hosts - :param sentinel_port: Sentinel Port - :param redis_port: Redis Port (currently unsupported by aioredis!) - :param service: Master service name in Sentinel - :param db: Redis database to use - :param username: Redis username (if any) (currently unsupported by aioredis!) - :param password: Redis password (if any) - :param channel: The channel name on which the server sends and receives - notifications. Must be the same in all the servers. - :param write_only: If set to ``True``, only initialize to emit events. The - default of ``False`` initializes the class for emitting - and receiving. - :param redis_options: additional keyword arguments to be passed to - ``aioredis.from_url()``. - """ - self._sentinels = [(host, sentinel_port) for host in sentinel_hosts] - self._redis_port=redis_port - self._service = service - self._db = db - self._username = username - self._password = password - self._channel = channel - self.redis_options = redis_options or {} - - # connect and call grandparent constructor - self._redis_connect() - super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger) - - def _redis_connect(self): - """Establish connections to Redis through Sentinel.""" - sentinel = aioredis.sentinel.Sentinel( - self._sentinels, - port=self._redis_port, - db=self._db, - password=self._password, - **self.redis_options - ) - - self.redis = sentinel.master_for(self._service) - self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) - logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL) log = logging.getLogger(__name__) diff --git a/backend/open_webui/socket/utils.py b/backend/open_webui/socket/utils.py index b20366c6a..00aad57e2 100644 --- a/backend/open_webui/socket/utils.py +++ b/backend/open_webui/socket/utils.py @@ -1,42 +1,6 @@ import json -import redis import uuid -from urllib.parse import urlparse - -def parse_redis_sentinel_url(redis_url): - parsed_url = urlparse(redis_url) - if parsed_url.scheme != "redis": - raise ValueError("Invalid Redis URL scheme. Must be 'redis'.") - - return { - "username": parsed_url.username or None, - "password": parsed_url.password or None, - "service": parsed_url.hostname or 'mymaster', - "port": parsed_url.port or 6379, - "db": int(parsed_url.path.lstrip("/") or 0), - } - -def get_redis_connection(redis_url, sentinels, decode_responses=True): - """ - Creates a Redis connection from either a standard Redis URL or uses special - parsing to setup a Sentinel connection, if given an array of host/port tuples. - """ - if sentinels: - redis_config = parse_redis_sentinel_url(redis_url) - sentinel = redis.sentinel.Sentinel( - sentinels, - port=redis_config['port'], - db=redis_config['db'], - username=redis_config['username'], - password=redis_config['password'], - decode_responses=decode_responses - ) - - # Get a master connection from Sentinel - return sentinel.master_for(redis_config['service']) - else: - # Standard Redis connection - return redis.Redis.from_url(redis_url, decode_responses=decode_responses) +from open_webui.utils.redis import get_redis_connection class RedisLock: def __init__(self, redis_url, lock_name, timeout_secs, sentinels=[]): diff --git a/backend/open_webui/utils/redis.py b/backend/open_webui/utils/redis.py new file mode 100644 index 000000000..07512fe13 --- /dev/null +++ b/backend/open_webui/utils/redis.py @@ -0,0 +1,84 @@ +import socketio +import redis +from redis import asyncio as aioredis +from urllib.parse import urlparse + +def parse_redis_sentinel_url(redis_url): + parsed_url = urlparse(redis_url) + if parsed_url.scheme != "redis": + raise ValueError("Invalid Redis URL scheme. Must be 'redis'.") + + return { + "username": parsed_url.username or None, + "password": parsed_url.password or None, + "service": parsed_url.hostname or 'mymaster', + "port": parsed_url.port or 6379, + "db": int(parsed_url.path.lstrip("/") or 0), + } + +def get_redis_connection(redis_url, sentinels, decode_responses=True): + if sentinels: + redis_config = parse_redis_sentinel_url(redis_url) + sentinel = redis.sentinel.Sentinel( + sentinels, + port=redis_config['port'], + db=redis_config['db'], + username=redis_config['username'], + password=redis_config['password'], + decode_responses=decode_responses + ) + + # Get a master connection from Sentinel + return sentinel.master_for(redis_config['service']) + else: + # Standard Redis connection + return redis.Redis.from_url(redis_url, decode_responses=decode_responses) + +class AsyncRedisSentinelManager(socketio.AsyncRedisManager): + def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0, + username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None): + """ + Initialize the Redis Sentinel Manager. + This implementation mostly replicates the __init__ of AsyncRedisManager and + overrides _redis_connect() with a version that uses Redis Sentinel + + :param sentinel_hosts: List of Sentinel hosts + :param sentinel_port: Sentinel Port + :param redis_port: Redis Port (currently unsupported by aioredis!) + :param service: Master service name in Sentinel + :param db: Redis database to use + :param username: Redis username (if any) (currently unsupported by aioredis!) + :param password: Redis password (if any) + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + :param write_only: If set to ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. + :param redis_options: additional keyword arguments to be passed to + ``aioredis.from_url()``. + """ + self._sentinels = [(host, sentinel_port) for host in sentinel_hosts] + self._redis_port=redis_port + self._service = service + self._db = db + self._username = username + self._password = password + self._channel = channel + self.redis_options = redis_options or {} + + # connect and call grandparent constructor + self._redis_connect() + super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger) + + def _redis_connect(self): + """Establish connections to Redis through Sentinel.""" + sentinel = aioredis.sentinel.Sentinel( + self._sentinels, + port=self._redis_port, + db=self._db, + password=self._password, + **self.redis_options + ) + + self.redis = sentinel.master_for(self._service) + self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) \ No newline at end of file