mirror of
https://github.com/open-webui/open-webui
synced 2025-05-14 18:46:02 +00:00
Merge pull request #12366 from Ithanil/redis_sentinel_2
chore: adapt websocket redis sentinel code to new native support in python-socketio
This commit is contained in:
commit
02ba7aeb95
@ -9,9 +9,8 @@ from open_webui.models.users import Users, UserNameResponse
|
|||||||
from open_webui.models.channels import Channels
|
from open_webui.models.channels import Channels
|
||||||
from open_webui.models.chats import Chats
|
from open_webui.models.chats import Chats
|
||||||
from open_webui.utils.redis import (
|
from open_webui.utils.redis import (
|
||||||
parse_redis_sentinel_url,
|
|
||||||
get_sentinels_from_env,
|
get_sentinels_from_env,
|
||||||
AsyncRedisSentinelManager,
|
get_sentinel_url_from_env,
|
||||||
)
|
)
|
||||||
|
|
||||||
from open_webui.env import (
|
from open_webui.env import (
|
||||||
@ -38,16 +37,7 @@ log.setLevel(SRC_LOG_LEVELS["SOCKET"])
|
|||||||
|
|
||||||
if WEBSOCKET_MANAGER == "redis":
|
if WEBSOCKET_MANAGER == "redis":
|
||||||
if WEBSOCKET_SENTINEL_HOSTS:
|
if WEBSOCKET_SENTINEL_HOSTS:
|
||||||
redis_config = parse_redis_sentinel_url(WEBSOCKET_REDIS_URL)
|
mgr = socketio.AsyncRedisManager(get_sentinel_url_from_env(WEBSOCKET_REDIS_URL, WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT))
|
||||||
mgr = AsyncRedisSentinelManager(
|
|
||||||
WEBSOCKET_SENTINEL_HOSTS.split(","),
|
|
||||||
sentinel_port=int(WEBSOCKET_SENTINEL_PORT),
|
|
||||||
redis_port=redis_config["port"],
|
|
||||||
service=redis_config["service"],
|
|
||||||
db=redis_config["db"],
|
|
||||||
username=redis_config["username"],
|
|
||||||
password=redis_config["password"],
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL)
|
mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL)
|
||||||
sio = socketio.AsyncServer(
|
sio = socketio.AsyncServer(
|
||||||
|
@ -4,7 +4,7 @@ from redis import asyncio as aioredis
|
|||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
|
||||||
def parse_redis_sentinel_url(redis_url):
|
def parse_redis_service_url(redis_url):
|
||||||
parsed_url = urlparse(redis_url)
|
parsed_url = urlparse(redis_url)
|
||||||
if parsed_url.scheme != "redis":
|
if parsed_url.scheme != "redis":
|
||||||
raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
|
raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
|
||||||
@ -20,7 +20,7 @@ def parse_redis_sentinel_url(redis_url):
|
|||||||
|
|
||||||
def get_redis_connection(redis_url, redis_sentinels, decode_responses=True):
|
def get_redis_connection(redis_url, redis_sentinels, decode_responses=True):
|
||||||
if redis_sentinels:
|
if redis_sentinels:
|
||||||
redis_config = parse_redis_sentinel_url(redis_url)
|
redis_config = parse_redis_service_url(redis_url)
|
||||||
sentinel = redis.sentinel.Sentinel(
|
sentinel = redis.sentinel.Sentinel(
|
||||||
redis_sentinels,
|
redis_sentinels,
|
||||||
port=redis_config["port"],
|
port=redis_config["port"],
|
||||||
@ -45,65 +45,12 @@ def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env):
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
|
def get_sentinel_url_from_env(redis_url, sentinel_hosts_env, sentinel_port_env):
|
||||||
def __init__(
|
redis_config = parse_redis_service_url(redis_url)
|
||||||
self,
|
username = redis_config["username"] or ""
|
||||||
sentinel_hosts,
|
password = redis_config["password"] or ""
|
||||||
sentinel_port=26379,
|
auth_part = ""
|
||||||
redis_port=6379,
|
if username or password:
|
||||||
service="mymaster",
|
auth_part = f"{username}:{password}@"
|
||||||
db=0,
|
hosts_part = ",".join(f"{host}:{sentinel_port_env}" for host in sentinel_hosts_env.split(","))
|
||||||
username=None,
|
return f"redis+sentinel://{auth_part}{hosts_part}/{redis_config['db']}/{redis_config['service']}"
|
||||||
password=None,
|
|
||||||
channel="socketio",
|
|
||||||
write_only=False,
|
|
||||||
logger=None,
|
|
||||||
redis_options=None,
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
Initialize the Redis Sentinel Manager.
|
|
||||||
This implementation mostly replicates the __init__ of AsyncRedisManager and
|
|
||||||
overrides _redis_connect() with a version that uses Redis Sentinel
|
|
||||||
|
|
||||||
:param sentinel_hosts: List of Sentinel hosts
|
|
||||||
:param sentinel_port: Sentinel Port
|
|
||||||
:param redis_port: Redis Port (currently unsupported by aioredis!)
|
|
||||||
:param service: Master service name in Sentinel
|
|
||||||
:param db: Redis database to use
|
|
||||||
:param username: Redis username (if any) (currently unsupported by aioredis!)
|
|
||||||
:param password: Redis password (if any)
|
|
||||||
:param channel: The channel name on which the server sends and receives
|
|
||||||
notifications. Must be the same in all the servers.
|
|
||||||
:param write_only: If set to ``True``, only initialize to emit events. The
|
|
||||||
default of ``False`` initializes the class for emitting
|
|
||||||
and receiving.
|
|
||||||
:param redis_options: additional keyword arguments to be passed to
|
|
||||||
``aioredis.from_url()``.
|
|
||||||
"""
|
|
||||||
self._sentinels = [(host, sentinel_port) for host in sentinel_hosts]
|
|
||||||
self._redis_port = redis_port
|
|
||||||
self._service = service
|
|
||||||
self._db = db
|
|
||||||
self._username = username
|
|
||||||
self._password = password
|
|
||||||
self._channel = channel
|
|
||||||
self.redis_options = redis_options or {}
|
|
||||||
|
|
||||||
# connect and call grandparent constructor
|
|
||||||
self._redis_connect()
|
|
||||||
super(socketio.AsyncRedisManager, self).__init__(
|
|
||||||
channel=channel, write_only=write_only, logger=logger
|
|
||||||
)
|
|
||||||
|
|
||||||
def _redis_connect(self):
|
|
||||||
"""Establish connections to Redis through Sentinel."""
|
|
||||||
sentinel = aioredis.sentinel.Sentinel(
|
|
||||||
self._sentinels,
|
|
||||||
port=self._redis_port,
|
|
||||||
db=self._db,
|
|
||||||
password=self._password,
|
|
||||||
**self.redis_options,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.redis = sentinel.master_for(self._service)
|
|
||||||
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
|
|
||||||
|
@ -3,7 +3,7 @@ uvicorn[standard]==0.34.0
|
|||||||
pydantic==2.10.6
|
pydantic==2.10.6
|
||||||
python-multipart==0.0.20
|
python-multipart==0.0.20
|
||||||
|
|
||||||
python-socketio==5.11.3
|
python-socketio==5.13.0
|
||||||
python-jose==3.4.0
|
python-jose==3.4.0
|
||||||
passlib[bcrypt]==1.7.4
|
passlib[bcrypt]==1.7.4
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ dependencies = [
|
|||||||
"pydantic==2.10.6",
|
"pydantic==2.10.6",
|
||||||
"python-multipart==0.0.18",
|
"python-multipart==0.0.18",
|
||||||
|
|
||||||
"python-socketio==5.11.3",
|
"python-socketio==5.13.0",
|
||||||
"python-jose==3.4.0",
|
"python-jose==3.4.0",
|
||||||
"passlib[bcrypt]==1.7.4",
|
"passlib[bcrypt]==1.7.4",
|
||||||
|
|
||||||
|
8
uv.lock
8
uv.lock
@ -2966,7 +2966,7 @@ requires-dist = [
|
|||||||
{ name = "python-jose", specifier = "==3.3.0" },
|
{ name = "python-jose", specifier = "==3.3.0" },
|
||||||
{ name = "python-multipart", specifier = "==0.0.18" },
|
{ name = "python-multipart", specifier = "==0.0.18" },
|
||||||
{ name = "python-pptx", specifier = "==1.0.0" },
|
{ name = "python-pptx", specifier = "==1.0.0" },
|
||||||
{ name = "python-socketio", specifier = "==5.11.3" },
|
{ name = "python-socketio", specifier = "==5.13.0" },
|
||||||
{ name = "pytube", specifier = "==15.0.0" },
|
{ name = "pytube", specifier = "==15.0.0" },
|
||||||
{ name = "pyxlsb", specifier = "==1.0.10" },
|
{ name = "pyxlsb", specifier = "==1.0.10" },
|
||||||
{ name = "qdrant-client", specifier = "~=1.12.0" },
|
{ name = "qdrant-client", specifier = "~=1.12.0" },
|
||||||
@ -4073,15 +4073,15 @@ wheels = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "python-socketio"
|
name = "python-socketio"
|
||||||
version = "5.11.3"
|
version = "5.13.0"
|
||||||
source = { registry = "https://pypi.org/simple" }
|
source = { registry = "https://pypi.org/simple" }
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{ name = "bidict" },
|
{ name = "bidict" },
|
||||||
{ name = "python-engineio" },
|
{ name = "python-engineio" },
|
||||||
]
|
]
|
||||||
sdist = { url = "https://files.pythonhosted.org/packages/1e/74/b1e8787cea757e1f533a7878e94f929679ef7e07a2aaf44de6b71065b1f2/python_socketio-5.11.3.tar.gz", hash = "sha256:194af8cdbb7b0768c2e807ba76c7abc288eb5bb85559b7cddee51a6bc7a65737", size = 117702 }
|
sdist = { url = "https://files.pythonhosted.org/packages/21/1a/396d50ccf06ee539fa758ce5623b59a9cb27637fc4b2dc07ed08bf495e77/python_socketio-5.13.0.tar.gz", hash = "sha256:ac4e19a0302ae812e23b712ec8b6427ca0521f7c582d6abb096e36e24a263029", size = 121125 }
|
||||||
wheels = [
|
wheels = [
|
||||||
{ url = "https://files.pythonhosted.org/packages/e9/59/5ee858d5736594d75385b9a8c0f65af6eca5da2b359ed3fb6a7486526399/python_socketio-5.11.3-py3-none-any.whl", hash = "sha256:2a923a831ff70664b7c502df093c423eb6aa93c1ce68b8319e840227a26d8b69", size = 76180 },
|
{ url = "https://files.pythonhosted.org/packages/3c/32/b4fb8585d1be0f68bde7e110dffbcf354915f77ad8c778563f0ad9655c02/python_socketio-5.13.0-py3-none-any.whl", hash = "sha256:51f68d6499f2df8524668c24bcec13ba1414117cfb3a90115c559b601ab10caf", size = 77800 },
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
Loading…
Reference in New Issue
Block a user