Merge remote-tracking branch 'upstream/dev'

This commit is contained in:
Rory
2025-02-03 17:28:53 -06:00
79 changed files with 1143 additions and 470 deletions

View File

@@ -7,7 +7,10 @@ from aiocache import cached
from typing import Any, Optional
import random
import json
import html
import inspect
import re
from uuid import uuid4
from concurrent.futures import ThreadPoolExecutor
@@ -54,6 +57,7 @@ from open_webui.utils.task import (
from open_webui.utils.misc import (
get_message_list,
add_or_update_system_message,
add_or_update_user_message,
get_last_user_message,
get_last_assistant_message,
prepend_to_first_user_message_content,
@@ -64,7 +68,10 @@ from open_webui.utils.plugin import load_function_module_by_id
from open_webui.tasks import create_task
from open_webui.config import DEFAULT_TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE
from open_webui.config import (
DEFAULT_TOOLS_FUNCTION_CALLING_PROMPT_TEMPLATE,
DEFAULT_CODE_INTERPRETER_PROMPT,
)
from open_webui.env import (
SRC_LOG_LEVELS,
GLOBAL_LOG_LEVEL,
@@ -768,6 +775,11 @@ async def process_chat_payload(request, form_data, metadata, user, model):
request, form_data, extra_params, user
)
if "code_interpreter" in features and features["code_interpreter"]:
form_data["messages"] = add_or_update_user_message(
DEFAULT_CODE_INTERPRETER_PROMPT, form_data["messages"]
)
try:
form_data, flags = await chat_completion_filter_functions_handler(
request, form_data, model, extra_params
@@ -982,6 +994,7 @@ async def process_chat_response(
pass
event_emitter = None
event_caller = None
if (
"session_id" in metadata
and metadata["session_id"]
@@ -991,10 +1004,11 @@ async def process_chat_response(
and metadata["message_id"]
):
event_emitter = get_event_emitter(metadata)
event_caller = get_event_call(metadata)
# Non-streaming response
if not isinstance(response, StreamingResponse):
if event_emitter:
if "selected_model_id" in response:
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
@@ -1059,22 +1073,156 @@ async def process_chat_response(
else:
return response
# Non standard response
if not any(
content_type in response.headers["Content-Type"]
for content_type in ["text/event-stream", "application/x-ndjson"]
):
return response
if event_emitter:
# Streaming response
if event_emitter and event_caller:
task_id = str(uuid4()) # Create a unique task ID.
model_id = form_data.get("model", "")
# Handle as a background task
async def post_response_handler(response, events):
def serialize_content_blocks(content_blocks, raw=False):
content = ""
for block in content_blocks:
if block["type"] == "text":
content = f"{content}{block['content'].strip()}\n"
elif block["type"] == "reasoning":
reasoning_display_content = "\n".join(
(f"> {line}" if not line.startswith(">") else line)
for line in block["content"].splitlines()
)
reasoning_duration = block.get("duration", None)
if reasoning_duration:
content = f'{content}<details type="reasoning" done="true" duration="{reasoning_duration}">\n<summary>Thought for {reasoning_duration} seconds</summary>\n{reasoning_display_content}\n</details>\n'
else:
content = f'{content}<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{reasoning_display_content}\n</details>\n'
elif block["type"] == "code_interpreter":
attributes = block.get("attributes", {})
output = block.get("output", None)
lang = attributes.get("lang", "")
if output:
output = html.escape(json.dumps(output))
if raw:
content = f'{content}<details type="code_interpreter" done="true" output="{output}">\n<summary>Analyzed</summary>\n```{lang}\n{block["content"]}\n```\n```output\n{output}\n```\n</details>\n'
else:
content = f'{content}<details type="code_interpreter" done="true" output="{output}">\n<summary>Analyzed</summary>\n```{lang}\n{block["content"]}\n```\n</details>\n'
else:
content = f'{content}<details type="code_interpreter" done="false">\n<summary>Analyzing...</summary>\n```{lang}\n{block["content"]}\n```\n</details>\n'
else:
block_content = str(block["content"]).strip()
content = f"{content}{block['type']}: {block_content}\n"
return content
def tag_content_handler(content_type, tags, content, content_blocks):
end_flag = False
def extract_attributes(tag_content):
"""Extract attributes from a tag if they exist."""
attributes = {}
# Match attributes in the format: key="value" (ignores single quotes for simplicity)
matches = re.findall(r'(\w+)\s*=\s*"([^"]+)"', tag_content)
for key, value in matches:
attributes[key] = value
return attributes
if content_blocks[-1]["type"] == "text":
for tag in tags:
# Match start tag e.g., <tag> or <tag attr="value">
start_tag_pattern = rf"<{tag}(.*?)>"
match = re.search(start_tag_pattern, content)
if match:
# Extract attributes in the tag (if present)
attributes = extract_attributes(match.group(1))
# Remove the start tag from the currently handling text block
content_blocks[-1]["content"] = content_blocks[-1][
"content"
].replace(match.group(0), "")
if not content_blocks[-1]["content"]:
content_blocks.pop()
# Append the new block
content_blocks.append(
{
"type": content_type,
"tag": tag,
"attributes": attributes,
"content": "",
"started_at": time.time(),
}
)
break
elif content_blocks[-1]["type"] == content_type:
tag = content_blocks[-1]["tag"]
# Match end tag e.g., </tag>
end_tag_pattern = rf"</{tag}>"
if re.search(end_tag_pattern, content):
block_content = content_blocks[-1]["content"]
# Strip start and end tags from the content
start_tag_pattern = rf"<{tag}(.*?)>"
block_content = re.sub(
start_tag_pattern, "", block_content
).strip()
block_content = re.sub(
end_tag_pattern, "", block_content
).strip()
if block_content:
end_flag = True
content_blocks[-1]["content"] = block_content
content_blocks[-1]["ended_at"] = time.time()
content_blocks[-1]["duration"] = int(
content_blocks[-1]["ended_at"]
- content_blocks[-1]["started_at"]
)
# Reset the content_blocks by appending a new text block
content_blocks.append(
{
"type": "text",
"content": "",
}
)
# Clean processed content
content = re.sub(
rf"<{tag}(.*?)>(.|\n)*?</{tag}>",
"",
content,
flags=re.DOTALL,
)
else:
# Remove the block if content is empty
content_blocks.pop()
return content, content_blocks, end_flag
message = Chats.get_message_by_id_and_message_id(
metadata["chat_id"], metadata["message_id"]
)
content = message.get("content", "") if message else ""
content_blocks = [
{
"type": "text",
"content": content,
}
]
# We might want to disable this by default
DETECT_REASONING = True
DETECT_CODE_INTERPRETER = True
reasoning_tags = ["think", "reason", "reasoning", "thought", "Thought"]
code_interpreter_tags = ["code_interpreter"]
try:
for event in events:
@@ -1094,148 +1242,193 @@ async def process_chat_response(
},
)
# We might want to disable this by default
detect_reasoning = True
reasoning_tags = ["think", "reason", "reasoning", "thought", "Thought"]
current_tag = None
async def stream_body_handler(response):
nonlocal content
nonlocal content_blocks
reasoning_start_time = None
async for line in response.body_iterator:
line = line.decode("utf-8") if isinstance(line, bytes) else line
data = line
reasoning_content = ""
ongoing_content = ""
async for line in response.body_iterator:
line = line.decode("utf-8") if isinstance(line, bytes) else line
data = line
# Skip empty lines
if not data.strip():
continue
# "data:" is the prefix for each event
if not data.startswith("data:"):
continue
# Remove the prefix
data = data[len("data:") :].strip()
try:
data = json.loads(data)
if "selected_model_id" in data:
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"selectedModelId": data["selected_model_id"],
},
)
else:
value = (
data.get("choices", [])[0]
.get("delta", {})
.get("content")
)
if value:
content = f"{content}{value}"
if detect_reasoning:
for tag in reasoning_tags:
start_tag = f"<{tag}>\n"
end_tag = f"</{tag}>\n"
if start_tag in content:
# Remove the start tag
content = content.replace(start_tag, "")
ongoing_content = content
reasoning_start_time = time.time()
reasoning_content = ""
current_tag = tag
break
if reasoning_start_time is not None:
# Remove the last value from the content
content = content[: -len(value)]
reasoning_content += value
end_tag = f"</{current_tag}>\n"
if end_tag in reasoning_content:
reasoning_end_time = time.time()
reasoning_duration = int(
reasoning_end_time
- reasoning_start_time
)
reasoning_content = (
reasoning_content.strip(
f"<{current_tag}>\n"
)
.strip(end_tag)
.strip()
)
if reasoning_content:
reasoning_display_content = "\n".join(
(
f"> {line}"
if not line.startswith(">")
else line
)
for line in reasoning_content.splitlines()
)
# Format reasoning with <details> tag
content = f'{ongoing_content}<details type="reasoning" done="true" duration="{reasoning_duration}">\n<summary>Thought for {reasoning_duration} seconds</summary>\n{reasoning_display_content}\n</details>\n'
else:
content = ""
reasoning_start_time = None
else:
reasoning_display_content = "\n".join(
(
f"> {line}"
if not line.startswith(">")
else line
)
for line in reasoning_content.splitlines()
)
# Show ongoing thought process
content = f'{ongoing_content}<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{reasoning_display_content}\n</details>\n'
if ENABLE_REALTIME_CHAT_SAVE:
# Save message in the database
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"content": content,
},
)
else:
data = {
"content": content,
}
await event_emitter(
{
"type": "chat:completion",
"data": data,
}
)
except Exception as e:
done = "data: [DONE]" in line
if done:
pass
else:
# Skip empty lines
if not data.strip():
continue
# "data:" is the prefix for each event
if not data.startswith("data:"):
continue
# Remove the prefix
data = data[len("data:") :].strip()
try:
data = json.loads(data)
if "selected_model_id" in data:
model_id = data["selected_model_id"]
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"selectedModelId": model_id,
},
)
else:
choices = data.get("choices", [])
if not choices:
continue
value = choices[0].get("delta", {}).get("content")
if value:
content = f"{content}{value}"
content_blocks[-1]["content"] = (
content_blocks[-1]["content"] + value
)
if DETECT_REASONING:
content, content_blocks, _ = (
tag_content_handler(
"reasoning",
reasoning_tags,
content,
content_blocks,
)
)
if DETECT_CODE_INTERPRETER:
content, content_blocks, end = (
tag_content_handler(
"code_interpreter",
code_interpreter_tags,
content,
content_blocks,
)
)
if end:
break
if ENABLE_REALTIME_CHAT_SAVE:
# Save message in the database
Chats.upsert_message_to_chat_by_id_and_message_id(
metadata["chat_id"],
metadata["message_id"],
{
"content": serialize_content_blocks(
content_blocks
),
},
)
else:
data = {
"content": serialize_content_blocks(
content_blocks
),
}
await event_emitter(
{
"type": "chat:completion",
"data": data,
}
)
except Exception as e:
done = "data: [DONE]" in line
if done:
pass
else:
log.debug("Error: ", e)
continue
# Clean up the last text block
if content_blocks[-1]["type"] == "text":
content_blocks[-1]["content"] = content_blocks[-1][
"content"
].strip()
if not content_blocks[-1]["content"]:
content_blocks.pop()
if response.background:
await response.background()
await stream_body_handler(response)
MAX_RETRIES = 5
retries = 0
while (
content_blocks[-1]["type"] == "code_interpreter"
and retries < MAX_RETRIES
):
retries += 1
log.debug(f"Attempt count: {retries}")
try:
if content_blocks[-1]["attributes"].get("type") == "code":
output = await event_caller(
{
"type": "execute:python",
"data": {
"id": str(uuid4()),
"code": content_blocks[-1]["content"],
},
}
)
except Exception as e:
output = str(e)
content_blocks[-1]["output"] = output
content_blocks.append(
{
"type": "text",
"content": "",
}
)
await event_emitter(
{
"type": "chat:completion",
"data": {
"content": serialize_content_blocks(content_blocks),
},
}
)
try:
res = await generate_chat_completion(
request,
{
"model": model_id,
"stream": True,
"messages": [
*form_data["messages"],
{
"role": "assistant",
"content": serialize_content_blocks(
content_blocks, raw=True
),
},
],
},
user,
)
if isinstance(res, StreamingResponse):
await stream_body_handler(res)
else:
break
except Exception as e:
log.debug(e)
break
title = Chats.get_chat_title_by_id(metadata["chat_id"])
data = {"done": True, "content": content, "title": title}
data = {
"done": True,
"content": serialize_content_blocks(content_blocks),
"title": title,
}
if not ENABLE_REALTIME_CHAT_SAVE:
# Save message in the database
@@ -1243,7 +1436,7 @@ async def process_chat_response(
metadata["chat_id"],
metadata["message_id"],
{
"content": content,
"content": serialize_content_blocks(content_blocks),
},
)
@@ -1280,7 +1473,7 @@ async def process_chat_response(
metadata["chat_id"],
metadata["message_id"],
{
"content": content,
"content": serialize_content_blocks(content_blocks),
},
)

View File

@@ -131,6 +131,25 @@ def add_or_update_system_message(content: str, messages: list[dict]):
return messages
def add_or_update_user_message(content: str, messages: list[dict]):
"""
Adds a new user message at the end of the messages list
or updates the existing user message at the end.
:param msg: The message to be added or appended.
:param messages: The list of message dictionaries.
:return: The updated list of message dictionaries.
"""
if messages and messages[-1].get("role") == "user":
messages[-1]["content"] = f"{messages[-1]['content']}\n{content}"
else:
# Insert at the end
messages.append({"role": "user", "content": content})
return messages
def append_or_update_assistant_message(content: str, messages: list[dict]):
"""
Adds a new assistant message at the end of the messages list