refac as prep for sentinel support in AppConfig

This commit is contained in:
Jan Kessler 2025-03-18 08:25:31 +01:00
parent 9bf663934a
commit 9167a8bef0
No known key found for this signature in database
GPG Key ID: FCF0DCB4ADFC53E7
3 changed files with 87 additions and 87 deletions

View File

@ -8,6 +8,7 @@ from redis import asyncio as aioredis
from open_webui.models.users import Users, UserNameResponse
from open_webui.models.channels import Channels
from open_webui.models.chats import Chats
from open_webui.utils.redis import parse_redis_sentinel_url, AsyncRedisSentinelManager
from open_webui.env import (
ENABLE_WEBSOCKET_SUPPORT,
@ -18,62 +19,13 @@ from open_webui.env import (
WEBSOCKET_SENTINEL_HOSTS,
)
from open_webui.utils.auth import decode_token
from open_webui.socket.utils import RedisDict, RedisLock, parse_redis_sentinel_url
from open_webui.socket.utils import RedisDict, RedisLock
from open_webui.env import (
GLOBAL_LOG_LEVEL,
SRC_LOG_LEVELS,
)
class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0,
username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None):
"""
Initialize the Redis Sentinel Manager.
This implementation mostly replicates the __init__ of AsyncRedisManager and
overrides _redis_connect() with a version that uses Redis Sentinel
:param sentinel_hosts: List of Sentinel hosts
:param sentinel_port: Sentinel Port
:param redis_port: Redis Port (currently unsupported by aioredis!)
:param service: Master service name in Sentinel
:param db: Redis database to use
:param username: Redis username (if any) (currently unsupported by aioredis!)
:param password: Redis password (if any)
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
:param redis_options: additional keyword arguments to be passed to
``aioredis.from_url()``.
"""
self._sentinels = [(host, sentinel_port) for host in sentinel_hosts]
self._redis_port=redis_port
self._service = service
self._db = db
self._username = username
self._password = password
self._channel = channel
self.redis_options = redis_options or {}
# connect and call grandparent constructor
self._redis_connect()
super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger)
def _redis_connect(self):
"""Establish connections to Redis through Sentinel."""
sentinel = aioredis.sentinel.Sentinel(
self._sentinels,
port=self._redis_port,
db=self._db,
password=self._password,
**self.redis_options
)
self.redis = sentinel.master_for(self._service)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)
log = logging.getLogger(__name__)

View File

@ -1,42 +1,6 @@
import json
import redis
import uuid
from urllib.parse import urlparse
def parse_redis_sentinel_url(redis_url):
parsed_url = urlparse(redis_url)
if parsed_url.scheme != "redis":
raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
return {
"username": parsed_url.username or None,
"password": parsed_url.password or None,
"service": parsed_url.hostname or 'mymaster',
"port": parsed_url.port or 6379,
"db": int(parsed_url.path.lstrip("/") or 0),
}
def get_redis_connection(redis_url, sentinels, decode_responses=True):
"""
Creates a Redis connection from either a standard Redis URL or uses special
parsing to setup a Sentinel connection, if given an array of host/port tuples.
"""
if sentinels:
redis_config = parse_redis_sentinel_url(redis_url)
sentinel = redis.sentinel.Sentinel(
sentinels,
port=redis_config['port'],
db=redis_config['db'],
username=redis_config['username'],
password=redis_config['password'],
decode_responses=decode_responses
)
# Get a master connection from Sentinel
return sentinel.master_for(redis_config['service'])
else:
# Standard Redis connection
return redis.Redis.from_url(redis_url, decode_responses=decode_responses)
from open_webui.utils.redis import get_redis_connection
class RedisLock:
def __init__(self, redis_url, lock_name, timeout_secs, sentinels=[]):

View File

@ -0,0 +1,84 @@
import socketio
import redis
from redis import asyncio as aioredis
from urllib.parse import urlparse
def parse_redis_sentinel_url(redis_url):
parsed_url = urlparse(redis_url)
if parsed_url.scheme != "redis":
raise ValueError("Invalid Redis URL scheme. Must be 'redis'.")
return {
"username": parsed_url.username or None,
"password": parsed_url.password or None,
"service": parsed_url.hostname or 'mymaster',
"port": parsed_url.port or 6379,
"db": int(parsed_url.path.lstrip("/") or 0),
}
def get_redis_connection(redis_url, sentinels, decode_responses=True):
if sentinels:
redis_config = parse_redis_sentinel_url(redis_url)
sentinel = redis.sentinel.Sentinel(
sentinels,
port=redis_config['port'],
db=redis_config['db'],
username=redis_config['username'],
password=redis_config['password'],
decode_responses=decode_responses
)
# Get a master connection from Sentinel
return sentinel.master_for(redis_config['service'])
else:
# Standard Redis connection
return redis.Redis.from_url(redis_url, decode_responses=decode_responses)
class AsyncRedisSentinelManager(socketio.AsyncRedisManager):
def __init__(self, sentinel_hosts, sentinel_port=26379, redis_port=6379, service="mymaster", db=0,
username=None, password=None, channel='socketio', write_only=False, logger=None, redis_options=None):
"""
Initialize the Redis Sentinel Manager.
This implementation mostly replicates the __init__ of AsyncRedisManager and
overrides _redis_connect() with a version that uses Redis Sentinel
:param sentinel_hosts: List of Sentinel hosts
:param sentinel_port: Sentinel Port
:param redis_port: Redis Port (currently unsupported by aioredis!)
:param service: Master service name in Sentinel
:param db: Redis database to use
:param username: Redis username (if any) (currently unsupported by aioredis!)
:param password: Redis password (if any)
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
:param redis_options: additional keyword arguments to be passed to
``aioredis.from_url()``.
"""
self._sentinels = [(host, sentinel_port) for host in sentinel_hosts]
self._redis_port=redis_port
self._service = service
self._db = db
self._username = username
self._password = password
self._channel = channel
self.redis_options = redis_options or {}
# connect and call grandparent constructor
self._redis_connect()
super(socketio.AsyncRedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger)
def _redis_connect(self):
"""Establish connections to Redis through Sentinel."""
sentinel = aioredis.sentinel.Sentinel(
self._sentinels,
port=self._redis_port,
db=self._db,
password=self._password,
**self.redis_options
)
self.redis = sentinel.master_for(self._service)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)