enh: code interpreter jupyter support

This commit is contained in:
Timothy Jaeryang Baek
2025-02-10 02:25:02 -08:00
parent 29c781875d
commit f9c5819314
8 changed files with 552 additions and 9 deletions

View File

@@ -0,0 +1,153 @@
import asyncio
import json
import uuid
import websockets
import requests
from urllib.parse import urljoin
async def execute_code_jupyter(
jupyter_url, code, token=None, password=None, timeout=10
):
"""
Executes Python code in a Jupyter kernel.
Supports authentication with a token or password.
:param jupyter_url: Jupyter server URL (e.g., "http://localhost:8888")
:param code: Code to execute
:param token: Jupyter authentication token (optional)
:param password: Jupyter password (optional)
:param timeout: WebSocket timeout in seconds (default: 10s)
:return: Dictionary with stdout, stderr, and result
"""
session = requests.Session() # Maintain cookies
headers = {} # Headers for requests
# Authenticate using password
if password and not token:
try:
login_url = urljoin(jupyter_url, "/login")
response = session.get(login_url)
response.raise_for_status()
# Retrieve `_xsrf` token
xsrf_token = session.cookies.get("_xsrf")
if not xsrf_token:
raise ValueError("Failed to fetch _xsrf token")
# Send login request
login_data = {"_xsrf": xsrf_token, "password": password}
login_response = session.post(
login_url, data=login_data, cookies=session.cookies
)
login_response.raise_for_status()
# Update headers with `_xsrf`
headers["X-XSRFToken"] = xsrf_token
except Exception as e:
return {
"stdout": "",
"stderr": f"Authentication Error: {str(e)}",
"result": "",
}
# Construct API URLs with authentication token if provided
params = f"?token={token}" if token else ""
kernel_url = urljoin(jupyter_url, f"/api/kernels{params}")
try:
# Include cookies if authenticating with password
response = session.post(kernel_url, headers=headers, cookies=session.cookies)
response.raise_for_status()
kernel_id = response.json()["id"]
# Construct WebSocket URL
websocket_url = urljoin(
jupyter_url.replace("http", "ws"),
f"/api/kernels/{kernel_id}/channels{params}",
)
# **IMPORTANT:** Include authentication cookies for WebSockets
ws_headers = {}
if password and not token:
ws_headers["X-XSRFToken"] = session.cookies.get("_xsrf")
cookies = {name: value for name, value in session.cookies.items()}
ws_headers["Cookie"] = "; ".join(
[f"{name}={value}" for name, value in cookies.items()]
)
# Connect to the WebSocket
async with websockets.connect(
websocket_url, additional_headers=ws_headers
) as ws:
msg_id = str(uuid.uuid4())
# Send execution request
execute_request = {
"header": {
"msg_id": msg_id,
"msg_type": "execute_request",
"username": "user",
"session": str(uuid.uuid4()),
"date": "",
"version": "5.3",
},
"parent_header": {},
"metadata": {},
"content": {
"code": code,
"silent": False,
"store_history": True,
"user_expressions": {},
"allow_stdin": False,
"stop_on_error": True,
},
"channel": "shell",
}
await ws.send(json.dumps(execute_request))
# Collect execution results
stdout, stderr, result = "", "", None
while True:
try:
message = await asyncio.wait_for(ws.recv(), timeout)
message_data = json.loads(message)
if message_data.get("parent_header", {}).get("msg_id") == msg_id:
msg_type = message_data.get("msg_type")
if msg_type == "stream":
if message_data["content"]["name"] == "stdout":
stdout += message_data["content"]["text"]
elif message_data["content"]["name"] == "stderr":
stderr += message_data["content"]["text"]
elif msg_type in ("execute_result", "display_data"):
result = message_data["content"]["data"].get(
"text/plain", ""
)
elif msg_type == "error":
stderr += "\n".join(message_data["content"]["traceback"])
elif (
msg_type == "status"
and message_data["content"]["execution_state"] == "idle"
):
break
except asyncio.TimeoutError:
stderr += "\nExecution timed out."
break
except Exception as e:
return {"stdout": "", "stderr": f"Error: {str(e)}", "result": ""}
finally:
# Shutdown the kernel
if kernel_id:
requests.delete(
f"{kernel_url}/{kernel_id}", headers=headers, cookies=session.cookies
)
return {
"stdout": stdout.strip(),
"stderr": stderr.strip(),
"result": result.strip() if result else "",
}
# Example Usage
# asyncio.run(execute_code_jupyter("http://localhost:8888", "print('Hello, world!')", token="your-token"))
# asyncio.run(execute_code_jupyter("http://localhost:8888", "print('Hello, world!')", password="your-password"))

View File

@@ -72,7 +72,7 @@ from open_webui.utils.filter import (
get_sorted_filter_ids,
process_filter_functions,
)
from open_webui.utils.code_interpreter import execute_code_jupyter
from open_webui.tasks import create_task
@@ -1651,15 +1651,45 @@ async def process_chat_response(
output = ""
try:
if content_blocks[-1]["attributes"].get("type") == "code":
output = await event_caller(
{
"type": "execute:python",
"data": {
"id": str(uuid4()),
"code": content_blocks[-1]["content"],
},
code = content_blocks[-1]["content"]
if (
request.app.state.config.CODE_INTERPRETER_ENGINE
== "pyodide"
):
output = await event_caller(
{
"type": "execute:python",
"data": {
"id": str(uuid4()),
"code": code,
},
}
)
elif (
request.app.state.config.CODE_INTERPRETER_ENGINE
== "jupyter"
):
output = await execute_code_jupyter(
request.app.state.config.CODE_INTERPRETER_JUPYTER_URL,
code,
(
request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH_TOKEN
if request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH
== "token"
else None
),
(
request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH_PASSWORD
if request.app.state.config.CODE_INTERPRETER_JUPYTER_AUTH
== "password"
else None
),
)
else:
output = {
"stdout": "Code interpreter engine not configured."
}
)
if isinstance(output, dict):
stdout = output.get("stdout", "")