open-webui/backend/open_webui/apps/webui/utils.py

124 lines
4.2 KiB
Python
Raw Normal View History

2024-06-11 06:40:27 +00:00
import os
2024-06-24 03:31:40 +00:00
import re
2024-08-08 07:46:14 +00:00
import subprocess
2024-08-27 22:10:27 +00:00
import sys
from importlib import util
2024-06-11 06:40:27 +00:00
from open_webui.apps.webui.models.functions import Functions
from open_webui.apps.webui.models.tools import Tools
from open_webui.config import FUNCTIONS_DIR, TOOLS_DIR
2024-06-11 06:40:27 +00:00
2024-06-24 03:31:40 +00:00
def extract_frontmatter(file_path):
"""
Extract frontmatter as a dictionary from the specified file path.
"""
frontmatter = {}
2024-06-24 03:37:41 +00:00
frontmatter_started = False
frontmatter_ended = False
frontmatter_pattern = re.compile(r"^\s*([a-z_]+):\s*(.*)\s*$", re.IGNORECASE)
try:
with open(file_path, "r", encoding="utf-8") as file:
2024-06-24 17:37:57 +00:00
first_line = file.readline()
if first_line.strip() != '"""':
# The file doesn't start with triple quotes
return {}
frontmatter_started = True
2024-06-24 03:37:41 +00:00
for line in file:
if '"""' in line:
2024-06-24 17:37:57 +00:00
if frontmatter_started:
2024-06-24 03:37:41 +00:00
frontmatter_ended = True
break
if frontmatter_started and not frontmatter_ended:
match = frontmatter_pattern.match(line)
if match:
key, value = match.groups()
frontmatter[key.strip()] = value.strip()
2024-06-24 17:37:57 +00:00
2024-06-24 03:37:41 +00:00
except FileNotFoundError:
print(f"Error: The file {file_path} does not exist.")
return {}
except Exception as e:
print(f"An error occurred: {e}")
return {}
2024-06-24 03:31:40 +00:00
return frontmatter
2024-06-11 06:40:27 +00:00
def load_toolkit_module_by_id(toolkit_id):
toolkit_path = os.path.join(TOOLS_DIR, f"{toolkit_id}.py")
if not os.path.exists(toolkit_path):
tool = Tools.get_tool_by_id(toolkit_id)
if tool:
with open(toolkit_path, "w") as file:
file.write(tool.content)
else:
raise Exception(f"Toolkit not found: {toolkit_id}")
2024-06-11 06:40:27 +00:00
spec = util.spec_from_file_location(toolkit_id, toolkit_path)
module = util.module_from_spec(spec)
2024-06-24 03:31:40 +00:00
frontmatter = extract_frontmatter(toolkit_path)
2024-06-11 06:40:27 +00:00
try:
2024-08-08 07:46:14 +00:00
install_frontmatter_requirements(frontmatter.get("requirements", ""))
2024-06-11 06:40:27 +00:00
spec.loader.exec_module(module)
print(f"Loaded module: {module.__name__}")
if hasattr(module, "Tools"):
2024-06-24 03:31:40 +00:00
return module.Tools(), frontmatter
2024-06-11 06:40:27 +00:00
else:
raise Exception("No Tools class found")
except Exception as e:
print(f"Error loading module: {toolkit_id}")
# Move the file to the error folder
os.rename(toolkit_path, f"{toolkit_path}.error")
raise e
2024-06-20 07:37:02 +00:00
def load_function_module_by_id(function_id):
function_path = os.path.join(FUNCTIONS_DIR, f"{function_id}.py")
if not os.path.exists(function_path):
function = Functions.get_function_by_id(function_id)
if function:
with open(function_path, "w") as file:
file.write(function.content)
else:
raise Exception(f"Function not found: {function_id}")
2024-06-20 07:37:02 +00:00
spec = util.spec_from_file_location(function_id, function_path)
module = util.module_from_spec(spec)
2024-06-24 03:31:40 +00:00
frontmatter = extract_frontmatter(function_path)
2024-06-20 07:37:02 +00:00
try:
2024-08-08 07:46:14 +00:00
install_frontmatter_requirements(frontmatter.get("requirements", ""))
2024-06-20 07:37:02 +00:00
spec.loader.exec_module(module)
print(f"Loaded module: {module.__name__}")
if hasattr(module, "Pipe"):
2024-06-24 03:31:40 +00:00
return module.Pipe(), "pipe", frontmatter
2024-06-20 07:37:02 +00:00
elif hasattr(module, "Filter"):
2024-06-24 03:31:40 +00:00
return module.Filter(), "filter", frontmatter
2024-07-12 01:41:00 +00:00
elif hasattr(module, "Action"):
return module.Action(), "action", frontmatter
2024-06-20 07:37:02 +00:00
else:
raise Exception("No Function class found")
except Exception as e:
print(f"Error loading module: {function_id}")
# Move the file to the error folder
os.rename(function_path, f"{function_path}.error")
raise e
2024-08-08 07:46:14 +00:00
2024-08-14 12:49:18 +00:00
2024-08-08 07:46:14 +00:00
def install_frontmatter_requirements(requirements):
if requirements:
2024-08-14 12:49:18 +00:00
req_list = [req.strip() for req in requirements.split(",")]
2024-08-08 07:46:14 +00:00
for req in req_list:
print(f"Installing requirement: {req}")
subprocess.check_call([sys.executable, "-m", "pip", "install", req])
else:
2024-08-14 12:49:18 +00:00
print("No requirements found in frontmatter.")