feat: --config support

This commit is contained in:
Timothy Jaeryang Baek 2025-03-30 20:30:23 -07:00
parent ea8d7e76af
commit 82af7f7f36
4 changed files with 181 additions and 96 deletions

1
.gitignore vendored
View File

@ -8,3 +8,4 @@ wheels/
# Virtual environments
.venv
config.json

View File

@ -1,6 +1,6 @@
[project]
name = "mcpo"
version = "0.0.4"
version = "0.0.5"
description = "A simple, secure MCP-to-OpenAPI proxy server"
authors = [
{ name = "Timothy Jaeryang Baek", email = "tim@openwebui.com" }

View File

@ -1,6 +1,9 @@
import sys
import asyncio
import typer
import os
from typing_extensions import Annotated
from typing import Optional, List
@ -15,28 +18,67 @@ def main(
port: Annotated[
Optional[int], typer.Option("--port", "-p", help="Port number")
] = 8000,
env: Annotated[
Optional[List[str]], typer.Option("--env", "-e", help="Environment variables")
] = None,
config: Annotated[
Optional[str], typer.Option("--config", "-c", help="Config file path")
] = None,
name: Annotated[
Optional[str], typer.Option("--name", "-n", help="Server name")
] = None,
description: Annotated[
Optional[str], typer.Option("--description", "-d", help="Server description")
] = None,
version: Annotated[
Optional[str], typer.Option("--version", "-v", help="Server version")
] = None,
):
# Find the position of "--"
if "--" not in sys.argv:
typer.echo("Usage: mcpo --host 0.0.0.0 --port 8000 -- your_mcp_command")
raise typer.Exit(1)
server_command = None
if not config:
# Find the position of "--"
if "--" not in sys.argv:
typer.echo("Usage: mcpo --host 0.0.0.0 --port 8000 -- your_mcp_command")
raise typer.Exit(1)
idx = sys.argv.index("--")
server_command: List[str] = sys.argv[idx + 1 :]
idx = sys.argv.index("--")
server_command: List[str] = sys.argv[idx + 1 :]
if not server_command:
typer.echo("Error: You must specify the MCP server command after '--'")
return
if not server_command:
typer.echo("Error: You must specify the MCP server command after '--'")
return
from mcpo.main import run
print(
f"Starting MCP OpenAPI Proxy on {host}:{port} with command: {' '.join(server_command)}"
)
if config:
print("Starting MCP OpenAPI Proxy with config file:", config)
else:
print(
f"Starting MCP OpenAPI Proxy on {host}:{port} with command: {' '.join(server_command)}"
)
env_dict = {}
if env:
for var in env:
key, value = env.split("=", 1)
env_dict[key] = value
# Set environment variables
for key, value in env_dict.items():
os.environ[key] = value
# Run your async run function from mcpo.main
asyncio.run(run(host, port, server_command))
asyncio.run(
run(
host,
port,
config=config,
name=name,
description=description,
version=version,
server_command=server_command,
)
)
if __name__ == "__main__":

View File

@ -1,54 +1,65 @@
from fastapi import FastAPI, Body
from fastapi.middleware.cors import CORSMiddleware
from starlette.routing import Mount
from pydantic import create_model
from contextlib import AsyncExitStack, asynccontextmanager
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
import argparse
import sys
from typing import Dict, Any
import asyncio
from typing import Dict, Any, Callable
import uvicorn
import json
import os
async def create_dynamic_endpoints(app: FastAPI, session: ClientSession):
def get_python_type(param_type: str):
if param_type == "string":
return str
elif param_type == "integer":
return int
elif param_type == "boolean":
return bool
elif param_type == "number":
return float
elif param_type == "object":
return Dict[str, Any]
elif param_type == "array":
return list
else:
return str # Fallback
# Expand as needed. PRs welcome!
async def create_dynamic_endpoints(app: FastAPI):
session = app.state.session
if not session:
raise ValueError("Session is not initialized in the app state.")
result = await session.initialize()
server_info = getattr(result, "serverInfo", None)
if server_info:
app.title = server_info.name or app.title
app.description = (
f"{server_info.name} MCP Server" if server_info.name else app.description
)
app.version = server_info.version or app.version
tools_result = await session.list_tools()
tools = tools_result.tools
for tool in tools:
print(tool)
endpoint_name = tool.name
endpoint_description = tool.description
schema = tool.inputSchema
# Dynamically creating a Pydantic model for validation and openAPI coverage
# Build Pydantic model
model_fields = {}
required_fields = schema.get("required", [])
for param_name, param_schema in schema["properties"].items():
param_type = param_schema["type"]
param_type = param_schema.get("type", "string")
param_desc = param_schema.get("description", "")
python_type = str # default
if param_type == "string":
python_type = str
elif param_type == "integer":
python_type = int
elif param_type == "boolean":
python_type = bool
elif param_type == "number":
python_type = float
elif param_type == "object":
python_type = Dict[str, Any]
elif param_type == "array":
python_type = list
# Expand as needed. PRs welcome!
python_type = get_python_type(param_type)
default_value = ... if param_name in required_fields else None
model_fields[param_name] = (
python_type,
@ -57,18 +68,13 @@ async def create_dynamic_endpoints(app: FastAPI, session: ClientSession):
FormModel = create_model(f"{endpoint_name}_form_model", **model_fields)
def make_endpoint_func(endpoint_name: str, FormModel):
async def tool(form_data: FormModel):
def make_endpoint_func(endpoint_name: str, FormModel, session: ClientSession):
async def tool_endpoint(form_data: FormModel):
args = form_data.model_dump()
print(f"Calling {endpoint_name} with arguments:", args)
tool_call_result = await session.call_tool(
endpoint_name, arguments=args
)
result = await session.call_tool(endpoint_name, arguments=args)
response = []
for content in tool_call_result.content:
for content in result.content:
text = content.text
if isinstance(text, str):
try:
@ -76,14 +82,12 @@ async def create_dynamic_endpoints(app: FastAPI, session: ClientSession):
except json.JSONDecodeError:
pass
response.append(text)
return response
return tool
return tool_endpoint
tool = make_endpoint_func(endpoint_name, FormModel)
tool = make_endpoint_func(endpoint_name, FormModel, session)
# Add endpoint to FastAPI with tool descriptions
app.post(
f"/{endpoint_name}",
summary=endpoint_name.replace("_", " ").title(),
@ -91,56 +95,94 @@ async def create_dynamic_endpoints(app: FastAPI, session: ClientSession):
)(tool)
async def run(host: str, port: int, server_command: list[str]):
server_params = StdioServerParameters(
command=server_command[0],
args=server_command[1:],
env={**os.environ},
@asynccontextmanager
async def lifespan(app: FastAPI):
command = getattr(app.state, "command", None)
args = getattr(app.state, "args", [])
env = getattr(app.state, "env", {})
if not command:
async with AsyncExitStack() as stack:
for route in app.routes:
if isinstance(route, Mount) and isinstance(route.app, FastAPI):
await stack.enter_async_context(
route.app.router.lifespan_context(route.app), # noqa
)
yield
else:
server_params = StdioServerParameters(
command=command,
args=args,
env={**env},
)
async with stdio_client(server_params) as (reader, writer):
async with ClientSession(reader, writer) as session:
app.state.session = session
await create_dynamic_endpoints(app)
yield
async def run(host: str = "127.0.0.1", port: int = 8000, **kwargs):
config_path = kwargs.get("config")
server_command = kwargs.get("server_command")
name = kwargs.get("name") or "MCP OpenAPI Proxy"
description = (
kwargs.get("description") or "Automatically generated API from MCP Tool Schemas"
)
version = kwargs.get("version") or "1.0"
main_app = FastAPI(
title=name, description=description, version=version, lifespan=lifespan
)
# Open connection to MCP first:
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
result = await session.initialize()
main_app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
server_name = (
result.serverInfo.name
if hasattr(result, "serverInfo") and hasattr(result.serverInfo, "name")
else None
if server_command:
main_app.state.command = server_command[0]
main_app.state.args = server_command[1:]
main_app.state.env = os.environ.copy()
elif config_path:
with open(config_path, "r") as f:
config_data = json.load(f)
mcp_servers = config_data.get("mcpServers", {})
if not mcp_servers:
raise ValueError("No 'mcpServers' found in config file.")
for server_name, server_cfg in mcp_servers.items():
sub_app = FastAPI(
title=f"{server_name}",
description=f"{server_name} MCP Server",
version="1.0",
lifespan=lifespan,
)
server_description = (
f"{server_name.capitalize()} MCP OpenAPI Proxy"
if server_name
else "Automatically generated API endpoints based on MCP tool schemas."
)
server_version = (
result.serverInfo.version
if hasattr(result, "serverInfo")
and hasattr(result.serverInfo, "version")
else "1.0"
)
app = FastAPI(
title=server_name if server_name else "MCP OpenAPI Proxy",
description=server_description,
version=server_version,
)
origins = ["*"]
app.add_middleware(
sub_app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Dynamic endpoint creation
await create_dynamic_endpoints(app, session)
sub_app.state.command = server_cfg["command"]
sub_app.state.args = server_cfg.get("args", [])
sub_app.state.env = {**os.environ, **server_cfg.get("env", {})}
config = uvicorn.Config(app=app, host=host, port=port, log_level="info")
server = uvicorn.Server(config)
await server.serve()
main_app.mount(f"/{server_name}", sub_app)
else:
raise ValueError("You must provide either server_command or config.")
config = uvicorn.Config(app=main_app, host=host, port=port, log_level="info")
server = uvicorn.Server(config)
await server.serve()