mirror of
				https://github.com/open-webui/open-webui
				synced 2025-06-26 18:26:48 +00:00 
			
		
		
		
	prepare websocket redis sentinel code for upcoming native support of sentinel in python-socketio
This commit is contained in:
		
							parent
							
								
									d0db4756a6
								
							
						
					
					
						commit
						35ea29b184
					
				@ -9,9 +9,8 @@ from open_webui.models.users import Users, UserNameResponse
 | 
			
		||||
from open_webui.models.channels import Channels
 | 
			
		||||
from open_webui.models.chats import Chats
 | 
			
		||||
from open_webui.utils.redis import (
 | 
			
		||||
    parse_redis_sentinel_url,
 | 
			
		||||
    get_sentinels_from_env,
 | 
			
		||||
    AsyncRedisSentinelManager,
 | 
			
		||||
    get_sentinel_url_from_env,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from open_webui.env import (
 | 
			
		||||
@ -38,16 +37,7 @@ log.setLevel(SRC_LOG_LEVELS["SOCKET"])
 | 
			
		||||
 | 
			
		||||
if WEBSOCKET_MANAGER == "redis":
 | 
			
		||||
    if WEBSOCKET_SENTINEL_HOSTS:
 | 
			
		||||
        redis_config = parse_redis_sentinel_url(WEBSOCKET_REDIS_URL)
 | 
			
		||||
        mgr = AsyncRedisSentinelManager(
 | 
			
		||||
            WEBSOCKET_SENTINEL_HOSTS.split(","),
 | 
			
		||||
            sentinel_port=int(WEBSOCKET_SENTINEL_PORT),
 | 
			
		||||
            redis_port=redis_config["port"],
 | 
			
		||||
            service=redis_config["service"],
 | 
			
		||||
            db=redis_config["db"],
 | 
			
		||||
            username=redis_config["username"],
 | 
			
		||||
            password=redis_config["password"],
 | 
			
		||||
        )
 | 
			
		||||
        mgr = socketio.AsyncRedisManager(get_sentinel_url_from_env(WEBSOCKET_REDIS_URL, WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT))
 | 
			
		||||
    else:
 | 
			
		||||
        mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL)
 | 
			
		||||
    sio = socketio.AsyncServer(
 | 
			
		||||
 | 
			
		||||
@ -4,7 +4,7 @@ from redis import asyncio as aioredis
 | 
			
		||||
from urllib.parse import urlparse
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def parse_redis_sentinel_url(redis_url):
 | 
			
		||||
def parse_redis_service_url(redis_url):
 | 
			
		||||
    parsed_url = urlparse(redis_url)
 | 
			
		||||
    if parsed_url.scheme != "redis":
 | 
			
		||||
        raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
 | 
			
		||||
@ -20,7 +20,7 @@ def parse_redis_sentinel_url(redis_url):
 | 
			
		||||
 | 
			
		||||
def get_redis_connection(redis_url, redis_sentinels, decode_responses=True):
 | 
			
		||||
    if redis_sentinels:
 | 
			
		||||
        redis_config = parse_redis_sentinel_url(redis_url)
 | 
			
		||||
        redis_config = parse_redis_service_url(redis_url)
 | 
			
		||||
        sentinel = redis.sentinel.Sentinel(
 | 
			
		||||
            redis_sentinels,
 | 
			
		||||
            port=redis_config["port"],
 | 
			
		||||
@ -45,65 +45,10 @@ def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env):
 | 
			
		||||
    return []
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        sentinel_hosts,
 | 
			
		||||
        sentinel_port=26379,
 | 
			
		||||
        redis_port=6379,
 | 
			
		||||
        service="mymaster",
 | 
			
		||||
        db=0,
 | 
			
		||||
        username=None,
 | 
			
		||||
        password=None,
 | 
			
		||||
        channel="socketio",
 | 
			
		||||
        write_only=False,
 | 
			
		||||
        logger=None,
 | 
			
		||||
        redis_options=None,
 | 
			
		||||
    ):
 | 
			
		||||
        """
 | 
			
		||||
        Initialize the Redis Sentinel Manager.
 | 
			
		||||
        This implementation mostly replicates the __init__ of AsyncRedisManager and
 | 
			
		||||
        overrides _redis_connect() with a version that uses Redis Sentinel
 | 
			
		||||
 | 
			
		||||
        :param sentinel_hosts: List of Sentinel hosts
 | 
			
		||||
        :param sentinel_port: Sentinel Port
 | 
			
		||||
        :param redis_port: Redis Port (currently unsupported by aioredis!)
 | 
			
		||||
        :param service: Master service name in Sentinel
 | 
			
		||||
        :param db: Redis database to use
 | 
			
		||||
        :param username: Redis username (if any) (currently unsupported by aioredis!)
 | 
			
		||||
        :param password: Redis password (if any)
 | 
			
		||||
        :param channel: The channel name on which the server sends and receives
 | 
			
		||||
                        notifications. Must be the same in all the servers.
 | 
			
		||||
        :param write_only: If set to ``True``, only initialize to emit events. The
 | 
			
		||||
                           default of ``False`` initializes the class for emitting
 | 
			
		||||
                           and receiving.
 | 
			
		||||
        :param redis_options: additional keyword arguments to be passed to
 | 
			
		||||
                              ``aioredis.from_url()``.
 | 
			
		||||
        """
 | 
			
		||||
        self._sentinels = [(host, sentinel_port) for host in sentinel_hosts]
 | 
			
		||||
        self._redis_port = redis_port
 | 
			
		||||
        self._service = service
 | 
			
		||||
        self._db = db
 | 
			
		||||
        self._username = username
 | 
			
		||||
        self._password = password
 | 
			
		||||
        self._channel = channel
 | 
			
		||||
        self.redis_options = redis_options or {}
 | 
			
		||||
 | 
			
		||||
        # connect and call grandparent constructor
 | 
			
		||||
        self._redis_connect()
 | 
			
		||||
        super(socketio.AsyncRedisManager, self).__init__(
 | 
			
		||||
            channel=channel, write_only=write_only, logger=logger
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def _redis_connect(self):
 | 
			
		||||
        """Establish connections to Redis through Sentinel."""
 | 
			
		||||
        sentinel = aioredis.sentinel.Sentinel(
 | 
			
		||||
            self._sentinels,
 | 
			
		||||
            port=self._redis_port,
 | 
			
		||||
            db=self._db,
 | 
			
		||||
            password=self._password,
 | 
			
		||||
            **self.redis_options,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self.redis = sentinel.master_for(self._service)
 | 
			
		||||
        self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
 | 
			
		||||
def get_sentinel_url_from_env(redis_url, sentinel_hosts_env, sentinel_port_env):
 | 
			
		||||
    redis_config = parse_redis_service_url(redis_url)
 | 
			
		||||
    username = redis_config["username"] or ""
 | 
			
		||||
    password = redis_config["password"] or ""
 | 
			
		||||
    auth_part = f"{username}:{password}"
 | 
			
		||||
    hosts_part = ",".join(f"{host}:{sentinel_port_env}" for host in sentinel_hosts_env.split(","))
 | 
			
		||||
    return f"redis+sentinel://{auth_part}@{hosts_part}/{redis_config['db']}/{redis_config['service']}"
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user