Files
APAW/scripts/e2e-mcp-stdio-test-v3.py

106 lines
4.0 KiB
Python

#!/usr/bin/env python3
"""
e2e-mcp-stdio-test-v3.py
E2E test with correct tool names from forgejo-mcp.
"""
import base64
import json
import os
import subprocess
import sys
# Command line used to launch the MCP server over stdio.
STDIO_CMD = ["bunx", "@ric_/forgejo-mcp"]

# REST endpoint of the repository that the MCP results are cross-checked against.
GITEA_API = "https://git.softuniq.eu/api/v1/repos/UniqueSoft/APAW"

# Basic-auth credentials for the REST cross-check. Values exported as
# GITEA_USER / GITEA_PASS take precedence; the literals are kept only as a
# fallback so existing invocations keep working.
# NOTE(review): a real password is committed here -- rotate it and drop the
# fallback once the environment variables are provisioned.
USER = os.environ.get("GITEA_USER", "NW")
PASS = os.environ.get("GITEA_PASS", "eshkink0t")
def call_stdio(method, params=None, call_id=1, timeout=None):
    """Run one JSON-RPC exchange against a freshly spawned MCP stdio server.

    A new ``STDIO_CMD`` process is started for every call. An ``initialize``
    request (id 1) is always written first; for ``tools/list`` and
    ``tools/call`` a second request carrying ``call_id`` follows. All requests
    are written to the child's stdin at once, newline-delimited, and stdin is
    then closed.

    Args:
        method: ``"initialize"``, ``"tools/list"`` or ``"tools/call"``. Any
            other value results in only the ``initialize`` request being sent.
        params: Value of the ``params`` field for a ``tools/call`` request;
            ignored for the other methods.
        call_id: JSON-RPC id of the follow-up request.
        timeout: Optional number of seconds to wait for the child process;
            ``None`` (the default) waits indefinitely.

    Returns:
        A ``(response, stderr)`` tuple. ``response`` is the decoded JSON-RPC
        reply whose id matches the request of interest, falling back to the
        last JSON message on stdout, or ``None`` when stdout held no JSON.
        ``stderr`` is the child's captured stderr text.

    Raises:
        subprocess.TimeoutExpired: If ``timeout`` is set and the child does
            not finish in time (the child is killed first).
    """
    env = dict(os.environ)
    # The REST cross-check in this script is pinned to the same host, so the
    # URL is always forced.
    env["FORGEJO_URL"] = "https://git.softuniq.eu"
    # A token already exported by the caller wins over the literal.
    # NOTE(review): a real access token is committed here -- rotate it and
    # drop the fallback once FORGEJO_TOKEN is provisioned in the environment.
    env.setdefault("FORGEJO_TOKEN", "ad1176845d1170f840193a700eb5319998c52601")
    env["LOG_LEVEL"] = "warn"

    # NOTE(review): the MCP lifecycle describes a "notifications/initialized"
    # message after initialize; it is not sent here -- confirm the server does
    # not require it.
    requests = [
        {
            "jsonrpc": "2.0",
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "t", "version": "1"},
            },
            "id": 1,
        },
    ]
    expected_id = 1
    if method == "tools/list":
        requests.append({"jsonrpc": "2.0", "method": "tools/list", "params": {}, "id": call_id})
        expected_id = call_id
    elif method == "tools/call":
        requests.append({"jsonrpc": "2.0", "method": "tools/call", "params": params, "id": call_id})
        expected_id = call_id

    payload = "\n".join(json.dumps(request) for request in requests) + "\n"
    completed = subprocess.run(
        STDIO_CMD,
        input=payload,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        env=env,
        timeout=timeout,
    )

    # Decode every JSON line; anything that is not JSON (stray log output) is
    # skipped instead of aborting the whole exchange.
    messages = []
    for raw_line in completed.stdout.splitlines():
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            messages.append(json.loads(raw_line))
        except json.JSONDecodeError:
            continue

    # Prefer the reply that answers our request, scanning from the end so the
    # follow-up wins over the initialize reply when both share the same id.
    for message in reversed(messages):
        if (
            isinstance(message, dict)
            and message.get("id") == expected_id
            and ("result" in message or "error" in message)
        ):
            return message, completed.stderr

    return (messages[-1] if messages else None), completed.stderr
def _require(condition, message):
    """Raise AssertionError(message) unless *condition* is truthy.

    Used instead of bare ``assert`` so the checks still run under ``python -O``.
    """
    if not condition:
        raise AssertionError(message)


def _first_content_text(resp):
    """Return the ``text`` of the first content item of a tools/call reply, or ""."""
    if not isinstance(resp, dict):
        return ""
    content = (resp.get("result") or {}).get("content") or []
    first_item = content[0] if content else None
    if not isinstance(first_item, dict):
        return ""
    return first_item.get("text") or ""


def test_stdio():
    """Exercise the MCP stdio server end to end and cross-check it via REST.

    Steps: initialize the server, list its tools, fetch issue #110 through a
    tool call, verify the issue body contains the "## GNS Checkpoint" marker,
    and confirm the issue title matches what the REST API reports.

    Returns:
        0 when every step succeeds.

    Raises:
        AssertionError: When a check fails.
        Exception: When none of the candidate tools returns data.
    """
    import urllib.request

    banner = "=" * 60
    print(banner)
    print("E2E MCP Stdio Test v3")
    print(banner)

    # 1. Initialize
    print("\n[1] Initialize...")
    resp, err = call_stdio("initialize")
    _require(isinstance(resp, dict), f"No response to initialize; stderr: {err!r}")
    server_info = (resp.get("result") or {}).get("serverInfo") or {}
    _require(
        server_info.get("name") == "forgejo-mcp",
        f"Unexpected initialize response: {resp}; stderr: {err!r}",
    )
    print("✅ Initialize OK")

    # 2. tools/list
    print("\n[2] List tools...")
    resp2, err2 = call_stdio("tools/list", call_id=2)
    _require(isinstance(resp2, dict), f"No response to tools/list; stderr: {err2!r}")
    tools = (resp2.get("result") or {}).get("tools") or []
    _require(len(tools) > 50, f"Got {len(tools)}")
    tool_names = [tool["name"] for tool in tools]
    print(f"✅ Tools: {len(tools)}")
    # Informational only: first tool that mentions issues but is not a
    # list/comment/label variant.
    excluded_words = ("list", "comment", "label")
    issue_tool = next(
        (
            name
            for name in tool_names
            if "issue" in name and not any(word in name for word in excluded_words)
        ),
        None,
    )
    print(f" Issue tool candidate: {issue_tool}")

    # 3. get_issue
    print("\n[3] Fetch issue #110...")
    resp3, err3, content_text = None, "", ""
    for tool_name in ("get_issue", "gitea_get_issue"):
        resp3, err3 = call_stdio(
            "tools/call",
            params={
                "name": tool_name,
                "arguments": {"owner": "UniqueSoft", "repo": "APAW", "index": 110},
            },
            call_id=3,
        )
        content_text = _first_content_text(resp3)
        if content_text.strip():
            print(f" Tool '{tool_name}' returned data")
            print(f" Content text length: {len(content_text)}")
            print(f" First 500 chars of content: {repr(content_text[:500])}")
            break
    else:
        # Neither candidate name produced any text: show the last reply and
        # whatever the server wrote to stderr before failing.
        print(f" Tool responses: {resp3}")
        if err3 and err3.strip():
            print(f" Server stderr: {err3.strip()}")
        raise Exception("No tool returned data")

    issue_data = json.loads(content_text)
    _require(isinstance(issue_data, dict), f"Unexpected: {issue_data}")
    _require(issue_data.get("number") == 110, f"Unexpected: {issue_data}")
    print(f"✅ Issue #{issue_data['number']} - {issue_data.get('title','N/A')}")

    # 4. Verify checkpoint
    print("\n[4] Verify checkpoint...")
    _require("## GNS Checkpoint" in (issue_data.get("body") or ""), "No checkpoint")
    print("✅ Checkpoint present")

    # 5. REST consistency
    print("\n[5] REST consistency...")
    creds = base64.b64encode(f"{USER}:{PASS}".encode()).decode()
    req = urllib.request.Request(
        f"{GITEA_API}/issues/110",
        headers={"Accept": "application/json", "Authorization": f"Basic {creds}"},
    )
    # Bound the request so an unreachable host fails the test instead of hanging.
    with urllib.request.urlopen(req, timeout=30) as r:
        rest = json.loads(r.read())
    _require(
        "title" in issue_data and rest.get("title") == issue_data["title"],
        f"Mismatch: REST title {rest.get('title')!r} != MCP title {issue_data.get('title')!r}",
    )
    print("✅ REST consistent")

    print("\n" + banner)
    print("✅ ALL E2E MCP STDIO TESTS PASSED")
    print(banner)
    return 0
if __name__ == "__main__":
    # Script entry point: the process exit status mirrors the test outcome.
    import traceback

    try:
        exit_status = test_stdio()
    except Exception as failure:
        # Report the failure with its full traceback and signal it with a
        # non-zero exit status.
        print(f"\n❌ FAILED: {failure}")
        traceback.print_exc()
        exit_status = 1
    sys.exit(exit_status)