refac: usage event handling

This commit is contained in:
Timothy Jaeryang Baek
2025-06-16 10:42:34 +04:00
parent deaa7133a2
commit 423a35782b
12 changed files with 219 additions and 152 deletions

View File

@@ -3094,4 +3094,3 @@ LDAP_ATTRIBUTE_FOR_GROUPS = PersistentConfig(
"ldap.server.attribute_for_groups",
os.environ.get("LDAP_ATTRIBUTE_FOR_GROUPS", "memberOf"),
)

View File

@@ -57,6 +57,8 @@ from open_webui.utils.logger import start_logger
from open_webui.socket.main import (
app as socket_app,
periodic_usage_pool_cleanup,
get_models_in_use,
get_active_user_ids,
)
from open_webui.routers import (
audio,
@@ -1627,6 +1629,19 @@ async def get_app_changelog():
return {key: CHANGELOG[key] for idx, key in enumerate(CHANGELOG) if idx < 5}
@app.get("/api/usage")
async def get_current_usage(user=Depends(get_verified_user)):
"""
Get current usage statistics for Open WebUI.
This is an experimental endpoint and subject to change.
"""
try:
return {"model_ids": get_models_in_use(), "user_ids": get_active_user_ids()}
except Exception as e:
log.error(f"Error getting usage statistics: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
############################
# OAuth Login & Callback
############################

View File

@@ -14,7 +14,11 @@ from open_webui.models.users import (
)
from open_webui.socket.main import get_active_status_by_user_id
from open_webui.socket.main import (
get_active_status_by_user_id,
get_active_user_ids,
get_user_active_status,
)
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import SRC_LOG_LEVELS
from fastapi import APIRouter, Depends, HTTPException, Request, status
@@ -29,6 +33,24 @@ log.setLevel(SRC_LOG_LEVELS["MODELS"])
router = APIRouter()
############################
# GetActiveUsers
############################
@router.get("/active")
async def get_active_users(
user=Depends(get_verified_user),
):
"""
Get a list of active users.
"""
return {
"user_ids": get_active_user_ids(),
}
############################
# GetUsers
############################
@@ -303,6 +325,18 @@ async def get_user_by_id(user_id: str, user=Depends(get_verified_user)):
)
############################
# GetUserActiveStatusById
############################
@router.get("/{user_id}/active", response_model=dict)
async def get_user_active_status_by_id(user_id: str, user=Depends(get_verified_user)):
return {
"active": get_user_active_status(user_id),
}
############################
# UpdateUserById
############################

View File

@@ -135,11 +135,6 @@ async def periodic_usage_pool_cleanup():
USAGE_POOL[model_id] = connections
send_usage = True
if send_usage:
# Emit updated usage information after cleaning
await sio.emit("usage", {"models": get_models_in_use()})
await asyncio.sleep(TIMEOUT_DURATION)
finally:
release_func()
@@ -157,6 +152,43 @@ def get_models_in_use():
return models_in_use
def get_active_user_ids():
"""Get the list of active user IDs."""
return list(USER_POOL.keys())
def get_user_active_status(user_id):
"""Check if a user is currently active."""
return user_id in USER_POOL
def get_user_id_from_session_pool(sid):
user = SESSION_POOL.get(sid)
if user:
return user["id"]
return None
def get_user_ids_from_room(room):
active_session_ids = sio.manager.get_participants(
namespace="/",
room=room,
)
active_user_ids = list(
set(
[SESSION_POOL.get(session_id[0])["id"] for session_id in active_session_ids]
)
)
return active_user_ids
def get_active_status_by_user_id(user_id):
if user_id in USER_POOL:
return True
return False
@sio.on("usage")
async def usage(sid, data):
if sid in SESSION_POOL:
@@ -170,9 +202,6 @@ async def usage(sid, data):
sid: {"updated_at": current_time},
}
# Broadcast the usage data to all clients
await sio.emit("usage", {"models": get_models_in_use()})
@sio.event
async def connect(sid, environ, auth):
@@ -190,10 +219,6 @@ async def connect(sid, environ, auth):
else:
USER_POOL[user.id] = [sid]
# print(f"user {user.name}({user.id}) connected with session ID {sid}")
await sio.emit("user-list", {"user_ids": list(USER_POOL.keys())})
await sio.emit("usage", {"models": get_models_in_use()})
@sio.on("user-join")
async def user_join(sid, data):
@@ -221,10 +246,6 @@ async def user_join(sid, data):
log.debug(f"{channels=}")
for channel in channels:
await sio.enter_room(sid, f"channel:{channel.id}")
# print(f"user {user.name}({user.id}) connected with session ID {sid}")
await sio.emit("user-list", {"user_ids": list(USER_POOL.keys())})
return {"id": user.id, "name": user.name}
@@ -277,12 +298,6 @@ async def channel_events(sid, data):
)
@sio.on("user-list")
async def user_list(sid):
if sid in SESSION_POOL:
await sio.emit("user-list", {"user_ids": list(USER_POOL.keys())})
@sio.event
async def disconnect(sid):
if sid in SESSION_POOL:
@@ -294,8 +309,6 @@ async def disconnect(sid):
if len(USER_POOL[user_id]) == 0:
del USER_POOL[user_id]
await sio.emit("user-list", {"user_ids": list(USER_POOL.keys())})
else:
pass
# print(f"Unknown session ID {sid} disconnected")
@@ -388,30 +401,3 @@ def get_event_call(request_info):
get_event_caller = get_event_call
def get_user_id_from_session_pool(sid):
user = SESSION_POOL.get(sid)
if user:
return user["id"]
return None
def get_user_ids_from_room(room):
active_session_ids = sio.manager.get_participants(
namespace="/",
room=room,
)
active_user_ids = list(
set(
[SESSION_POOL.get(session_id[0])["id"] for session_id in active_session_ids]
)
)
return active_user_ids
def get_active_status_by_user_id(user_id):
if user_id in USER_POOL:
return True
return False