mirror of
https://github.com/open-webui/mcpo
synced 2025-06-26 18:26:58 +00:00
Abstracted out alias checks and creation
This commit is contained in:
@@ -48,6 +48,32 @@ def process_tool_response(result: CallToolResult) -> list:
|
||||
return response
|
||||
|
||||
|
||||
def name_needs_alias(name: str) -> bool:
|
||||
"""Check if a field name needs aliasing (for now if it starts with '__')."""
|
||||
return name.startswith('__')
|
||||
|
||||
|
||||
def generate_alias_name(original_name: str, existing_names: set) -> str:
|
||||
"""
|
||||
Generate an alias field name by stripping unwanted chars, and avoiding conflicts with existing names.
|
||||
|
||||
Args:
|
||||
original_name: The original field name (should start with '__')
|
||||
existing_names: Set of existing names to avoid conflicts with
|
||||
|
||||
Returns:
|
||||
An alias name that doesn't conflict with existing names
|
||||
"""
|
||||
alias_name = original_name.lstrip('_')
|
||||
# Handle potential naming conflicts
|
||||
original_alias_name = alias_name
|
||||
suffix_counter = 1
|
||||
while alias_name in existing_names:
|
||||
alias_name = f"{original_alias_name}_{suffix_counter}"
|
||||
suffix_counter += 1
|
||||
return alias_name
|
||||
|
||||
|
||||
def _process_schema_property(
|
||||
_model_cache: Dict[str, Type],
|
||||
prop_schema: Dict[str, Any],
|
||||
@@ -130,21 +156,15 @@ def _process_schema_property(
|
||||
schema_defs,
|
||||
)
|
||||
|
||||
# Handle nested field names with leading underscores
|
||||
if name.startswith('__'):
|
||||
clean_name = name.lstrip('_')
|
||||
# Handle potential naming conflicts
|
||||
original_clean_name = clean_name
|
||||
suffix_counter = 1
|
||||
while clean_name in nested_properties or clean_name in nested_fields:
|
||||
clean_name = f"{original_clean_name}_{suffix_counter}"
|
||||
suffix_counter += 1
|
||||
if name_needs_alias(name):
|
||||
other_names = set().union(nested_properties, nested_fields, _model_cache)
|
||||
alias_name = generate_alias_name(name, other_names)
|
||||
aliased_field = Field(
|
||||
default=nested_pydantic_field.default,
|
||||
description=nested_pydantic_field.description,
|
||||
alias=name
|
||||
)
|
||||
nested_fields[clean_name] = (nested_type_hint, aliased_field)
|
||||
nested_fields[alias_name] = (nested_type_hint, aliased_field)
|
||||
else:
|
||||
nested_fields[name] = (nested_type_hint, nested_pydantic_field)
|
||||
|
||||
@@ -205,20 +225,18 @@ def get_model_fields(form_model_name, properties, required_fields, schema_defs=N
|
||||
)
|
||||
|
||||
# Handle parameter names with leading underscores (e.g., __top, __filter) which Pydantic v2 does not allow
|
||||
if param_name.startswith('__'):
|
||||
clean_name = param_name.lstrip('_')
|
||||
# Handle potential naming conflicts
|
||||
original_clean_name = clean_name
|
||||
suffix_counter = 1
|
||||
while clean_name in properties or clean_name in model_fields:
|
||||
clean_name = f"{original_clean_name}_{suffix_counter}"
|
||||
suffix_counter += 1
|
||||
if name_needs_alias(param_name):
|
||||
print(f"DEBUG: Handling underscore parameter: {param_name}")
|
||||
other_names = set().union(properties, model_fields, _model_cache)
|
||||
alias_name = generate_alias_name(param_name, other_names)
|
||||
print(f"DEBUG: Clean name for {param_name} is {alias_name}")
|
||||
aliased_field = Field(
|
||||
default=pydantic_field_info.default,
|
||||
description=pydantic_field_info.description,
|
||||
alias=param_name
|
||||
)
|
||||
model_fields[clean_name] = (python_type_hint, aliased_field)
|
||||
# Use the generated type hint and Field info
|
||||
model_fields[alias_name] = (python_type_hint, aliased_field)
|
||||
else:
|
||||
model_fields[param_name] = (python_type_hint, pydantic_field_info)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user