Merge remote-tracking branch 'origin' into logit_bias

dannyl1u
2025-02-27 23:48:22 -08:00
181 changed files with 10428 additions and 5218 deletions

View File

@@ -0,0 +1,249 @@
from contextlib import asynccontextmanager
from dataclasses import asdict, dataclass
from enum import Enum
import re
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Dict,
MutableMapping,
Optional,
cast,
)
import uuid
from asgiref.typing import (
ASGI3Application,
ASGIReceiveCallable,
ASGIReceiveEvent,
ASGISendCallable,
ASGISendEvent,
Scope as ASGIScope,
)
from loguru import logger
from starlette.requests import Request
from open_webui.env import AUDIT_LOG_LEVEL, MAX_BODY_LOG_SIZE
from open_webui.utils.auth import get_current_user, get_http_authorization_cred
from open_webui.models.users import UserModel
if TYPE_CHECKING:
from loguru import Logger
@dataclass(frozen=True)
class AuditLogEntry:
# `Metadata` audit level properties
id: str
user: dict[str, Any]
audit_level: str
verb: str
request_uri: str
user_agent: Optional[str] = None
source_ip: Optional[str] = None
# `Request` audit level properties
request_object: Any = None
# `Request Response` level
response_object: Any = None
response_status_code: Optional[int] = None
class AuditLevel(str, Enum):
NONE = "NONE"
METADATA = "METADATA"
REQUEST = "REQUEST"
REQUEST_RESPONSE = "REQUEST_RESPONSE"
class AuditLogger:
"""
A helper class that encapsulates audit logging functionality. It uses Loguru's logger with an `auditable` binding to ensure that audit log entries are filtered correctly.
Parameters:
logger (Logger): An instance of Loguru's logger.
"""
def __init__(self, logger: "Logger"):
self.logger = logger.bind(auditable=True)
def write(
self,
audit_entry: AuditLogEntry,
*,
log_level: str = "INFO",
extra: Optional[dict] = None,
):
entry = asdict(audit_entry)
if extra:
entry["extra"] = extra
self.logger.log(
log_level,
"",
**entry,
)
class AuditContext:
"""
Captures and aggregates the HTTP request and response bodies during the processing of a request. It ensures that only a configurable maximum amount of data is stored to prevent excessive memory usage.
Attributes:
request_body (bytearray): Accumulated request payload.
response_body (bytearray): Accumulated response payload.
max_body_size (int): Maximum number of bytes to capture.
metadata (Dict[str, Any]): A dictionary to store additional audit metadata (user, http verb, user agent, etc.).
"""
def __init__(self, max_body_size: int = MAX_BODY_LOG_SIZE):
self.request_body = bytearray()
self.response_body = bytearray()
self.max_body_size = max_body_size
self.metadata: Dict[str, Any] = {}
def add_request_chunk(self, chunk: bytes):
if len(self.request_body) < self.max_body_size:
self.request_body.extend(
chunk[: self.max_body_size - len(self.request_body)]
)
def add_response_chunk(self, chunk: bytes):
if len(self.response_body) < self.max_body_size:
self.response_body.extend(
chunk[: self.max_body_size - len(self.response_body)]
)
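A quick behavioural sketch (editorial, not part of the diff) of the capping logic above: chunks are appended only until `max_body_size` is reached, and an oversized chunk is sliced to fit.

ctx = AuditContext(max_body_size=8)
ctx.add_request_chunk(b"hello ")
ctx.add_request_chunk(b"world!!")  # only 2 of the 7 bytes still fit
assert bytes(ctx.request_body) == b"hello wo"  # capped at 8 bytes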
class AuditLoggingMiddleware:
"""
ASGI middleware that intercepts HTTP requests and responses to perform audit logging. It captures request/response bodies (depending on audit level), headers, HTTP methods, and user information, then logs a structured audit entry at the end of the request cycle.
"""
AUDITED_METHODS = {"PUT", "PATCH", "DELETE", "POST"}
def __init__(
self,
app: ASGI3Application,
*,
excluded_paths: Optional[list[str]] = None,
max_body_size: int = MAX_BODY_LOG_SIZE,
audit_level: AuditLevel = AuditLevel.NONE,
) -> None:
self.app = app
self.audit_logger = AuditLogger(logger)
self.excluded_paths = excluded_paths or []
self.max_body_size = max_body_size
self.audit_level = audit_level
async def __call__(
self,
scope: ASGIScope,
receive: ASGIReceiveCallable,
send: ASGISendCallable,
) -> None:
if scope["type"] != "http":
return await self.app(scope, receive, send)
request = Request(scope=cast(MutableMapping, scope))
if self._should_skip_auditing(request):
return await self.app(scope, receive, send)
async with self._audit_context(request) as context:
async def send_wrapper(message: ASGISendEvent) -> None:
if self.audit_level == AuditLevel.REQUEST_RESPONSE:
await self._capture_response(message, context)
await send(message)
original_receive = receive
async def receive_wrapper() -> ASGIReceiveEvent:
nonlocal original_receive
message = await original_receive()
if self.audit_level in (
AuditLevel.REQUEST,
AuditLevel.REQUEST_RESPONSE,
):
await self._capture_request(message, context)
return message
await self.app(scope, receive_wrapper, send_wrapper)
@asynccontextmanager
async def _audit_context(
self, request: Request
) -> AsyncGenerator[AuditContext, None]:
"""
Async context manager that ensures an audit log entry is recorded after the request is processed.
"""
context = AuditContext()
try:
yield context
finally:
await self._log_audit_entry(request, context)
async def _get_authenticated_user(self, request: Request) -> UserModel:
auth_header = request.headers.get("Authorization")
assert auth_header
user = get_current_user(request, None, get_http_authorization_cred(auth_header))
return user
def _should_skip_auditing(self, request: Request) -> bool:
if (
request.method not in {"POST", "PUT", "PATCH", "DELETE"}
or AUDIT_LOG_LEVEL == "NONE"
or not request.headers.get("authorization")
):
return True
# Match either /api/<resource>/... (for the /api/chat endpoint case) or /api/v1/<resource>/...
pattern = re.compile(
r"^/api(?:/v1)?/(" + "|".join(self.excluded_paths) + r")\b"
)
if pattern.match(request.url.path):
return True
return False
async def _capture_request(self, message: ASGIReceiveEvent, context: AuditContext):
if message["type"] == "http.request":
body = message.get("body", b"")
context.add_request_chunk(body)
async def _capture_response(self, message: ASGISendEvent, context: AuditContext):
if message["type"] == "http.response.start":
context.metadata["response_status_code"] = message["status"]
elif message["type"] == "http.response.body":
body = message.get("body", b"")
context.add_response_chunk(body)
async def _log_audit_entry(self, request: Request, context: AuditContext):
try:
user = await self._get_authenticated_user(request)
entry = AuditLogEntry(
id=str(uuid.uuid4()),
user=user.model_dump(include={"id", "name", "email", "role"}),
audit_level=self.audit_level.value,
verb=request.method,
request_uri=str(request.url),
response_status_code=context.metadata.get("response_status_code", None),
source_ip=request.client.host if request.client else None,
user_agent=request.headers.get("user-agent"),
request_object=context.request_body.decode("utf-8", errors="replace"),
response_object=context.response_body.decode("utf-8", errors="replace"),
)
self.audit_logger.write(entry)
except Exception as e:
logger.error(f"Failed to log audit entry: {str(e)}")
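For context, a minimal mounting sketch (assuming a Starlette/FastAPI app; the excluded path below is illustrative, not taken from this commit). Starlette's `add_middleware` instantiates the class with the wrapped app as its first argument, which matches the `__init__` signature above.

from fastapi import FastAPI

app = FastAPI()
app.add_middleware(
    AuditLoggingMiddleware,
    excluded_paths=["chats"],  # hypothetical exclusion
    audit_level=AuditLevel.REQUEST_RESPONSE,
)
# With excluded_paths=["chats"], the pattern in _should_skip_auditing
# skips both /api/chats/... and /api/v1/chats/... requests.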

View File

@@ -5,6 +5,7 @@ import base64
import hmac
import hashlib
import requests
import os
from datetime import UTC, datetime, timedelta
@@ -13,15 +14,22 @@ from typing import Optional, Union, List, Dict
from open_webui.models.users import Users
from open_webui.constants import ERROR_MESSAGES
from open_webui.config import override_static
from open_webui.env import WEBUI_SECRET_KEY, TRUSTED_SIGNATURE_KEY
from open_webui.env import (
WEBUI_SECRET_KEY,
TRUSTED_SIGNATURE_KEY,
STATIC_DIR,
SRC_LOG_LEVELS,
)
from fastapi import Depends, HTTPException, Request, Response, status
from fastapi import BackgroundTasks, Depends, HTTPException, Request, Response, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from passlib.context import CryptContext
logging.getLogger("passlib").setLevel(logging.ERROR)
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["OAUTH"])
SESSION_SECRET = WEBUI_SECRET_KEY
ALGORITHM = "HS256"
@@ -47,6 +55,19 @@ def verify_signature(payload: str, signature: str) -> bool:
return False
def override_static(path: str, content: str):
# Ensure path is safe
if "/" in path or ".." in path:
log.error(f"Invalid path: {path}")
return
file_path = os.path.join(STATIC_DIR, path)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "wb") as f:
f.write(base64.b64decode(content)) # Convert Base64 back to raw binary
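Usage sketch for `override_static` (the filename and payload are illustrative): content arrives Base64-encoded and is decoded back to raw bytes under STATIC_DIR.

import base64

svg = base64.b64encode(b"<svg xmlns='http://www.w3.org/2000/svg'/>").decode()
override_static("splash.svg", svg)  # writes <STATIC_DIR>/splash.svg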
def get_license_data(app, key):
if key:
try:
@@ -69,11 +90,11 @@ def get_license_data(app, key):
return True
else:
print(
log.error(
f"License: retrieval issue: {getattr(res, 'text', 'unknown error')}"
)
except Exception as ex:
print(f"License: Uncaught Exception: {ex}")
log.exception(f"License: Uncaught Exception: {ex}")
return False
@@ -129,6 +150,7 @@ def get_http_authorization_cred(auth_header: str):
def get_current_user(
request: Request,
background_tasks: BackgroundTasks,
auth_token: HTTPAuthorizationCredentials = Depends(bearer_security),
):
token = None
@@ -181,7 +203,10 @@ def get_current_user(
detail=ERROR_MESSAGES.INVALID_TOKEN,
)
else:
Users.update_user_last_active_by_id(user.id)
# Refresh the user's last active timestamp asynchronously
# to prevent blocking the request
if background_tasks:
background_tasks.add_task(Users.update_user_last_active_by_id, user.id)
return user
else:
raise HTTPException(

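The deferred-update pattern in isolation, as a sketch (names are illustrative): `add_task` schedules the callable to run after the response is sent, so the request is no longer blocked on the database write.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def touch_last_active(user_id: str) -> None:
    ...  # e.g. Users.update_user_last_active_by_id(user_id)

@app.get("/me")
def me(background_tasks: BackgroundTasks):
    background_tasks.add_task(touch_last_active, "user-123")
    return {"ok": True}  # returned before the task runs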
View File

@@ -66,7 +66,7 @@ async def generate_direct_chat_completion(
user: Any,
models: dict,
):
print("generate_direct_chat_completion")
log.info("generate_direct_chat_completion")
metadata = form_data.pop("metadata", {})
@@ -103,7 +103,7 @@ async def generate_direct_chat_completion(
}
)
print("res", res)
log.info(f"res: {res}")
if res.get("status", False):
# Define a generator to stream responses
@@ -200,7 +200,7 @@ async def generate_chat_completion(
except Exception as e:
raise e
if model["owned_by"] == "arena":
if model.get("owned_by") == "arena":
model_ids = model.get("info", {}).get("meta", {}).get("model_ids")
filter_mode = model.get("info", {}).get("meta", {}).get("filter_mode")
if model_ids and filter_mode == "exclude":
@@ -253,7 +253,7 @@ async def generate_chat_completion(
return await generate_function_chat_completion(
request, form_data, user=user, models=models
)
if model["owned_by"] == "ollama":
if model.get("owned_by") == "ollama":
# Using /ollama/api/chat endpoint
form_data = convert_payload_openai_to_ollama(form_data)
response = await generate_ollama_chat_completion(
@@ -285,7 +285,7 @@ chat_completion = generate_chat_completion
async def chat_completed(request: Request, form_data: dict, user: Any):
if not request.app.state.MODELS:
await get_all_models(request)
await get_all_models(request, user=user)
if getattr(request.state, "direct", False) and hasattr(request.state, "model"):
models = {
@@ -351,7 +351,7 @@ async def chat_action(request: Request, action_id: str, form_data: dict, user: A
raise Exception(f"Action not found: {action_id}")
if not request.app.state.MODELS:
await get_all_models(request)
await get_all_models(request, user=user)
if getattr(request.state, "direct", False) and hasattr(request.state, "model"):
models = {
@@ -432,7 +432,7 @@ async def chat_action(request: Request, action_id: str, form_data: dict, user: A
)
)
except Exception as e:
print(e)
log.exception(f"Failed to get user values: {e}")
params = {**params, "__user__": __user__}

View File

@@ -1,6 +1,12 @@
import inspect
import logging
from open_webui.utils.plugin import load_function_module_by_id
from open_webui.models.functions import Functions
from open_webui.env import SRC_LOG_LEVELS
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])
def get_sorted_filter_ids(model):
@@ -61,7 +67,12 @@ async def process_filter_functions(
try:
# Prepare parameters
sig = inspect.signature(handler)
params = {"body": form_data} | {
params = {"body": form_data}
if filter_type == "stream":
params = {"event": form_data}
params = params | {
k: v
for k, v in {
**extra_params,
@@ -80,7 +91,7 @@ async def process_filter_functions(
)
)
except Exception as e:
print(e)
log.exception(f"Failed to get user values: {e}")
# Execute handler
if inspect.iscoroutinefunction(handler):
@@ -89,7 +100,7 @@ async def process_filter_functions(
form_data = handler(**params)
except Exception as e:
print(f"Error in {filter_type} handler {filter_id}: {e}")
log.exception(f"Error in {filter_type} handler {filter_id}: {e}")
raise e
# Handle file cleanup for inlet

View File

@@ -0,0 +1,140 @@
import json
import logging
import sys
from typing import TYPE_CHECKING
from loguru import logger
from open_webui.env import (
AUDIT_LOG_FILE_ROTATION_SIZE,
AUDIT_LOG_LEVEL,
AUDIT_LOGS_FILE_PATH,
GLOBAL_LOG_LEVEL,
)
if TYPE_CHECKING:
from loguru import Record
def stdout_format(record: "Record") -> str:
"""
Generates a formatted string for log records that are output to the console. This format includes a timestamp, log level, source location (module, function, and line), the log message, and any extra data (serialized as JSON).
Parameters:
record (Record): A Loguru record that contains logging details including time, level, name, function, line, message, and any extra context.
Returns:
str: A formatted log string intended for stdout.
"""
record["extra"]["extra_json"] = json.dumps(record["extra"])
return (
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
"<level>{message}</level> - {extra[extra_json]}"
"\n{exception}"
)
class InterceptHandler(logging.Handler):
"""
Intercepts log records from Python's standard logging module
and redirects them to Loguru's logger.
"""
def emit(self, record):
"""
Called by the standard logging module for each log event.
It transforms the standard `LogRecord` into a format compatible with Loguru
and passes it to Loguru's logger.
"""
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
frame, depth = sys._getframe(6), 6
while frame and frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(
level, record.getMessage()
)
def file_format(record: "Record"):
"""
Formats audit log records into a structured JSON string for file output.
Parameters:
record (Record): A Loguru record containing extra audit data.
Returns:
str: A JSON-formatted string representing the audit data.
"""
audit_data = {
"id": record["extra"].get("id", ""),
"timestamp": int(record["time"].timestamp()),
"user": record["extra"].get("user", dict()),
"audit_level": record["extra"].get("audit_level", ""),
"verb": record["extra"].get("verb", ""),
"request_uri": record["extra"].get("request_uri", ""),
"response_status_code": record["extra"].get("response_status_code", 0),
"source_ip": record["extra"].get("source_ip", ""),
"user_agent": record["extra"].get("user_agent", ""),
"request_object": record["extra"].get("request_object", b""),
"response_object": record["extra"].get("response_object", b""),
"extra": record["extra"].get("extra", {}),
}
record["extra"]["file_extra"] = json.dumps(audit_data, default=str)
return "{extra[file_extra]}\n"
def start_logger():
"""
Initializes and configures Loguru's logger with distinct handlers:
A console (stdout) handler for general log messages (excluding those marked as auditable).
An optional file handler for audit logs if audit logging is enabled.
Additionally, this function reconfigures Python's standard logging to route through Loguru and adjusts logging levels for Uvicorn.
"""
logger.remove()
logger.add(
sys.stdout,
level=GLOBAL_LOG_LEVEL,
format=stdout_format,
filter=lambda record: "auditable" not in record["extra"],
)
if AUDIT_LOG_LEVEL != "NONE":
try:
logger.add(
AUDIT_LOGS_FILE_PATH,
level="INFO",
rotation=AUDIT_LOG_FILE_ROTATION_SIZE,
compression="zip",
format=file_format,
filter=lambda record: record["extra"].get("auditable") is True,
)
except Exception as e:
logger.error(f"Failed to initialize audit log file handler: {str(e)}")
logging.basicConfig(
handlers=[InterceptHandler()], level=GLOBAL_LOG_LEVEL, force=True
)
for uvicorn_logger_name in ["uvicorn", "uvicorn.error"]:
uvicorn_logger = logging.getLogger(uvicorn_logger_name)
uvicorn_logger.setLevel(GLOBAL_LOG_LEVEL)
uvicorn_logger.handlers = []
for uvicorn_logger_name in ["uvicorn.access"]:
uvicorn_logger = logging.getLogger(uvicorn_logger_name)
uvicorn_logger.setLevel(GLOBAL_LOG_LEVEL)
uvicorn_logger.handlers = [InterceptHandler()]
logger.info(f"GLOBAL_LOG_LEVEL: {GLOBAL_LOG_LEVEL}")
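A standalone sketch of the stdlib-to-Loguru bridge configured above: once `basicConfig` installs `InterceptHandler`, ordinary `logging` calls are re-emitted through Loguru at the matching level.

import logging

logging.basicConfig(handlers=[InterceptHandler()], level="INFO", force=True)
logging.getLogger("httpx").info("this record now flows through Loguru")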

View File

@@ -322,78 +322,95 @@ async def chat_web_search_handler(
)
return form_data
searchQuery = queries[0]
all_results = []
await event_emitter(
{
"type": "status",
"data": {
"action": "web_search",
"description": 'Searching "{{searchQuery}}"',
"query": searchQuery,
"done": False,
},
}
)
try:
results = await process_web_search(
request,
SearchForm(
**{
for searchQuery in queries:
await event_emitter(
{
"type": "status",
"data": {
"action": "web_search",
"description": 'Searching "{{searchQuery}}"',
"query": searchQuery,
}
),
user,
"done": False,
},
}
)
if results:
await event_emitter(
{
"type": "status",
"data": {
"action": "web_search",
"description": "Searched {{count}} sites",
try:
results = await process_web_search(
request,
SearchForm(
**{
"query": searchQuery,
"urls": results["filenames"],
"done": True,
},
}
}
),
user=user,
)
files = form_data.get("files", [])
files.append(
{
"collection_name": results["collection_name"],
"name": searchQuery,
"type": "web_search_results",
"urls": results["filenames"],
}
)
form_data["files"] = files
else:
if results:
all_results.append(results)
files = form_data.get("files", [])
if results.get("collection_name"):
files.append(
{
"collection_name": results["collection_name"],
"name": searchQuery,
"type": "web_search",
"urls": results["filenames"],
}
)
elif results.get("docs"):
files.append(
{
"docs": results.get("docs", []),
"name": searchQuery,
"type": "web_search",
"urls": results["filenames"],
}
)
form_data["files"] = files
except Exception as e:
log.exception(e)
await event_emitter(
{
"type": "status",
"data": {
"action": "web_search",
"description": "No search results found",
"description": 'Error searching "{{searchQuery}}"',
"query": searchQuery,
"done": True,
"error": True,
},
}
)
except Exception as e:
log.exception(e)
if all_results:
urls = []
for results in all_results:
if "filenames" in results:
urls.extend(results["filenames"])
await event_emitter(
{
"type": "status",
"data": {
"action": "web_search",
"description": 'Error searching "{{searchQuery}}"',
"query": searchQuery,
"description": "Searched {{count}} sites",
"urls": urls,
"done": True,
},
}
)
else:
await event_emitter(
{
"type": "status",
"data": {
"action": "web_search",
"description": "No search results found",
"done": True,
"error": True,
},
@@ -503,6 +520,7 @@ async def chat_completion_files_handler(
sources = []
if files := body.get("metadata", {}).get("files", None):
queries = []
try:
queries_response = await generate_queries(
request,
@@ -528,8 +546,8 @@ async def chat_completion_files_handler(
queries_response = {"queries": [queries_response]}
queries = queries_response.get("queries", [])
except Exception as e:
queries = []
except:
pass
if len(queries) == 0:
queries = [get_last_user_message(body["messages"])]
@@ -541,6 +559,7 @@ async def chat_completion_files_handler(
sources = await loop.run_in_executor(
executor,
lambda: get_sources_from_files(
request=request,
files=files,
queries=queries,
embedding_function=lambda query: request.app.state.EMBEDDING_FUNCTION(
@@ -550,9 +569,9 @@ async def chat_completion_files_handler(
reranking_function=request.app.state.rf,
r=request.app.state.config.RELEVANCE_THRESHOLD,
hybrid_search=request.app.state.config.ENABLE_RAG_HYBRID_SEARCH,
full_context=request.app.state.config.RAG_FULL_CONTEXT,
),
)
except Exception as e:
log.exception(e)
@@ -728,6 +747,7 @@ async def process_chat_payload(request, form_data, metadata, user, model):
tool_ids = form_data.pop("tool_ids", None)
files = form_data.pop("files", None)
# Remove duplicate files
if files:
files = list({json.dumps(f, sort_keys=True): f for f in files}.values())
@@ -785,8 +805,6 @@ async def process_chat_payload(request, form_data, metadata, user, model):
if len(sources) > 0:
context_string = ""
for source_idx, source in enumerate(sources):
source_id = source.get("source", {}).get("name", "")
if "document" in source:
for doc_idx, doc_context in enumerate(source["document"]):
context_string += f"<source><source_id>{source_idx}</source_id><source_context>{doc_context}</source_context></source>\n"
@@ -806,7 +824,7 @@ async def process_chat_payload(request, form_data, metadata, user, model):
# Workaround for Ollama 2.0+ system prompt issue
# TODO: replace with add_or_update_system_message
if model["owned_by"] == "ollama":
if model.get("owned_by") == "ollama":
form_data["messages"] = prepend_to_first_user_message_content(
rag_template(
request.app.state.config.RAG_TEMPLATE, context_string, prompt
@@ -1038,6 +1056,21 @@ async def process_chat_response(
):
return response
extra_params = {
"__event_emitter__": event_emitter,
"__event_call__": event_caller,
"__user__": {
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role,
},
"__metadata__": metadata,
"__request__": request,
"__model__": metadata.get("model"),
}
filter_ids = get_sorted_filter_ids(form_data.get("model"))
# Streaming response
if event_emitter and event_caller:
task_id = str(uuid4()) # Create a unique task ID.
@@ -1117,12 +1150,12 @@ async def process_chat_response(
if reasoning_duration is not None:
if raw:
content = f'{content}\n<{block["tag"]}>{block["content"]}</{block["tag"]}>\n'
content = f'{content}\n<{block["start_tag"]}>{block["content"]}<{block["end_tag"]}>\n'
else:
content = f'{content}\n<details type="reasoning" done="true" duration="{reasoning_duration}">\n<summary>Thought for {reasoning_duration} seconds</summary>\n{reasoning_display_content}\n</details>\n'
else:
if raw:
content = f'{content}\n<{block["tag"]}>{block["content"]}</{block["tag"]}>\n'
content = f'{content}\n<{block["start_tag"]}>{block["content"]}<{block["end_tag"]}>\n'
else:
content = f'{content}\n<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{reasoning_display_content}\n</details>\n'
@@ -1218,9 +1251,9 @@ async def process_chat_response(
return attributes
if content_blocks[-1]["type"] == "text":
for tag in tags:
for start_tag, end_tag in tags:
# Match start tag e.g., <tag> or <tag attr="value">
start_tag_pattern = rf"<{tag}(\s.*?)?>"
start_tag_pattern = rf"<{re.escape(start_tag)}(\s.*?)?>"
match = re.search(start_tag_pattern, content)
if match:
attr_content = (
@@ -1253,7 +1286,8 @@ async def process_chat_response(
content_blocks.append(
{
"type": content_type,
"tag": tag,
"start_tag": start_tag,
"end_tag": end_tag,
"attributes": attributes,
"content": "",
"started_at": time.time(),
@@ -1265,9 +1299,10 @@ async def process_chat_response(
break
elif content_blocks[-1]["type"] == content_type:
tag = content_blocks[-1]["tag"]
start_tag = content_blocks[-1]["start_tag"]
end_tag = content_blocks[-1]["end_tag"]
# Match end tag e.g., </tag>
end_tag_pattern = rf"</{tag}>"
end_tag_pattern = rf"<{re.escape(end_tag)}>"
# Check if the content has the end tag
if re.search(end_tag_pattern, content):
@@ -1275,7 +1310,7 @@ async def process_chat_response(
block_content = content_blocks[-1]["content"]
# Strip start and end tags from the content
start_tag_pattern = rf"<{tag}(.*?)>"
start_tag_pattern = rf"<{re.escape(start_tag)}(.*?)>"
block_content = re.sub(
start_tag_pattern, "", block_content
).strip()
@@ -1340,7 +1375,7 @@ async def process_chat_response(
# Clean processed content
content = re.sub(
rf"<{tag}(.*?)>(.|\n)*?</{tag}>",
rf"<{re.escape(start_tag)}(.*?)>(.|\n)*?<{re.escape(end_tag)}>",
"",
content,
flags=re.DOTALL,
@@ -1353,7 +1388,22 @@ async def process_chat_response(
)
tool_calls = []
content = message.get("content", "") if message else ""
last_assistant_message = None
try:
if form_data["messages"][-1]["role"] == "assistant":
last_assistant_message = get_last_assistant_message(
form_data["messages"]
)
except Exception as e:
pass
content = (
message.get("content", "")
if message
else last_assistant_message if last_assistant_message else ""
)
content_blocks = [
{
"type": "text",
@@ -1363,19 +1413,24 @@ async def process_chat_response(
# We might want to disable this by default
DETECT_REASONING = True
DETECT_SOLUTION = True
DETECT_CODE_INTERPRETER = metadata.get("features", {}).get(
"code_interpreter", False
)
reasoning_tags = [
"think",
"thinking",
"reason",
"reasoning",
"thought",
"Thought",
("think", "/think"),
("thinking", "/thinking"),
("reason", "/reason"),
("reasoning", "/reasoning"),
("thought", "/thought"),
("Thought", "/Thought"),
("|begin_of_thought|", "|end_of_thought|"),
]
code_interpreter_tags = ["code_interpreter"]
code_interpreter_tags = [("code_interpreter", "/code_interpreter")]
solution_tags = [("|begin_of_solution|", "|end_of_solution|")]
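The move from bare tag names to (start_tag, end_tag) tuples is what makes the `re.escape` calls above necessary: markers such as "|begin_of_thought|" contain regex metacharacters. A sketch:

import re

start_tag = "|begin_of_thought|"
pattern = rf"<{re.escape(start_tag)}(\s.*?)?>"
assert re.search(pattern, "<|begin_of_thought|>") is not None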
try:
for event in events:
@@ -1419,119 +1474,154 @@ async def process_chat_response(
try:
data = json.loads(data)
if "selected_model_id" in data:
model_id = data["selected_model_id"]
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"selectedModelId": model_id,
},
)
else:
choices = data.get("choices", [])
if not choices:
continue
data, _ = await process_filter_functions(
request=request,
filter_ids=filter_ids,
filter_type="stream",
form_data=data,
extra_params=extra_params,
)
delta = choices[0].get("delta", {})
delta_tool_calls = delta.get("tool_calls", None)
if delta_tool_calls:
for delta_tool_call in delta_tool_calls:
tool_call_index = delta_tool_call.get("index")
if tool_call_index is not None:
if (
len(response_tool_calls)
<= tool_call_index
):
response_tool_calls.append(
delta_tool_call
)
else:
delta_name = delta_tool_call.get(
"function", {}
).get("name")
delta_arguments = delta_tool_call.get(
"function", {}
).get("arguments")
if delta_name:
response_tool_calls[
tool_call_index
]["function"]["name"] += delta_name
if delta_arguments:
response_tool_calls[
tool_call_index
]["function"][
"arguments"
] += delta_arguments
value = delta.get("content")
if value:
content = f"{content}{value}"
if not content_blocks:
content_blocks.append(
{
"type": "text",
"content": "",
}
)
content_blocks[-1]["content"] = (
content_blocks[-1]["content"] + value
if data:
if "selected_model_id" in data:
model_id = data["selected_model_id"]
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"selectedModelId": model_id,
},
)
if DETECT_REASONING:
content, content_blocks, _ = (
tag_content_handler(
"reasoning",
reasoning_tags,
content,
content_blocks,
else:
choices = data.get("choices", [])
if not choices:
usage = data.get("usage", {})
if usage:
await event_emitter(
{
"type": "chat:completion",
"data": {
"usage": usage,
},
}
)
continue
delta = choices[0].get("delta", {})
delta_tool_calls = delta.get("tool_calls", None)
if delta_tool_calls:
for delta_tool_call in delta_tool_calls:
tool_call_index = delta_tool_call.get(
"index"
)
if tool_call_index is not None:
if (
len(response_tool_calls)
<= tool_call_index
):
response_tool_calls.append(
delta_tool_call
)
else:
delta_name = delta_tool_call.get(
"function", {}
).get("name")
delta_arguments = (
delta_tool_call.get(
"function", {}
).get("arguments")
)
if delta_name:
response_tool_calls[
tool_call_index
]["function"][
"name"
] += delta_name
if delta_arguments:
response_tool_calls[
tool_call_index
]["function"][
"arguments"
] += delta_arguments
value = delta.get("content")
if value:
content = f"{content}{value}"
if not content_blocks:
content_blocks.append(
{
"type": "text",
"content": "",
}
)
content_blocks[-1]["content"] = (
content_blocks[-1]["content"] + value
)
if DETECT_CODE_INTERPRETER:
content, content_blocks, end = (
tag_content_handler(
"code_interpreter",
code_interpreter_tags,
content,
content_blocks,
if DETECT_REASONING:
content, content_blocks, _ = (
tag_content_handler(
"reasoning",
reasoning_tags,
content,
content_blocks,
)
)
)
if end:
break
if DETECT_CODE_INTERPRETER:
content, content_blocks, end = (
tag_content_handler(
"code_interpreter",
code_interpreter_tags,
content,
content_blocks,
)
)
if ENABLE_REALTIME_CHAT_SAVE:
# Save message in the database
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
if end:
break
if DETECT_SOLUTION:
content, content_blocks, _ = (
tag_content_handler(
"solution",
solution_tags,
content,
content_blocks,
)
)
if ENABLE_REALTIME_CHAT_SAVE:
# Save message in the database
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"content": serialize_content_blocks(
content_blocks
),
},
)
else:
data = {
"content": serialize_content_blocks(
content_blocks
),
},
)
else:
data = {
"content": serialize_content_blocks(
content_blocks
),
}
}
await event_emitter(
{
"type": "chat:completion",
"data": data,
}
)
await event_emitter(
{
"type": "chat:completion",
"data": data,
}
)
except Exception as e:
done = "data: [DONE]" in line
if done:
@@ -1736,6 +1826,7 @@ async def process_chat_response(
== "password"
else None
),
request.app.state.config.CODE_INTERPRETER_JUPYTER_TIMEOUT,
)
else:
output = {
@@ -1829,7 +1920,10 @@ async def process_chat_response(
}
)
print(content_blocks, serialize_content_blocks(content_blocks))
log.info(f"content_blocks={content_blocks}")
log.info(
f"serialize_content_blocks={serialize_content_blocks(content_blocks)}"
)
try:
res = await generate_chat_completion(
@@ -1900,7 +1994,7 @@ async def process_chat_response(
await background_tasks_handler()
except asyncio.CancelledError:
print("Task was cancelled!")
log.warning("Task was cancelled!")
await event_emitter({"type": "task-cancelled"})
if not ENABLE_REALTIME_CHAT_SAVE:
@@ -1921,17 +2015,34 @@ async def process_chat_response(
return {"status": True, "task_id": task_id}
else:
# Fallback to the original response
async def stream_wrapper(original_generator, events):
def wrap_item(item):
return f"data: {item}\n\n"
for event in events:
yield wrap_item(json.dumps(event))
event, _ = await process_filter_functions(
request=request,
filter_ids=filter_ids,
filter_type="stream",
form_data=event,
extra_params=extra_params,
)
if event:
yield wrap_item(json.dumps(event))
async for data in original_generator:
yield data
data, _ = await process_filter_functions(
request=request,
filter_ids=filter_ids,
filter_type="stream",
form_data=data,
extra_params=extra_params,
)
if data:
yield data
return StreamingResponse(
stream_wrapper(response.body_iterator, events),

View File

@@ -2,6 +2,7 @@ import hashlib
import re
import time
import uuid
import logging
from datetime import timedelta
from pathlib import Path
from typing import Callable, Optional
@@ -9,6 +10,10 @@ import json
import collections.abc
from open_webui.env import SRC_LOG_LEVELS
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])
def deep_update(d, u):
@@ -413,7 +418,7 @@ def parse_ollama_modelfile(model_text):
elif param_type is bool:
value = value.lower() == "true"
except Exception as e:
print(e)
log.exception(f"Failed to parse parameter {param}: {e}")
continue
data["params"][param] = value

View File

@@ -22,6 +22,7 @@ from open_webui.config import (
)
from open_webui.env import SRC_LOG_LEVELS, GLOBAL_LOG_LEVEL
from open_webui.models.users import UserModel
logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)
@@ -29,17 +30,17 @@ log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])
async def get_all_base_models(request: Request):
async def get_all_base_models(request: Request, user: UserModel = None):
function_models = []
openai_models = []
ollama_models = []
if request.app.state.config.ENABLE_OPENAI_API:
openai_models = await openai.get_all_models(request)
openai_models = await openai.get_all_models(request, user=user)
openai_models = openai_models["data"]
if request.app.state.config.ENABLE_OLLAMA_API:
ollama_models = await ollama.get_all_models(request)
ollama_models = await ollama.get_all_models(request, user=user)
ollama_models = [
{
"id": model["model"],
@@ -58,8 +59,8 @@ async def get_all_base_models(request: Request):
return models
async def get_all_models(request):
models = await get_all_base_models(request)
async def get_all_models(request, user: UserModel = None):
models = await get_all_base_models(request, user=user)
# If there are no models, return an empty list
if len(models) == 0:
@@ -142,7 +143,7 @@ async def get_all_models(request):
custom_model.base_model_id == model["id"]
or custom_model.base_model_id == model["id"].split(":")[0]
):
owned_by = model["owned_by"]
owned_by = model.get("owned_by", "unknown owner")
if "pipe" in model:
pipe = model["pipe"]
break

View File

@@ -140,7 +140,14 @@ class OAuthManager:
log.debug("Running OAUTH Group management")
oauth_claim = auth_manager_config.OAUTH_GROUPS_CLAIM
user_oauth_groups: list[str] = user_data.get(oauth_claim, list())
# Nested claim search for groups claim
if oauth_claim:
claim_data = user_data
nested_claims = oauth_claim.split(".")
for nested_claim in nested_claims:
claim_data = claim_data.get(nested_claim, {})
user_oauth_groups = claim_data if isinstance(claim_data, list) else []
user_current_groups: list[GroupModel] = Groups.get_groups_by_member_id(user.id)
all_available_groups: list[GroupModel] = Groups.get_groups()
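A sketch of the dotted-claim traversal above (the payload is illustrative): each segment of OAUTH_GROUPS_CLAIM descends one level into the userinfo document.

user_data = {"realm": {"groups": ["admins", "devs"]}}
claim_data = user_data
for nested_claim in "realm.groups".split("."):
    claim_data = claim_data.get(nested_claim, {})
groups = claim_data if isinstance(claim_data, list) else []
assert groups == ["admins", "devs"]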
@@ -239,11 +246,46 @@ class OAuthManager:
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
provider_sub = f"{provider}@{sub}"
email_claim = auth_manager_config.OAUTH_EMAIL_CLAIM
email = user_data.get(email_claim, "").lower()
email = user_data.get(email_claim, "")
# We currently mandate that email addresses are provided
if not email:
log.warning(f"OAuth callback failed, email is missing: {user_data}")
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
# If the provider is GitHub and a public email is not provided, we can use the access token to fetch the user's email
if provider == "github":
try:
access_token = token.get("access_token")
headers = {"Authorization": f"Bearer {access_token}"}
async with aiohttp.ClientSession() as session:
async with session.get(
"https://api.github.com/user/emails", headers=headers
) as resp:
if resp.ok:
emails = await resp.json()
# use the primary email as the user's email
primary_email = next(
(e["email"] for e in emails if e.get("primary")),
None,
)
if primary_email:
email = primary_email
else:
log.warning(
"No primary email found in GitHub response"
)
raise HTTPException(
400, detail=ERROR_MESSAGES.INVALID_CRED
)
else:
log.warning("Failed to fetch GitHub email")
raise HTTPException(
400, detail=ERROR_MESSAGES.INVALID_CRED
)
except Exception as e:
log.warning(f"Error fetching GitHub email: {e}")
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
else:
log.warning(f"OAuth callback failed, email is missing: {user_data}")
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
email = email.lower()
if (
"*" not in auth_manager_config.OAUTH_ALLOWED_DOMAINS
and email.split("@")[-1] not in auth_manager_config.OAUTH_ALLOWED_DOMAINS
@@ -273,21 +315,10 @@ class OAuthManager:
if not user:
user_count = Users.get_num_users()
if (
request.app.state.USER_COUNT
and user_count >= request.app.state.USER_COUNT
):
raise HTTPException(
403,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
# If the user does not exist, check if signups are enabled
if auth_manager_config.ENABLE_OAUTH_SIGNUP:
# Check if an existing user with the same email already exists
existing_user = Users.get_user_by_email(
user_data.get("email", "").lower()
)
existing_user = Users.get_user_by_email(email)
if existing_user:
raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN)

View File

@@ -4,6 +4,7 @@ from open_webui.utils.misc import (
)
from typing import Callable, Optional
import json
# In-place function: form_data is modified
@@ -67,38 +68,49 @@ def apply_model_params_to_body_openai(params: dict, form_data: dict) -> dict:
def apply_model_params_to_body_ollama(params: dict, form_data: dict) -> dict:
opts = [
"temperature",
"top_p",
"seed",
"mirostat",
"mirostat_eta",
"mirostat_tau",
"num_ctx",
"num_batch",
"num_keep",
"repeat_last_n",
"tfs_z",
"top_k",
"min_p",
"use_mmap",
"use_mlock",
"num_thread",
"num_gpu",
]
mappings = {i: lambda x: x for i in opts}
form_data = apply_model_params_to_body(params, form_data, mappings)
# Convert OpenAI parameter names to Ollama parameter names if needed.
name_differences = {
"max_tokens": "num_predict",
"frequency_penalty": "repeat_penalty",
}
for key, value in name_differences.items():
if (param := params.get(key, None)) is not None:
form_data[value] = param
# Copy the parameter to the new name, then delete the original to avoid an Ollama warning about an invalid option
params[value] = params[key]
del params[key]
return form_data
# See https://github.com/ollama/ollama/blob/main/docs/api.md#request-8
mappings = {
"temperature": float,
"top_p": float,
"seed": lambda x: x,
"mirostat": int,
"mirostat_eta": float,
"mirostat_tau": float,
"num_ctx": int,
"num_batch": int,
"num_keep": int,
"num_predict": int,
"repeat_last_n": int,
"top_k": int,
"min_p": float,
"typical_p": float,
"repeat_penalty": float,
"presence_penalty": float,
"frequency_penalty": float,
"penalize_newline": bool,
"stop": lambda x: [bytes(s, "utf-8").decode("unicode_escape") for s in x],
"numa": bool,
"num_gpu": int,
"main_gpu": int,
"low_vram": bool,
"vocab_only": bool,
"use_mmap": bool,
"use_mlock": bool,
"num_thread": int,
}
return apply_model_params_to_body(params, form_data, mappings)
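The typed mappings replace the old identity mappings so loosely-typed client values are coerced before reaching Ollama. A standalone sketch of the coercion (not the helper itself):

mappings = {"top_k": int, "temperature": float, "use_mmap": bool}
params = {"top_k": "40", "temperature": "0.7", "use_mmap": 1}
coerced = {k: mappings[k](v) for k, v in params.items() if k in mappings}
assert coerced == {"top_k": 40, "temperature": 0.7, "use_mmap": True}

# The "stop" mapping additionally unescapes sequences like "\\n":
assert bytes("\\n", "utf-8").decode("unicode_escape") == "\n"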
def convert_messages_openai_to_ollama(messages: list[dict]) -> list[dict]:
@@ -109,11 +121,38 @@ def convert_messages_openai_to_ollama(messages: list[dict]) -> list[dict]:
new_message = {"role": message["role"]}
content = message.get("content", [])
tool_calls = message.get("tool_calls", None)
tool_call_id = message.get("tool_call_id", None)
# Check if the content is a string (just a simple message)
if isinstance(content, str):
if isinstance(content, str) and not tool_calls:
# If the content is a string, it's pure text
new_message["content"] = content
# If the message is a tool call, add the tool call id to the message
if tool_call_id:
new_message["tool_call_id"] = tool_call_id
elif tool_calls:
# If tool calls are present, add them to the message
ollama_tool_calls = []
for tool_call in tool_calls:
ollama_tool_call = {
"index": tool_call.get("index", 0),
"id": tool_call.get("id", None),
"function": {
"name": tool_call.get("function", {}).get("name", ""),
"arguments": json.loads(
tool_call.get("function", {}).get("arguments", {})
),
},
}
ollama_tool_calls.append(ollama_tool_call)
new_message["tool_calls"] = ollama_tool_calls
# Set the content to an empty string (Ollama requires an empty string for tool calls)
new_message["content"] = ""
else:
# Otherwise, assume the content is a list of dicts, e.g., text followed by an image URL
content_text = ""
@@ -174,33 +213,28 @@ def convert_payload_openai_to_ollama(openai_payload: dict) -> dict:
ollama_payload["format"] = openai_payload["format"]
# If there are advanced parameters in the payload, format them in Ollama's options field
ollama_options = {}
if openai_payload.get("options"):
ollama_payload["options"] = openai_payload["options"]
ollama_options = openai_payload["options"]
# Handle parameters which map directly
for param in ["temperature", "top_p", "seed"]:
if param in openai_payload:
ollama_options[param] = openai_payload[param]
# Re-Mapping OpenAI's `max_tokens` -> Ollama's `num_predict`
if "max_tokens" in ollama_options:
ollama_options["num_predict"] = ollama_options["max_tokens"]
del ollama_options[
"max_tokens"
] # To prevent Ollama warning of invalid option provided
# Mapping OpenAI's `max_tokens` -> Ollama's `num_predict`
if "max_completion_tokens" in openai_payload:
ollama_options["num_predict"] = openai_payload["max_completion_tokens"]
elif "max_tokens" in openai_payload:
ollama_options["num_predict"] = openai_payload["max_tokens"]
# Ollama lacks a "system" prompt option. It has to be provided as a direct parameter, so we copy it down.
if "system" in ollama_options:
ollama_payload["system"] = ollama_options["system"]
del ollama_options[
"system"
] # To prevent Ollama warning of invalid option provided
# Handle frequency / presence_penalty, which needs renaming and checking
if "frequency_penalty" in openai_payload:
ollama_options["repeat_penalty"] = openai_payload["frequency_penalty"]
if "presence_penalty" in openai_payload and "penalty" not in ollama_options:
# We are assuming presence penalty uses a similar concept in Ollama, which would need custom handling if it exists.
ollama_options["new_topic_penalty"] = openai_payload["presence_penalty"]
# Add options to payload if any have been set
if ollama_options:
# If the "stop" parameter is present in the openai_payload, remap it into ollama_payload["options"]
if "stop" in openai_payload:
ollama_options = ollama_payload.get("options", {})
ollama_options["stop"] = openai_payload.get("stop")
ollama_payload["options"] = ollama_options
if "metadata" in openai_payload:

View File

@@ -45,7 +45,7 @@ def extract_frontmatter(content):
frontmatter[key.strip()] = value.strip()
except Exception as e:
print(f"An error occurred: {e}")
log.exception(f"Failed to extract frontmatter: {e}")
return {}
return frontmatter

View File

@@ -24,17 +24,8 @@ def convert_ollama_tool_call_to_openai(tool_calls: dict) -> dict:
return openai_tool_calls
def convert_response_ollama_to_openai(ollama_response: dict) -> dict:
model = ollama_response.get("model", "ollama")
message_content = ollama_response.get("message", {}).get("content", "")
tool_calls = ollama_response.get("message", {}).get("tool_calls", None)
openai_tool_calls = None
if tool_calls:
openai_tool_calls = convert_ollama_tool_call_to_openai(tool_calls)
data = ollama_response
usage = {
def convert_ollama_usage_to_openai(data: dict) -> dict:
return {
"response_token/s": (
round(
(
@@ -66,14 +57,42 @@ def convert_response_ollama_to_openai(ollama_response: dict) -> dict:
"total_duration": data.get("total_duration", 0),
"load_duration": data.get("load_duration", 0),
"prompt_eval_count": data.get("prompt_eval_count", 0),
"prompt_tokens": int(
data.get("prompt_eval_count", 0)
), # This is the OpenAI compatible key
"prompt_eval_duration": data.get("prompt_eval_duration", 0),
"eval_count": data.get("eval_count", 0),
"completion_tokens": int(
data.get("eval_count", 0)
), # This is the OpenAI compatible key
"eval_duration": data.get("eval_duration", 0),
"approximate_total": (lambda s: f"{s // 3600}h{(s % 3600) // 60}m{s % 60}s")(
(data.get("total_duration", 0) or 0) // 1_000_000_000
),
"total_tokens": int( # This is the OpenAI compatible key
data.get("prompt_eval_count", 0) + data.get("eval_count", 0)
),
"completion_tokens_details": { # This is the OpenAI compatible key
"reasoning_tokens": 0,
"accepted_prediction_tokens": 0,
"rejected_prediction_tokens": 0,
},
}
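Worked example (editorial sketch) of the arithmetic above: Ollama durations are nanoseconds, so dividing eval_duration by 10_000_000 and multiplying the ratio by 100 yields tokens per second, and approximate_total renders total_duration as h/m/s.

rate = round((100 / (2_000_000_000 / 10_000_000)) * 100, 2)
assert rate == 50.0  # 100 tokens in 2 s

fmt = lambda s: f"{s // 3600}h{(s % 3600) // 60}m{s % 60}s"
assert fmt(9_045_000_000_000 // 1_000_000_000) == "2h30m45s"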
def convert_response_ollama_to_openai(ollama_response: dict) -> dict:
model = ollama_response.get("model", "ollama")
message_content = ollama_response.get("message", {}).get("content", "")
tool_calls = ollama_response.get("message", {}).get("tool_calls", None)
openai_tool_calls = None
if tool_calls:
openai_tool_calls = convert_ollama_tool_call_to_openai(tool_calls)
data = ollama_response
usage = convert_ollama_usage_to_openai(data)
response = openai_chat_completion_message_template(
model, message_content, openai_tool_calls, usage
)
@@ -85,7 +104,7 @@ async def convert_streaming_response_ollama_to_openai(ollama_streaming_response)
data = json.loads(data)
model = data.get("model", "ollama")
message_content = data.get("message", {}).get("content", "")
message_content = data.get("message", {}).get("content", None)
tool_calls = data.get("message", {}).get("tool_calls", None)
openai_tool_calls = None
@@ -96,48 +115,10 @@ async def convert_streaming_response_ollama_to_openai(ollama_streaming_response)
usage = None
if done:
usage = {
"response_token/s": (
round(
(
(
data.get("eval_count", 0)
/ ((data.get("eval_duration", 0) / 10_000_000))
)
* 100
),
2,
)
if data.get("eval_duration", 0) > 0
else "N/A"
),
"prompt_token/s": (
round(
(
(
data.get("prompt_eval_count", 0)
/ ((data.get("prompt_eval_duration", 0) / 10_000_000))
)
* 100
),
2,
)
if data.get("prompt_eval_duration", 0) > 0
else "N/A"
),
"total_duration": data.get("total_duration", 0),
"load_duration": data.get("load_duration", 0),
"prompt_eval_count": data.get("prompt_eval_count", 0),
"prompt_eval_duration": data.get("prompt_eval_duration", 0),
"eval_count": data.get("eval_count", 0),
"eval_duration": data.get("eval_duration", 0),
"approximate_total": (
lambda s: f"{s // 3600}h{(s % 3600) // 60}m{s % 60}s"
)((data.get("total_duration", 0) or 0) // 1_000_000_000),
}
usage = convert_ollama_usage_to_openai(data)
data = openai_chat_chunk_message_template(
model, message_content if not done else None, openai_tool_calls, usage
model, message_content, openai_tool_calls, usage
)
line = f"data: {json.dumps(data)}\n\n"

View File

@@ -22,7 +22,7 @@ def get_task_model_id(
# Set the task model
task_model_id = default_model_id
# Check if the user has a custom task model and use that model
if models[task_model_id]["owned_by"] == "ollama":
if models[task_model_id].get("owned_by") == "ollama":
if task_model and task_model in models:
task_model_id = task_model
else: