Files
APAW/scripts/e2e-mcp-stdio-test-v2.py

95 lines
3.7 KiB
Python

#!/usr/bin/env python3
"""
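e2e-mcp-stdio-test-v2.py

Minimal end-to-end (E2E) test for the MCP stdio transport of @ric_/forgejo-mcp.
Each step talks to the server through Popen.communicate(), which writes the
requests and drains stdout/stderr together, to avoid a pipe deadlock.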
"""
import base64
import json
import os
import subprocess
import sys
import urllib.request
# Command line that launches the MCP server speaking JSON-RPC over stdio.
STDIO_CMD = ["bunx", "@ric_/forgejo-mcp"]

# REST base URL of the repository used for the MCP-vs-REST consistency check.
GITEA_API = "https://git.softuniq.eu/api/v1/repos/UniqueSoft/APAW"

# SECURITY: these credentials were committed in plain text. They can now be
# supplied through the environment (E2E_FORGEJO_USER / E2E_FORGEJO_PASS); the
# literal fallbacks are kept only so existing invocations keep working.
# TODO: rotate the leaked secret and drop the fallbacks.
USER = os.environ.get("E2E_FORGEJO_USER", "NW")
PASS = os.environ.get("E2E_FORGEJO_PASS", "eshkink0t")
def _check(condition, message):
    """Raise AssertionError(message) unless *condition* is true.

    Used instead of ``assert`` so the checks still run under ``python -O``.
    """
    if not condition:
        raise AssertionError(message)


def _initialize_request(client_name, client_version):
    """Build the JSON-RPC ``initialize`` request (id 1) that opens a session."""
    return {
        "jsonrpc": "2.0",
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": client_name, "version": client_version},
        },
        "id": 1,
    }


def _run_mcp_session(requests, env, timeout=120):
    """Spawn the MCP server, send *requests* as JSON lines, return (stdout, stderr).

    All I/O goes through ``Popen.communicate()`` so neither pipe can fill up
    and block the child. If the server does not finish within *timeout*
    seconds it is killed and ``subprocess.TimeoutExpired`` is re-raised.
    """
    proc = subprocess.Popen(
        STDIO_CMD,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        # MCP stdio messages are UTF-8; do not depend on the locale encoding.
        encoding="utf-8",
        errors="replace",
        env=env,
    )
    payload = "".join(json.dumps(request) + "\n" for request in requests)
    try:
        return proc.communicate(input=payload, timeout=timeout)
    except subprocess.TimeoutExpired:
        # Reap the child so it does not linger, then surface the timeout.
        proc.kill()
        proc.communicate()
        raise


def _response_for(stdout, request_id, stderr=""):
    """Return the JSON-RPC message in *stdout* whose ``id`` equals *request_id*.

    Blank and non-JSON lines are skipped. Raises RuntimeError (including a
    snippet of stdout and stderr) when no matching message is present.
    """
    for raw_line in stdout.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # tolerate stray non-JSON output on stdout
        if isinstance(message, dict) and message.get("id") == request_id:
            return message
    raise RuntimeError(
        f"No JSON-RPC response with id={request_id}; "
        f"stdout={stdout[:300]!r} stderr={stderr.strip()[:300]!r}"
    )


def _result_of(response):
    """Return ``response["result"]``; raise RuntimeError if it is absent."""
    if "result" not in response:
        raise RuntimeError(
            f"JSON-RPC error response: {response.get('error', response)}"
        )
    return response["result"]


def test_stdio():
    """Run the four-step E2E check against the forgejo-mcp stdio server.

    Steps: (1) ``initialize`` handshake, (2) ``tools/list`` returns more than
    50 tools, (3) ``tools/call`` of ``get_issue`` for issue 110, (4) the issue
    title matches the one returned by the REST API at ``GITEA_API``.
    Each MCP step starts a fresh server process from ``STDIO_CMD``.

    Returns:
        0 when every step passes.

    Raises:
        AssertionError: a check on the returned data failed.
        RuntimeError: the server produced no (or an error) response.
        subprocess.TimeoutExpired: the server did not finish in time.
        OSError: the server could not be started or the REST call failed
            (urllib errors are OSError subclasses).
    """
    print("=" * 60)
    print("E2E MCP Stdio Test v2")
    print("=" * 60)

    env = {
        **os.environ,
        "FORGEJO_URL": "https://git.softuniq.eu",
        "FORGEJO_TOKEN": PASS,
        "LOG_LEVEL": "warn",
    }

    # NOTE(review): the MCP lifecycle describes a `notifications/initialized`
    # message after `initialize`; this script has never sent it and that is
    # left unchanged here - confirm the server keeps tolerating its absence.

    # 1. Initialize
    print("\n[1] Initialize...")
    out, err = _run_mcp_session([_initialize_request("e2e-test", "1.0")], env)
    print("stderr:", err.strip()[:200])
    resp = _response_for(out, 1, err)
    server_name = _result_of(resp).get("serverInfo", {}).get("name")
    _check(server_name == "forgejo-mcp", f"Unexpected: {resp}")
    print("✅ Initialize OK")

    # 2. tools/list
    print("\n[2] List tools...")
    list_request = {"jsonrpc": "2.0", "method": "tools/list", "params": {}, "id": 2}
    out2, err2 = _run_mcp_session([_initialize_request("t", "1"), list_request], env)
    tools = _result_of(_response_for(out2, 2, err2)).get("tools", [])
    _check(len(tools) > 50, f"Expected >50 tools, got {len(tools)}")
    print(f"✅ Tools: {len(tools)}")

    # 3. get_issue
    print("\n[3] gitea_get_issue #110...")
    call_request = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": "get_issue",
            "arguments": {"owner": "UniqueSoft", "repo": "APAW", "issue_number": 110},
        },
        "id": 3,
    }
    out3, err3 = _run_mcp_session([_initialize_request("t", "1"), call_request], env)
    call_result = _result_of(_response_for(out3, 3, err3))
    # A tool failure is reported inside the result; its text is then not JSON.
    _check(not call_result.get("isError"), f"get_issue reported an error: {call_result}")
    content = json.loads(call_result["content"][0]["text"])
    _check(content.get("number") == 110, f"Unexpected issue: {content}")
    print(f"✅ Issue #{content['number']} - {content.get('title','N/A')}")

    # 4. REST consistency
    print("\n[4] REST consistency...")
    creds = base64.b64encode(f"{USER}:{PASS}".encode()).decode()
    rest_request = urllib.request.Request(
        f"{GITEA_API}/issues/110",
        headers={"Accept": "application/json", "Authorization": f"Basic {creds}"},
    )
    # Bounded wait so an unreachable host fails the test instead of hanging it.
    with urllib.request.urlopen(rest_request, timeout=30) as reply:
        rest = json.loads(reply.read())
    _check(rest["title"] == content.get("title"), "Title mismatch")
    print("✅ REST consistent")

    print("\n" + "=" * 60)
    print("✅ ALL E2E MCP STDIO TESTS PASSED")
    print("=" * 60)
    return 0
if __name__ == "__main__":
    # Script entry point: run the E2E test and turn any failure into exit
    # status 1 after printing the error and its traceback.
    import traceback

    try:
        exit_status = test_stdio()
    except Exception as exc:
        print(f"\n❌ FAILED: {exc}")
        traceback.print_exc()
        exit_status = 1
    sys.exit(exit_status)