mirror of
https://github.com/open-webui/open-webui
synced 2025-01-01 08:42:14 +00:00
247 lines
8.0 KiB
Python
247 lines
8.0 KiB
Python
import time
|
|
import logging
|
|
import sys
|
|
|
|
from aiocache import cached
|
|
from fastapi import Request
|
|
|
|
from open_webui.routers import openai, ollama
|
|
from open_webui.functions import get_function_models
|
|
|
|
|
|
from open_webui.models.functions import Functions
|
|
from open_webui.models.models import Models
|
|
|
|
|
|
from open_webui.utils.plugin import load_function_module_by_id
|
|
from open_webui.utils.access_control import has_access
|
|
|
|
|
|
from open_webui.config import (
|
|
DEFAULT_ARENA_MODEL,
|
|
)
|
|
|
|
from open_webui.env import SRC_LOG_LEVELS, GLOBAL_LOG_LEVEL
|
|
|
|
|
|
logging.basicConfig(stream=sys.stdout, level=GLOBAL_LOG_LEVEL)
|
|
log = logging.getLogger(__name__)
|
|
log.setLevel(SRC_LOG_LEVELS["MAIN"])
|
|
|
|
|
|
async def get_all_base_models(request: Request):
|
|
function_models = []
|
|
openai_models = []
|
|
ollama_models = []
|
|
|
|
if request.app.state.config.ENABLE_OPENAI_API:
|
|
openai_models = await openai.get_all_models(request)
|
|
openai_models = openai_models["data"]
|
|
|
|
if request.app.state.config.ENABLE_OLLAMA_API:
|
|
ollama_models = await ollama.get_all_models(request)
|
|
ollama_models = [
|
|
{
|
|
"id": model["model"],
|
|
"name": model["name"],
|
|
"object": "model",
|
|
"created": int(time.time()),
|
|
"owned_by": "ollama",
|
|
"ollama": model,
|
|
}
|
|
for model in ollama_models["models"]
|
|
]
|
|
|
|
function_models = await get_function_models(request)
|
|
models = function_models + openai_models + ollama_models
|
|
|
|
return models
|
|
|
|
|
|
@cached(ttl=3)
|
|
async def get_all_models(request):
|
|
models = await get_all_base_models(request)
|
|
|
|
# If there are no models, return an empty list
|
|
if len(models) == 0:
|
|
return []
|
|
|
|
# Add arena models
|
|
if request.app.state.config.ENABLE_EVALUATION_ARENA_MODELS:
|
|
arena_models = []
|
|
if len(request.app.state.config.EVALUATION_ARENA_MODELS) > 0:
|
|
arena_models = [
|
|
{
|
|
"id": model["id"],
|
|
"name": model["name"],
|
|
"info": {
|
|
"meta": model["meta"],
|
|
},
|
|
"object": "model",
|
|
"created": int(time.time()),
|
|
"owned_by": "arena",
|
|
"arena": True,
|
|
}
|
|
for model in request.app.state.config.EVALUATION_ARENA_MODELS
|
|
]
|
|
else:
|
|
# Add default arena model
|
|
arena_models = [
|
|
{
|
|
"id": DEFAULT_ARENA_MODEL["id"],
|
|
"name": DEFAULT_ARENA_MODEL["name"],
|
|
"info": {
|
|
"meta": DEFAULT_ARENA_MODEL["meta"],
|
|
},
|
|
"object": "model",
|
|
"created": int(time.time()),
|
|
"owned_by": "arena",
|
|
"arena": True,
|
|
}
|
|
]
|
|
models = models + arena_models
|
|
|
|
global_action_ids = [
|
|
function.id for function in Functions.get_global_action_functions()
|
|
]
|
|
enabled_action_ids = [
|
|
function.id
|
|
for function in Functions.get_functions_by_type("action", active_only=True)
|
|
]
|
|
|
|
custom_models = Models.get_all_models()
|
|
for custom_model in custom_models:
|
|
if custom_model.base_model_id is None:
|
|
for model in models:
|
|
if (
|
|
custom_model.id == model["id"]
|
|
or custom_model.id == model["id"].split(":")[0]
|
|
):
|
|
if custom_model.is_active:
|
|
model["name"] = custom_model.name
|
|
model["info"] = custom_model.model_dump()
|
|
|
|
action_ids = []
|
|
if "info" in model and "meta" in model["info"]:
|
|
action_ids.extend(
|
|
model["info"]["meta"].get("actionIds", [])
|
|
)
|
|
|
|
model["action_ids"] = action_ids
|
|
else:
|
|
models.remove(model)
|
|
|
|
elif custom_model.is_active and (
|
|
custom_model.id not in [model["id"] for model in models]
|
|
):
|
|
owned_by = "openai"
|
|
pipe = None
|
|
action_ids = []
|
|
|
|
for model in models:
|
|
if (
|
|
custom_model.base_model_id == model["id"]
|
|
or custom_model.base_model_id == model["id"].split(":")[0]
|
|
):
|
|
owned_by = model["owned_by"]
|
|
if "pipe" in model:
|
|
pipe = model["pipe"]
|
|
break
|
|
|
|
if custom_model.meta:
|
|
meta = custom_model.meta.model_dump()
|
|
if "actionIds" in meta:
|
|
action_ids.extend(meta["actionIds"])
|
|
|
|
models.append(
|
|
{
|
|
"id": f"{custom_model.id}",
|
|
"name": custom_model.name,
|
|
"object": "model",
|
|
"created": custom_model.created_at,
|
|
"owned_by": owned_by,
|
|
"info": custom_model.model_dump(),
|
|
"preset": True,
|
|
**({"pipe": pipe} if pipe is not None else {}),
|
|
"action_ids": action_ids,
|
|
}
|
|
)
|
|
|
|
# Process action_ids to get the actions
|
|
def get_action_items_from_module(function, module):
|
|
actions = []
|
|
if hasattr(module, "actions"):
|
|
actions = module.actions
|
|
return [
|
|
{
|
|
"id": f"{function.id}.{action['id']}",
|
|
"name": action.get("name", f"{function.name} ({action['id']})"),
|
|
"description": function.meta.description,
|
|
"icon_url": action.get(
|
|
"icon_url", function.meta.manifest.get("icon_url", None)
|
|
),
|
|
}
|
|
for action in actions
|
|
]
|
|
else:
|
|
return [
|
|
{
|
|
"id": function.id,
|
|
"name": function.name,
|
|
"description": function.meta.description,
|
|
"icon_url": function.meta.manifest.get("icon_url", None),
|
|
}
|
|
]
|
|
|
|
def get_function_module_by_id(function_id):
|
|
if function_id in request.app.state.FUNCTIONS:
|
|
function_module = request.app.state.FUNCTIONS[function_id]
|
|
else:
|
|
function_module, _, _ = load_function_module_by_id(function_id)
|
|
request.app.state.FUNCTIONS[function_id] = function_module
|
|
|
|
for model in models:
|
|
action_ids = [
|
|
action_id
|
|
for action_id in list(set(model.pop("action_ids", []) + global_action_ids))
|
|
if action_id in enabled_action_ids
|
|
]
|
|
|
|
model["actions"] = []
|
|
for action_id in action_ids:
|
|
action_function = Functions.get_function_by_id(action_id)
|
|
if action_function is None:
|
|
raise Exception(f"Action not found: {action_id}")
|
|
|
|
function_module = get_function_module_by_id(action_id)
|
|
model["actions"].extend(
|
|
get_action_items_from_module(action_function, function_module)
|
|
)
|
|
log.debug(f"get_all_models() returned {len(models)} models")
|
|
|
|
request.app.state.MODELS = {model["id"]: model for model in models}
|
|
return models
|
|
|
|
|
|
def check_model_access(user, model):
|
|
if model.get("arena"):
|
|
if not has_access(
|
|
user.id,
|
|
type="read",
|
|
access_control=model.get("info", {})
|
|
.get("meta", {})
|
|
.get("access_control", {}),
|
|
):
|
|
raise Exception("Model not found")
|
|
else:
|
|
model_info = Models.get_model_by_id(model.get("id"))
|
|
if not model_info:
|
|
raise Exception("Model not found")
|
|
elif not (
|
|
user.id == model_info.user_id
|
|
or has_access(
|
|
user.id, type="read", access_control=model_info.access_control
|
|
)
|
|
):
|
|
raise Exception("Model not found")
|