Merge branch 'dev' into chat-message-rebased

This commit is contained in:
Tim Baek
2026-02-02 09:33:41 -06:00
committed by GitHub
6 changed files with 695 additions and 158 deletions

View File

@@ -294,6 +294,476 @@ def get_citation_source_from_tool_result(
]
def split_content_and_whitespace(content):
    """Split *content* into its right-stripped text and the trailing whitespace.

    Returns:
        A ``(body, trailing)`` tuple where ``body + trailing == content`` and
        ``trailing`` is ``""`` when there is no trailing whitespace.
    """
    body = content.rstrip()
    # Slicing at len(body) yields "" when nothing was stripped.
    trailing = content[len(body):]
    return body, trailing
def is_opening_code_block(content):
    """Return True when *content* contains an unclosed ``` fence.

    An odd number of triple-backtick markers means the last marker opens a
    code block that has not been closed yet.
    """
    fence_count = content.count("```")
    return fence_count % 2 == 1
def serialize_output(output: list) -> str:
    """
    Convert OR-aligned output items to HTML for display.
    For LLM consumption, use convert_output_to_messages() instead.

    Rendering rules, per item ``type``:
    - ``message``: every content part that has a ``text`` key is appended,
      stripped, one per line.
    - ``function_call``: rendered as a ``<details type="tool_calls">`` block,
      paired with the ``function_call_output`` item sharing its ``call_id``
      (``done="true"``), or as still executing (``done="false"``).
    - ``function_call_output``: never rendered on its own.
    - ``reasoning``: rendered as a ``<details type="reasoning">`` block whose
      blockquoted text comes from ``summary`` (falling back to ``content``).
    - ``open_webui:code_interpreter``: only trims a dangling opening code
      fence from the text accumulated so far.

    Args:
        output: List of output item dicts.

    Returns:
        The concatenated markup with surrounding whitespace stripped.
    """
    content = ""

    # First pass: index function_call_output items by call_id so each
    # function_call can be rendered together with its result.
    tool_outputs = {
        item.get("call_id"): item
        for item in output
        if item.get("type") == "function_call_output"
    }

    # Second pass: render items in order
    for idx, item in enumerate(output):
        item_type = item.get("type", "")

        if item_type == "message":
            # `or []` / `or ""` tolerate explicit None values in the payload.
            for content_part in item.get("content") or []:
                if "text" in content_part:
                    text = (content_part.get("text") or "").strip()
                    if text:
                        content = f"{content}{text}\n"

        elif item_type == "function_call":
            # Render tool call inline with its result (if available)
            if content and not content.endswith("\n"):
                content += "\n"

            call_id = item.get("call_id", "")
            name = item.get("name", "")
            arguments = item.get("arguments", "")

            # call_id and name are provider/model supplied; escape them like
            # every other attribute so they cannot break out of the tag.
            safe_call_id = html.escape(str(call_id))
            safe_name = html.escape(str(name))
            safe_arguments = html.escape(json.dumps(arguments))

            result_item = tool_outputs.get(call_id)
            if result_item:
                raw_result = result_item.get("output") or []
                if isinstance(raw_result, str):
                    # A plain-string tool output is used as-is instead of
                    # being iterated character by character.
                    result_text = raw_result
                else:
                    result_text = "".join(
                        out.get("text") or "" for out in raw_result if "text" in out
                    )

                files = result_item.get("files")
                embeds = result_item.get("embeds", "")

                safe_result = html.escape(json.dumps(result_text, ensure_ascii=False))
                files_attr = html.escape(json.dumps(files)) if files else ""
                safe_embeds = html.escape(json.dumps(embeds))

                content += (
                    f'<details type="tool_calls" done="true" id="{safe_call_id}" '
                    f'name="{safe_name}" arguments="{safe_arguments}" '
                    f'result="{safe_result}" files="{files_attr}" '
                    f'embeds="{safe_embeds}">\n'
                    "<summary>Tool Executed</summary>\n</details>\n"
                )
            else:
                content += (
                    f'<details type="tool_calls" done="false" id="{safe_call_id}" '
                    f'name="{safe_name}" arguments="{safe_arguments}">\n'
                    "<summary>Executing...</summary>\n</details>\n"
                )

        elif item_type == "function_call_output":
            # Already handled inline with function_call above
            pass

        elif item_type == "reasoning":
            # Check for 'summary' (new structure) or 'content' (legacy/fallback)
            source_list = item.get("summary") or item.get("content") or []
            reasoning_content = "".join(
                part.get("text") or "" for part in source_list if "text" in part
            ).strip()

            duration = item.get("duration")
            status = item.get("status", "in_progress")

            # Infer completion: if this reasoning item is NOT the last item,
            # render as done (a subsequent item means reasoning is complete)
            is_last_item = idx == len(output) - 1

            if content and not content.endswith("\n"):
                content += "\n"

            # Blockquote every line that is not already quoted, then escape.
            display = html.escape(
                "\n".join(
                    (f"> {line}" if not line.startswith(">") else line)
                    for line in reasoning_content.splitlines()
                )
            )

            if status == "completed" or duration is not None or not is_last_item:
                content = f'{content}<details type="reasoning" done="true" duration="{duration or 0}">\n<summary>Thought for {duration or 0} seconds</summary>\n{display}\n</details>\n'
            else:
                content = f'{content}<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{display}\n</details>\n'

        elif item_type == "open_webui:code_interpreter":
            # NOTE(review): this branch only tidies the preceding text; the
            # item's own code/output is not rendered here. Confirm that is
            # intentional.
            content_stripped, original_whitespace = split_content_and_whitespace(
                content
            )
            if is_opening_code_block(content_stripped):
                # Drop the dangling opening fence left by the model.
                content = content_stripped.rstrip("`").rstrip() + original_whitespace
            else:
                content = content_stripped + original_whitespace

            if content and not content.endswith("\n"):
                content += "\n"

    return content.strip()
def deep_merge(target, source):
    """
    Combine *source* into *target* and return the result as a new value.

    - Two strings are concatenated (target first).
    - Two dicts are merged key by key, recursing on keys present in both;
      the target dict itself is left untouched.
    - Any other combination yields *source*.
    """
    if isinstance(target, str) and isinstance(source, str):
        return target + source

    if not (isinstance(target, dict) and isinstance(source, dict)):
        return source

    merged = target.copy()
    for key, incoming in source.items():
        merged[key] = deep_merge(merged[key], incoming) if key in merged else incoming
    return merged
def handle_responses_streaming_event(
data: dict,
current_output: list,
) -> tuple[list, dict | None]:
"""
Handle Responses API streaming events in a pure functional way.
Args:
data: The event data
current_output: List of output items (treated as immutable)
Returns:
tuple[list, dict | None]: (new_output, metadata)
- new_output: The updated output list.
- metadata: Metadata to emit (e.g. usage), {} if update occurred, None if skip.
"""
# Default: no change
# Note: treating current_output as immutable, but avoiding full deepcopy for perf.
# We will shallow copy only if we need to modify the list structure or items.
event_type = data.get("type", "")
if event_type == "response.output_item.added":
item = data.get("item", {})
if item:
new_output = list(current_output)
new_output.append(item)
return new_output, None
return current_output, None
elif event_type == "response.content_part.added":
part = data.get("part", {})
output_index = data.get("output_index", len(current_output) - 1)
if current_output and 0 <= output_index < len(current_output):
new_output = list(current_output)
# Copy the item to mutate it
item = new_output[output_index].copy()
new_output[output_index] = item
if "content" not in item:
item["content"] = []
else:
# Copy content list
item["content"] = list(item["content"])
if item.get("type") == "reasoning":
# Reasoning items should not have content parts
pass
else:
item["content"].append(part)
item["content"].append(part)
return new_output, None
return current_output, None
elif event_type == "response.reasoning_summary_part.added":
part = data.get("part", {})
output_index = data.get("output_index", len(current_output) - 1)
if current_output and 0 <= output_index < len(current_output):
new_output = list(current_output)
item = new_output[output_index].copy()
new_output[output_index] = item
if "summary" not in item:
item["summary"] = []
else:
item["summary"] = list(item["summary"])
item["summary"].append(part)
return new_output, None
return current_output, None
elif event_type.startswith("response.") and event_type.endswith(".delta"):
# Generic Delta Handling
parts = event_type.split(".")
if len(parts) >= 3:
delta_type = parts[1]
delta = data.get("delta", "")
output_index = data.get("output_index", len(current_output) - 1)
if current_output and 0 <= output_index < len(current_output):
new_output = list(current_output)
item = new_output[output_index].copy()
new_output[output_index] = item
item_type = item.get("type", "")
# Determine target field and object based on delta_type and item_type
if delta_type == "function_call_arguments":
key = "arguments"
if item_type == "function_call":
# Function call args are usually strings
item[key] = item.get(key, "") + str(delta)
else:
# Generic handling, refined by item type below
pass
if item_type == "message":
# Message items: "text"/"output_text" -> "text"
# "reasoning_text" -> Skipped (should use reasoning item)
if delta_type in ["text", "output_text"]:
key = "text"
elif delta_type in ["reasoning_text", "reasoning_summary_text"]:
# Skip reasoning updates for message items
return new_output, None
else:
key = delta_type
content_index = data.get("content_index", 0)
if "content" not in item:
item["content"] = []
else:
item["content"] = list(item["content"])
content_list = item["content"]
while len(content_list) <= content_index:
content_list.append({"type": "text", "text": ""})
# Copy the part to mutate it
part = content_list[content_index].copy()
content_list[content_index] = part
current_val = part.get(key)
if current_val is None:
# Initialize based on delta type
current_val = {} if isinstance(delta, dict) else ""
part[key] = deep_merge(current_val, delta)
elif item_type == "reasoning":
# Reasoning items: "reasoning_text"/"reasoning_summary_text" -> "text"
# "text"/"output_text" -> Skipped (should use message item)
if delta_type == "reasoning_summary_text":
# Summary updates -> item['summary']
key = "text"
summary_index = data.get("summary_index", 0)
if "summary" not in item:
item["summary"] = []
else:
item["summary"] = list(item["summary"])
summary_list = item["summary"]
while len(summary_list) <= summary_index:
summary_list.append(
{"type": "summary_text", "text": ""}
)
part = summary_list[summary_index].copy()
summary_list[summary_index] = part
target_val = part.get(key, "")
part[key] = deep_merge(target_val, delta)
elif delta_type == "reasoning_text":
# Reasoning body updates -> item['content']
key = "text"
content_index = data.get("content_index", 0)
if "content" not in item:
item["content"] = []
else:
item["content"] = list(item["content"])
content_list = item["content"]
while len(content_list) <= content_index:
# Reasoning content parts default to text
content_list.append({"type": "text", "text": ""})
part = content_list[content_index].copy()
content_list[content_index] = part
target_val = part.get(key, "")
part[key] = deep_merge(target_val, delta)
elif delta_type in ["text", "output_text"]:
return new_output, None
else:
# Fallback just in case other deltas target reasoning?
pass
else:
# Fallback for other item types
if delta_type in ["text", "output_text"]:
key = "text"
else:
key = delta_type
current_val = item.get(key)
if current_val is None:
current_val = {} if isinstance(delta, dict) else ""
item[key] = deep_merge(current_val, delta)
return new_output, None
elif event_type.startswith("response.") and event_type.endswith(".done"):
# Delta Events: response.content_part.done, response.text.done, etc.
parts = event_type.split(".")
if len(parts) >= 3:
type_name = parts[1]
# 1. Handle specific Delta "done" signals
if type_name == "content_part":
# "Signaling that no further changes will occur to a content part"
# If payloads contains the full part, we could update it.
# Usually purely signaling in standard implementation, but we check payload.
part = data.get("part")
output_index = data.get("output_index", len(current_output) - 1)
if part and current_output and 0 <= output_index < len(current_output):
new_output = list(current_output)
item = new_output[output_index].copy()
new_output[output_index] = item
if "content" in item:
item["content"] = list(item["content"])
content_index = data.get(
"content_index", len(item["content"]) - 1
)
if 0 <= content_index < len(item["content"]):
item["content"][content_index] = part
return new_output, {}
return current_output, None
elif type_name == "reasoning_summary_part":
part = data.get("part")
output_index = data.get("output_index", len(current_output) - 1)
if part and current_output and 0 <= output_index < len(current_output):
new_output = list(current_output)
item = new_output[output_index].copy()
new_output[output_index] = item
if "summary" in item:
item["summary"] = list(item["summary"])
summary_index = data.get(
"summary_index", len(item["summary"]) - 1
)
if 0 <= summary_index < len(item["summary"]):
item["summary"][summary_index] = part
return new_output, {}
return current_output, None
# 2. Skip Output Item done (handled specifically below)
if type_name == "output_item":
pass
# 3. Generic Field Done (text.done, audio.done)
elif type_name not in ["completed", "failed"]:
output_index = data.get("output_index", len(current_output) - 1)
if current_output and 0 <= output_index < len(current_output):
key = (
"text"
if type_name
in [
"text",
"output_text",
"reasoning_text",
"reasoning_summary_text",
]
else type_name
)
if type_name == "function_call_arguments":
key = "arguments"
if key in data:
final_value = data[key]
new_output = list(current_output)
item = new_output[output_index].copy()
new_output[output_index] = item
item_type = item.get("type", "")
if type_name == "function_call_arguments":
if item_type == "function_call":
item["arguments"] = final_value
elif item_type == "message":
content_index = data.get("content_index", 0)
if "content" in item:
item["content"] = list(item["content"])
if len(item["content"]) > content_index:
part = item["content"][content_index].copy()
item["content"][content_index] = part
part[key] = final_value
elif item_type == "reasoning":
item["status"] = "completed"
else:
item[key] = final_value
return new_output, {}
return current_output, None
elif event_type == "response.output_item.done":
# Delta Event: Output item complete
item = data.get("item")
output_index = data.get("output_index", len(current_output) - 1)
new_output = list(current_output)
if item and 0 <= output_index < len(current_output):
new_output[output_index] = item
elif item:
new_output.append(item)
return new_output, {}
elif event_type == "response.completed":
# State Machine Event: Completed
response_data = data.get("response", {})
final_output = response_data.get("output")
new_output = final_output if final_output is not None else current_output
# Ensure reasoning items are marked as completed in the final output
if new_output:
for item in new_output:
if (
item.get("type") == "reasoning"
and item.get("status") != "completed"
):
item["status"] = "completed"
return new_output, {"usage": response_data.get("usage"), "done": True}
elif event_type == "response.in_progress":
# State Machine Event: In Progress
# We could extract metadata if needed, but for now just acknowledge iteration
return current_output, None
elif event_type == "response.failed":
# State Machine Event: Failed
error = data.get("response", {}).get("error", {})
return current_output, {"error": error}
else:
return current_output, None
def apply_source_context_to_messages(
request: Request,
messages: list,
@@ -1571,7 +2041,9 @@ async def process_chat_payload(request, form_data, user, metadata, model):
raise e
try:
filter_ids = get_sorted_filter_ids(request, model, metadata.get("filter_ids", []))
filter_ids = get_sorted_filter_ids(
request, model, metadata.get("filter_ids", [])
)
filter_functions = Functions.get_functions_by_ids(filter_ids)
form_data, flags = await process_filter_functions(
@@ -2368,20 +2840,6 @@ async def process_chat_response(
task_id = str(uuid4()) # Create a unique task ID.
model_id = form_data.get("model", "")
def split_content_and_whitespace(content):
    """Split *content* into its right-stripped text and the trailing whitespace.

    Returns:
        A ``(body, trailing)`` tuple where ``body + trailing == content`` and
        ``trailing`` is ``""`` when there is no trailing whitespace.
    """
    body = content.rstrip()
    # Slicing at len(body) yields "" when nothing was stripped.
    trailing = content[len(body):]
    return body, trailing
def is_opening_code_block(content):
    """Return True when *content* contains an unclosed ``` fence.

    An odd number of triple-backtick markers means the last marker opens a
    code block that has not been closed yet.
    """
    fence_count = content.count("```")
    return fence_count % 2 == 1
# Handle as a background task
async def response_handler(response, events):
def serialize_content_blocks(content_blocks, raw=False):
@@ -2519,105 +2977,6 @@ async def process_chat_response(
return content.strip()
def serialize_output(output: list) -> str:
    """
    Convert OR-aligned output items to HTML for display.
    For LLM consumption, use convert_output_to_messages() instead.

    Items are rendered in list order: message text, tool calls (paired with
    their outputs), reasoning blocks and code-interpreter blocks. The result
    is returned with surrounding whitespace stripped.
    """
    content = ""

    # First pass: collect function_call_output items by call_id for lookup
    tool_outputs = {}
    for item in output:
        if item.get("type") == "function_call_output":
            tool_outputs[item.get("call_id")] = item

    # Second pass: render items in order
    for item in output:
        item_type = item.get("type", "")

        if item_type == "message":
            # Only "output_text" parts contribute; each is stripped and
            # placed on its own line.
            for content_part in item.get("content", []):
                if content_part.get("type") == "output_text":
                    text = content_part.get("text", "").strip()
                    if text:
                        content = f"{content}{text}\n"

        elif item_type == "function_call":
            # Render tool call inline with its result (if available)
            if content and not content.endswith("\n"):
                content += "\n"

            call_id = item.get("call_id", "")
            name = item.get("name", "")
            arguments = item.get("arguments", "")

            result_item = tool_outputs.get(call_id)
            if result_item:
                # Concatenate the text of every "input_text" output part.
                result_text = ""
                for out in result_item.get("output", []):
                    if out.get("type") == "input_text":
                        result_text += out.get("text", "")
                files = result_item.get("files")
                embeds = result_item.get("embeds", "")

                # NOTE(review): call_id and name are interpolated without
                # html.escape, unlike the other attributes.
                content += f'<details type="tool_calls" done="true" id="{call_id}" name="{name}" arguments="{html.escape(json.dumps(arguments))}" result="{html.escape(json.dumps(result_text, ensure_ascii=False))}" files="{html.escape(json.dumps(files)) if files else ""}" embeds="{html.escape(json.dumps(embeds))}">\n<summary>Tool Executed</summary>\n</details>\n'
            else:
                # No matching output yet: render the call as still running.
                content += f'<details type="tool_calls" done="false" id="{call_id}" name="{name}" arguments="{html.escape(json.dumps(arguments))}">\n<summary>Executing...</summary>\n</details>\n'

        elif item_type == "function_call_output":
            # Already handled inline with function_call above
            pass

        elif item_type == "reasoning":
            reasoning_content = ""
            # NOTE(review): this assigns rather than appends, so only the
            # last "output_text" part is kept.
            for content_part in item.get("content", []):
                if content_part.get("type") == "output_text":
                    reasoning_content = content_part.get("text", "").strip()

            duration = item.get("duration")
            status = item.get("status", "in_progress")

            if content and not content.endswith("\n"):
                content += "\n"

            # Prefix every line that is not already quoted with "> ", then
            # HTML-escape the whole block.
            display = html.escape(
                "\n".join(
                    (f"> {line}" if not line.startswith(">") else line)
                    for line in reasoning_content.splitlines()
                )
            )

            # A "completed" status or any recorded duration marks it as done.
            if status == "completed" or duration is not None:
                content = f'{content}<details type="reasoning" done="true" duration="{duration or 0}">\n<summary>Thought for {duration or 0} seconds</summary>\n{display}\n</details>\n'
            else:
                content = f'{content}<details type="reasoning" done="false">\n<summary>Thinking…</summary>\n{display}\n</details>\n'

        elif item_type == "open_webui:code_interpreter":
            code = item.get("code", "")
            output_val = item.get("output")
            lang = item.get("lang", "")

            # Remove a dangling opening code fence from the text rendered so
            # far, keeping the original trailing whitespace.
            content_stripped, original_whitespace = (
                split_content_and_whitespace(content)
            )
            if is_opening_code_block(content_stripped):
                content = (
                    content_stripped.rstrip("`").rstrip()
                    + original_whitespace
                )
            else:
                content = content_stripped + original_whitespace

            if content and not content.endswith("\n"):
                content += "\n"

            # A truthy output means execution finished; otherwise render the
            # block as still analyzing.
            if output_val:
                output_escaped = html.escape(json.dumps(output_val))
                content = f'{content}<details type="code_interpreter" done="true" output="{output_escaped}">\n<summary>Analyzed</summary>\n```{lang}\n{code}\n```\n</details>\n'
            else:
                content = f'{content}<details type="code_interpreter" done="false">\n<summary>Analyzing...</summary>\n```{lang}\n{code}\n```\n</details>\n'

    return content.strip()
def convert_content_blocks_to_messages(content_blocks, raw=False):
@@ -2983,16 +3342,19 @@ async def process_chat_response(
if existing_output:
output = existing_output
else:
# Always create an initial message item (even if content is empty)
output = [
{
"type": "message",
"id": output_id("msg"),
"status": "in_progress",
"role": "assistant",
"content": [{"type": "output_text", "text": content}],
}
]
# Only create an initial message item if there is content to initialize with
if content:
output = [
{
"type": "message",
"id": output_id("msg"),
"status": "in_progress",
"role": "assistant",
"content": [{"type": "output_text", "text": content}],
}
]
else:
output = []
# Keep content_blocks for backward compatibility during transition
content_blocks = [
@@ -3043,6 +3405,7 @@ async def process_chat_response(
nonlocal content
nonlocal content_blocks
nonlocal usage
nonlocal output
response_tool_calls = []
@@ -3121,6 +3484,31 @@ async def process_chat_response(
"data": data,
}
)
# Check for Responses API events (type field starts with "response.")
elif data.get("type", "").startswith("response."):
output, response_metadata = (
handle_responses_streaming_event(data, output)
)
processed_data = {
"output": output,
"content": serialize_output(output),
}
# print(data)
# print(processed_data)
# Merge any metadata (usage, done, etc.)
if response_metadata:
processed_data.update(response_metadata)
await event_emitter(
{
"type": "chat:completion",
"data": processed_data,
}
)
continue
else:
choices = data.get("choices", [])