# Mirror of https://github.com/open-webui/open-webui
# Synced 2025-01-07 19:45:27 +00:00
# 275 lines, 7.4 KiB, Python
import logging
|
|
import time
|
|
from typing import Optional
|
|
|
|
from open_webui.apps.webui.internal.db import Base, JSONField, get_db
|
|
from open_webui.env import SRC_LOG_LEVELS
|
|
|
|
from open_webui.apps.webui.models.users import Users, UserResponse
|
|
|
|
|
|
from pydantic import BaseModel, ConfigDict
|
|
|
|
from sqlalchemy import or_, and_, func
|
|
from sqlalchemy.dialects import postgresql, sqlite
|
|
from sqlalchemy import BigInteger, Column, Text, JSON, Boolean
|
|
|
|
|
|
from open_webui.utils.access_control import has_access
|
|
|
|
|
|
# Module-level logger, named after this module; its level is read from the
# "MODELS" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
|
|
|
|
|
|
####################
|
|
# Models DB Schema
|
|
####################
|
|
|
|
|
|
# ModelParams is a model for the data stored in the params field of the Model table
|
|
class ModelParams(BaseModel):
    # No fields are declared; extra="allow" means whatever keys are supplied
    # are accepted and kept on the instance.
    model_config = ConfigDict(extra="allow")
|
|
|
|
|
|
# ModelMeta is a model for the data stored in the meta field of the Model table
|
|
class ModelMeta(BaseModel):
    # Keys beyond the fields declared below are accepted as well.
    model_config = ConfigDict(extra="allow")

    profile_image_url: Optional[str] = "/static/favicon.png"

    # User-facing description of the model.
    description: Optional[str] = None

    capabilities: Optional[dict] = None
|
|
|
|
|
|
class Model(Base):
    """ORM mapping for the ``model`` table."""

    __tablename__ = "model"

    # The model's id as used in the API. If set to an existing model, it will
    # override the model.
    id = Column(Text, primary_key=True)
    user_id = Column(Text)

    # An optional pointer to the actual model that should be used when
    # proxying requests.
    base_model_id = Column(Text, nullable=True)

    # The human-readable display name of the model.
    name = Column(Text)

    # Holds a JSON encoded blob of parameters, see `ModelParams`.
    params = Column(JSONField)

    # Holds a JSON encoded blob of metadata, see `ModelMeta`.
    meta = Column(JSONField)

    # Controls data access levels; defines access control rules for this entry.
    # - `None`: Public access, available to all users with the "user" role.
    # - `{}`: Private access, restricted exclusively to the owner.
    # - Custom permissions: Specific access control for reading and writing;
    #   can specify group or user-level restrictions:
    #   {
    #       "read": {
    #           "group_ids": ["group_id1", "group_id2"],
    #           "user_ids": ["user_id1", "user_id2"]
    #       },
    #       "write": {
    #           "group_ids": ["group_id1", "group_id2"],
    #           "user_ids": ["user_id1", "user_id2"]
    #       }
    #   }
    access_control = Column(JSON, nullable=True)

    is_active = Column(Boolean, default=True)

    updated_at = Column(BigInteger)
    created_at = Column(BigInteger)
|
|
|
|
|
|
class ModelModel(BaseModel):
    # from_attributes lets model_validate() read fields off attribute-bearing
    # objects, which is how `Model` rows are converted in this module.
    model_config = ConfigDict(from_attributes=True)

    id: str
    user_id: str
    base_model_id: Optional[str] = None

    name: str
    params: ModelParams
    meta: ModelMeta

    access_control: Optional[dict] = None

    is_active: bool
    updated_at: int  # timestamp in epoch
    created_at: int  # timestamp in epoch
|
|
|
|
|
|
####################
|
|
# Forms
|
|
####################
|
|
|
|
|
|
class ModelUserResponse(ModelModel):
    # All `ModelModel` fields plus the user record looked up from `user_id`;
    # left as None when no such user was found.
    user: Optional[UserResponse] = None
|
|
|
|
|
|
class ModelResponse(ModelModel):
    # Adds nothing to `ModelModel`; exists as a separately named response type.
    pass
|
|
|
|
|
|
class ModelForm(BaseModel):
    # Payload accepted when creating or updating a model. The owner id and the
    # timestamps are not part of the form.
    id: str
    base_model_id: Optional[str] = None
    name: str
    meta: ModelMeta
    params: ModelParams
    access_control: Optional[dict] = None
    is_active: bool = True
|
|
|
|
|
|
class ModelsTable:
    """Data-access helpers for rows of the ``model`` table.

    Each method opens its own session through ``get_db()``. The methods that
    catch errors (insert, single-row lookup, toggle, update, delete) report
    failure through their return value (``None`` or ``False``) and record the
    exception on the module logger; the list queries let exceptions propagate.
    """

    def insert_new_model(
        self, form_data: ModelForm, user_id: str
    ) -> Optional[ModelModel]:
        """Store a new model built from ``form_data`` and owned by ``user_id``.

        Returns:
            The stored row as a ``ModelModel``, or ``None`` if the database
            insert fails.

        Raises:
            Whatever ``ModelModel`` raises if the combined data does not
            validate; that construction happens outside the ``try`` block.
        """
        # Use one timestamp so created_at and updated_at match on a new row.
        now = int(time.time())
        model = ModelModel(
            **{
                **form_data.model_dump(),
                "user_id": user_id,
                "created_at": now,
                "updated_at": now,
            }
        )
        try:
            with get_db() as db:
                result = Model(**model.model_dump())
                db.add(result)
                db.commit()
                db.refresh(result)
                return ModelModel.model_validate(result)
        except Exception:
            log.exception("Failed to insert model %s", model.id)
            return None

    def get_all_models(self) -> list[ModelModel]:
        """Return every row of the table as a ``ModelModel``."""
        with get_db() as db:
            return [ModelModel.model_validate(model) for model in db.query(Model).all()]

    def get_models(self) -> list[ModelUserResponse]:
        """Return every model that has a ``base_model_id``, with its user.

        Each entry carries the user record found for the row's ``user_id``,
        or ``None`` when the lookup returns nothing.
        """
        with get_db() as db:
            rows = (
                db.query(Model)
                # SQLAlchemy overloads `!=` to build the SQL comparison;
                # `is not None` would be evaluated by Python instead.
                .filter(Model.base_model_id != None)  # noqa: E711
                .all()
            )

            # Models may share a user_id; remember each lookup (misses
            # included) so a given user is fetched at most once per call.
            users_by_id: dict = {}
            models = []
            for model in rows:
                if model.user_id not in users_by_id:
                    users_by_id[model.user_id] = Users.get_user_by_id(model.user_id)
                user = users_by_id[model.user_id]
                models.append(
                    ModelUserResponse.model_validate(
                        {
                            **ModelModel.model_validate(model).model_dump(),
                            "user": user.model_dump() if user else None,
                        }
                    )
                )
            return models

    def get_base_models(self) -> list[ModelModel]:
        """Return every model whose ``base_model_id`` is NULL."""
        with get_db() as db:
            rows = (
                db.query(Model)
                # `== None` is intentional: SQLAlchemy turns it into the SQL
                # NULL test, which `is None` cannot do.
                .filter(Model.base_model_id == None)  # noqa: E711
                .all()
            )
            return [ModelModel.model_validate(model) for model in rows]

    def get_models_by_user_id(
        self, user_id: str, permission: str = "write"
    ) -> list[ModelUserResponse]:
        """Return the models from ``get_models()`` available to ``user_id``.

        A model is included when ``user_id`` is its owner, or when
        ``has_access`` grants ``permission`` under its ``access_control``.
        """
        models = self.get_models()
        return [
            model
            for model in models
            if model.user_id == user_id
            or has_access(user_id, permission, model.access_control)
        ]

    def get_model_by_id(self, id: str) -> Optional[ModelModel]:
        """Return the model with primary key ``id``, or ``None``.

        ``None`` is returned both when no such row exists and when the lookup
        or validation fails; only the latter is logged.
        """
        try:
            with get_db() as db:
                model = db.get(Model, id)
                if model is None:
                    return None
                return ModelModel.model_validate(model)
        except Exception:
            log.exception("Failed to fetch model %s", id)
            return None

    def toggle_model_by_id(self, id: str) -> Optional[ModelModel]:
        """Flip ``is_active`` on the model ``id`` and bump ``updated_at``.

        Returns:
            The model as re-read after the change, or ``None`` if the row
            does not exist or the update fails.
        """
        with get_db() as db:
            try:
                current = db.query(Model).filter_by(id=id).first()
                if current is None:
                    return None

                db.query(Model).filter_by(id=id).update(
                    {
                        "is_active": not current.is_active,
                        "updated_at": int(time.time()),
                    }
                )
                db.commit()

                return self.get_model_by_id(id)
            except Exception:
                log.exception("Failed to toggle model %s", id)
                return None

    def update_model_by_id(self, id: str, model: ModelForm) -> Optional[ModelModel]:
        """Overwrite the model ``id`` with the contents of ``model``.

        Every field of the form except ``id`` is written, and ``updated_at``
        is set to the current time.

        Returns:
            The refreshed row as a ``ModelModel``, or ``None`` if the row does
            not exist or the update fails.
        """
        try:
            with get_db() as db:
                db.query(Model).filter_by(id=id).update(
                    {
                        **model.model_dump(exclude={"id"}),
                        # Keep the modification time in step with the change,
                        # as toggle_model_by_id does.
                        "updated_at": int(time.time()),
                    }
                )
                db.commit()

                row = db.get(Model, id)
                if row is None:
                    return None
                db.refresh(row)
                return ModelModel.model_validate(row)
        except Exception:
            log.exception("Failed to update model %s", id)
            return None

    def delete_model_by_id(self, id: str) -> bool:
        """Delete the model ``id``.

        Returns:
            ``True`` when the delete statement ran and was committed (whether
            or not a row matched), ``False`` if it raised.
        """
        try:
            with get_db() as db:
                db.query(Model).filter_by(id=id).delete()
                db.commit()

            return True
        except Exception:
            log.exception("Failed to delete model %s", id)
            return False

    def delete_all_models(self) -> bool:
        """Delete every row of the table.

        Returns:
            ``True`` when the delete was committed, ``False`` if it raised.
        """
        try:
            with get_db() as db:
                db.query(Model).delete()
                db.commit()

            return True
        except Exception:
            log.exception("Failed to delete all models")
            return False
|
|
|
|
|
|
# Module-level `ModelsTable` instance.
Models = ModelsTable()
|