mirror of
				https://github.com/open-webui/mcpo
				synced 2025-06-26 18:26:58 +00:00 
			
		
		
		
	Resolve "object" and "array" types
This commit is contained in:
		
							parent
							
								
									292059ee5e
								
							
						
					
					
						commit
						9cbfcd49b9
					
				
							
								
								
									
										20
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										20
									
								
								README.md
									
									
									
									
									
								
							| @ -94,6 +94,26 @@ Each with a dedicated OpenAPI schema and proxy handler. Access full schema UI at | ||||
| - Python 3.8+ | ||||
| - uv (optional, but highly recommended for performance + packaging) | ||||
| 
 | ||||
| ## 🛠️ Development & Testing | ||||
| 
 | ||||
| To contribute or run tests locally: | ||||
| 
 | ||||
| 1.  **Set up the environment:** | ||||
|     ```bash | ||||
|     # Clone the repository | ||||
|     git clone https://github.com/open-webui/mcpo.git | ||||
|     cd mcpo | ||||
| 
 | ||||
|     # Install dependencies (including dev dependencies) | ||||
|     uv sync --dev | ||||
|     ``` | ||||
| 
 | ||||
| 2.  **Run tests:** | ||||
|     ```bash | ||||
|     pytest | ||||
|     ``` | ||||
| 
 | ||||
| 
 | ||||
| ## 🪪 License | ||||
| 
 | ||||
| MIT | ||||
|  | ||||
							
								
								
									
										106
									
								
								src/mcpo/main.py
									
									
									
									
									
								
							
							
						
						
									
										106
									
								
								src/mcpo/main.py
									
									
									
									
									
								
							| @ -1,36 +1,91 @@ | ||||
| import json | ||||
| import os | ||||
| from contextlib import AsyncExitStack, asynccontextmanager | ||||
| from typing import Dict, Any, Optional | ||||
| from typing import Dict, Any, Optional, List, Type, Union, ForwardRef | ||||
| 
 | ||||
| import uvicorn | ||||
| from fastapi import FastAPI, Body, Depends | ||||
| from fastapi import FastAPI, Depends | ||||
| from fastapi.middleware.cors import CORSMiddleware | ||||
| from mcp import ClientSession, StdioServerParameters, types | ||||
| from mcp.client.stdio import stdio_client | ||||
| from mcp.types import CallToolResult | ||||
| 
 | ||||
| from mcpo.utils.auth import get_verify_api_key | ||||
| from pydantic import create_model | ||||
| from pydantic import create_model, Field | ||||
| from pydantic.fields import FieldInfo | ||||
| from starlette.routing import Mount | ||||
| 
 | ||||
| 
 | ||||
| def get_python_type(param_type: str): | ||||
|     if param_type == "string": | ||||
|         return str | ||||
|     elif param_type == "integer": | ||||
|         return int | ||||
|     elif param_type == "boolean": | ||||
|         return bool | ||||
|     elif param_type == "number": | ||||
|         return float | ||||
|     elif param_type == "object": | ||||
|         return Dict[str, Any] | ||||
|     elif param_type == "array": | ||||
|         return list | ||||
| _model_cache: Dict[str, Type] = {} | ||||
| 
 | ||||
| def _process_schema_property( | ||||
|     prop_schema: Dict[str, Any], | ||||
|     model_name_prefix: str, | ||||
|     prop_name: str, | ||||
|     is_required: bool, | ||||
| ) -> tuple[Union[Type, List, ForwardRef, Any], FieldInfo]: | ||||
|     """ | ||||
|     Recursively processes a schema property to determine its Python type hint | ||||
|     and Pydantic Field definition. | ||||
| 
 | ||||
|     Returns: | ||||
|         A tuple containing (python_type_hint, pydantic_field). | ||||
|         The pydantic_field contains default value and description. | ||||
|     """ | ||||
|     prop_type = prop_schema.get("type") | ||||
|     prop_desc = prop_schema.get("description", "") | ||||
|     default_value = ... if is_required else prop_schema.get("default", None) | ||||
|     pydantic_field = Field(default=default_value, description=prop_desc) | ||||
| 
 | ||||
|     if prop_type == "object": | ||||
|         nested_properties = prop_schema.get("properties", {}) | ||||
|         nested_required = prop_schema.get("required", []) | ||||
|         nested_fields = {} | ||||
| 
 | ||||
|         nested_model_name = f"{model_name_prefix}_{prop_name}_model".replace("__", "_").rstrip('_') | ||||
| 
 | ||||
|         if nested_model_name in _model_cache: | ||||
|             return _model_cache[nested_model_name], pydantic_field | ||||
| 
 | ||||
|         for name, schema in nested_properties.items(): | ||||
|             is_nested_required = name in nested_required | ||||
|             nested_type_hint, nested_pydantic_field = _process_schema_property( | ||||
|                 schema, nested_model_name, name, is_nested_required | ||||
|             ) | ||||
| 
 | ||||
|             nested_fields[name] = (nested_type_hint, nested_pydantic_field) | ||||
| 
 | ||||
|         if not nested_fields: | ||||
|              return Dict[str, Any], pydantic_field | ||||
| 
 | ||||
|         NestedModel = create_model(nested_model_name, **nested_fields) | ||||
|         _model_cache[nested_model_name] = NestedModel | ||||
| 
 | ||||
|         return NestedModel, pydantic_field | ||||
| 
 | ||||
|     elif prop_type == "array": | ||||
|         items_schema = prop_schema.get("items") | ||||
|         if not items_schema: | ||||
|             # Default to list of anything if items schema is missing | ||||
|             return List[Any], pydantic_field | ||||
| 
 | ||||
|         # Recursively determine the type of items in the array | ||||
|         item_type_hint, _ = _process_schema_property( | ||||
|             items_schema, f"{model_name_prefix}_{prop_name}", "item", False # Items aren't required at this level | ||||
|         ) | ||||
|         list_type_hint = List[item_type_hint] | ||||
|         return list_type_hint, pydantic_field | ||||
| 
 | ||||
|     elif prop_type == "string": | ||||
|         return str, pydantic_field | ||||
|     elif prop_type == "integer": | ||||
|         return int, pydantic_field | ||||
|     elif prop_type == "boolean": | ||||
|         return bool, pydantic_field | ||||
|     elif prop_type == "number": | ||||
|         return float, pydantic_field | ||||
|     else: | ||||
|         return str  # Fallback | ||||
|     # Expand as needed. PRs welcome! | ||||
|         return Any, pydantic_field | ||||
| 
 | ||||
| 
 | ||||
| def process_tool_response(result: CallToolResult) -> list: | ||||
| @ -80,18 +135,17 @@ async def create_dynamic_endpoints(app: FastAPI, api_dependency=None): | ||||
|         required_fields = schema.get("required", []) | ||||
|         properties = schema.get("properties", {}) | ||||
| 
 | ||||
|         form_model_name = f"{endpoint_name}_form_model" | ||||
|         for param_name, param_schema in properties.items(): | ||||
|             param_type = param_schema.get("type", "string") | ||||
|             param_desc = param_schema.get("description", "") | ||||
|             python_type = get_python_type(param_type) | ||||
|             default_value = ... if param_name in required_fields else None | ||||
|             model_fields[param_name] = ( | ||||
|                 python_type, | ||||
|                 Body(default_value, description=param_desc), | ||||
|             is_required = param_name in required_fields | ||||
|             python_type_hint, pydantic_field_info = _process_schema_property( | ||||
|                 param_schema, form_model_name, param_name, is_required | ||||
|             ) | ||||
|             # Use the generated type hint and Field info | ||||
|             model_fields[param_name] = (python_type_hint, pydantic_field_info) | ||||
| 
 | ||||
|         if model_fields: | ||||
|             FormModel = create_model(f"{endpoint_name}_form_model", **model_fields) | ||||
|             FormModel = create_model(form_model_name, **model_fields) | ||||
| 
 | ||||
|             def make_endpoint_func( | ||||
|                 endpoint_name: str, FormModel, session: ClientSession | ||||
|  | ||||
							
								
								
									
										201
									
								
								tests/test_main.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										201
									
								
								tests/test_main.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,201 @@ | ||||
| import pytest | ||||
| from pydantic import BaseModel, Field | ||||
| from typing import Any, List, Dict | ||||
| 
 | ||||
| from src.mcpo.main import _process_schema_property, _model_cache | ||||
| 
 | ||||
| 
 | ||||
| @pytest.fixture(autouse=True) | ||||
| def clear_model_cache(): | ||||
|     _model_cache.clear() | ||||
|     yield | ||||
|     _model_cache.clear() | ||||
| 
 | ||||
| 
 | ||||
| def test_process_simple_string_required(): | ||||
|     schema = {"type": "string", "description": "A simple string"} | ||||
|     expected_type = str | ||||
|     expected_field = Field(default=..., description="A simple string") | ||||
|     result_type, result_field = _process_schema_property(schema, "test", "prop", True) | ||||
|     assert result_type == expected_type | ||||
|     assert result_field.default == expected_field.default | ||||
|     assert result_field.description == expected_field.description | ||||
| 
 | ||||
| 
 | ||||
| def test_process_simple_integer_optional(): | ||||
|     schema = {"type": "integer", "default": 10} | ||||
|     expected_type = int | ||||
|     expected_field = Field(default=10, description="") | ||||
|     result_type, result_field = _process_schema_property(schema, "test", "prop", False) | ||||
|     assert result_type == expected_type | ||||
|     assert result_field.default == expected_field.default | ||||
|     assert result_field.description == expected_field.description | ||||
| 
 | ||||
| 
 | ||||
| def test_process_simple_boolean_optional_no_default(): | ||||
|     schema = {"type": "boolean"} | ||||
|     expected_type = bool | ||||
|     expected_field = Field( | ||||
|         default=None, description="" | ||||
|     )  # Default is None if not required and no default specified | ||||
|     result_type, result_field = _process_schema_property(schema, "test", "prop", False) | ||||
|     assert result_type == expected_type | ||||
|     assert result_field.default == expected_field.default | ||||
|     assert result_field.description == expected_field.description | ||||
| 
 | ||||
| 
 | ||||
| def test_process_simple_number(): | ||||
|     schema = {"type": "number"} | ||||
|     expected_type = float | ||||
|     expected_field = Field(default=..., description="") | ||||
|     result_type, result_field = _process_schema_property(schema, "test", "prop", True) | ||||
|     assert result_type == expected_type | ||||
|     assert result_field.default == expected_field.default | ||||
| 
 | ||||
| 
 | ||||
| def test_process_unknown_type(): | ||||
|     schema = {"type": "unknown"} | ||||
|     expected_type = Any | ||||
|     expected_field = Field(default=..., description="") | ||||
|     result_type, result_field = _process_schema_property(schema, "test", "prop", True) | ||||
|     assert result_type == expected_type | ||||
|     assert result_field.default == expected_field.default | ||||
| 
 | ||||
| 
 | ||||
| def test_process_array_of_strings(): | ||||
|     schema = {"type": "array", "items": {"type": "string"}} | ||||
|     expected_type = List[str] | ||||
|     expected_field = Field(default=..., description="") | ||||
|     result_type, result_field = _process_schema_property(schema, "test", "prop", True) | ||||
|     assert result_type == expected_type | ||||
|     assert result_field.default == expected_field.default | ||||
| 
 | ||||
| 
 | ||||
| def test_process_array_of_any_missing_items(): | ||||
|     schema = {"type": "array"}  # Missing "items" | ||||
|     expected_type = List[Any] | ||||
|     expected_field = Field(default=None, description="") | ||||
|     result_type, result_field = _process_schema_property(schema, "test", "prop", False) | ||||
|     assert result_type == expected_type | ||||
|     assert result_field.default == expected_field.default | ||||
| 
 | ||||
| 
 | ||||
| def test_process_simple_object(): | ||||
|     schema = { | ||||
|         "type": "object", | ||||
|         "properties": {"name": {"type": "string"}, "age": {"type": "integer"}}, | ||||
|         "required": ["name"], | ||||
|     } | ||||
|     expected_field = Field(default=..., description="") | ||||
|     result_type, result_field = _process_schema_property(schema, "test", "prop", True) | ||||
| 
 | ||||
|     assert result_field.default == expected_field.default | ||||
|     assert result_field.description == expected_field.description | ||||
|     assert issubclass(result_type, BaseModel)  # Check if it's a Pydantic model | ||||
|     assert "test_prop_model" in _model_cache  # Check caching | ||||
| 
 | ||||
|     # Check fields of the generated model | ||||
|     model_fields = result_type.model_fields | ||||
|     assert "name" in model_fields | ||||
|     assert model_fields["name"].annotation is str | ||||
|     assert model_fields["name"].is_required() | ||||
| 
 | ||||
|     assert "age" in model_fields | ||||
|     assert model_fields["age"].annotation is int | ||||
|     assert not model_fields["age"].is_required() | ||||
|     assert model_fields["age"].default is None  # Optional without default | ||||
| 
 | ||||
| 
 | ||||
| def test_process_nested_object(): | ||||
|     schema = { | ||||
|         "type": "object", | ||||
|         "properties": { | ||||
|             "user": { | ||||
|                 "type": "object", | ||||
|                 "properties": {"id": {"type": "integer"}}, | ||||
|                 "required": ["id"], | ||||
|             } | ||||
|         }, | ||||
|         "required": ["user"], | ||||
|     } | ||||
|     expected_field = Field(default=..., description="") | ||||
|     result_type, result_field = _process_schema_property( | ||||
|         schema, "test", "outer_prop", True | ||||
|     ) | ||||
| 
 | ||||
|     assert result_field.default == expected_field.default | ||||
|     assert "test_outer_prop_model" in _model_cache | ||||
|     assert issubclass(result_type, BaseModel) | ||||
| 
 | ||||
|     outer_model_fields = result_type.model_fields | ||||
|     assert "user" in outer_model_fields | ||||
|     assert outer_model_fields["user"].is_required() | ||||
| 
 | ||||
|     nested_model_type = outer_model_fields["user"].annotation | ||||
|     assert issubclass(nested_model_type, BaseModel) | ||||
|     assert "test_outer_prop_model_user_model" in _model_cache | ||||
| 
 | ||||
|     nested_model_fields = nested_model_type.model_fields | ||||
|     assert "id" in nested_model_fields | ||||
|     assert nested_model_fields["id"].annotation is int | ||||
|     assert nested_model_fields["id"].is_required() | ||||
| 
 | ||||
| 
 | ||||
| def test_process_array_of_objects(): | ||||
|     schema = { | ||||
|         "type": "array", | ||||
|         "items": { | ||||
|             "type": "object", | ||||
|             "properties": {"item_id": {"type": "string"}}, | ||||
|             "required": ["item_id"], | ||||
|         }, | ||||
|     } | ||||
|     expected_field = Field(default=..., description="") | ||||
|     result_type, result_field = _process_schema_property(schema, "test", "prop", True) | ||||
| 
 | ||||
|     assert result_field.default == expected_field.default | ||||
|     assert str(result_type).startswith("typing.List[")  # Check it's a List | ||||
|     assert "test_prop_item_model" in _model_cache | ||||
| 
 | ||||
|     # Get the inner type from List[...] | ||||
|     item_model_type = result_type.__args__[0] | ||||
|     assert issubclass(item_model_type, BaseModel) | ||||
| 
 | ||||
|     item_model_fields = item_model_type.model_fields | ||||
|     assert "item_id" in item_model_fields | ||||
|     assert item_model_fields["item_id"].annotation is str | ||||
|     assert item_model_fields["item_id"].is_required() | ||||
| 
 | ||||
| 
 | ||||
| def test_process_empty_object(): | ||||
|     schema = {"type": "object", "properties": {}} | ||||
|     expected_type = Dict[str, Any]  # Should default to Dict[str, Any] if no properties | ||||
|     expected_field = Field(default=..., description="") | ||||
|     result_type, result_field = _process_schema_property(schema, "test", "prop", True) | ||||
|     assert result_type == expected_type | ||||
|     assert result_field.default == expected_field.default | ||||
| 
 | ||||
| 
 | ||||
| def test_model_caching(): | ||||
|     schema = { | ||||
|         "type": "object", | ||||
|         "properties": {"id": {"type": "integer"}}, | ||||
|         "required": ["id"], | ||||
|     } | ||||
|     # First call | ||||
|     result_type1, _ = _process_schema_property(schema, "cache_test", "obj1", True) | ||||
|     model_name = "cache_test_obj1_model" | ||||
|     assert model_name in _model_cache | ||||
|     assert _model_cache[model_name] == result_type1 | ||||
| 
 | ||||
|     # Second call with same structure but different prefix/prop name (should generate new) | ||||
|     result_type2, _ = _process_schema_property(schema, "cache_test", "obj2", True) | ||||
|     model_name2 = "cache_test_obj2_model" | ||||
|     assert model_name2 in _model_cache | ||||
|     assert _model_cache[model_name2] == result_type2 | ||||
|     assert result_type1 != result_type2  # Different models | ||||
| 
 | ||||
|     # Third call identical to the first (should return cached model) | ||||
|     result_type3, _ = _process_schema_property(schema, "cache_test", "obj1", True) | ||||
|     assert result_type3 == result_type1  # Should be the same cached object | ||||
|     assert len(_model_cache) == 2  # Only two unique models created | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user