# Mirror of https://github.com/open-webui/open-webui
# (synced 2025-01-10 04:47:42 +00:00; 222 lines, 6.5 KiB, Python)
import json
|
|
import logging
|
|
import time
|
|
from typing import Optional
|
|
import uuid
|
|
|
|
from open_webui.apps.webui.internal.db import Base, get_db
|
|
from open_webui.env import SRC_LOG_LEVELS
|
|
|
|
from open_webui.apps.webui.models.files import FileMetadataResponse
|
|
from open_webui.apps.webui.models.users import Users, UserResponse
|
|
|
|
|
|
from pydantic import BaseModel, ConfigDict
|
|
from sqlalchemy import BigInteger, Column, String, Text, JSON
|
|
|
|
from open_webui.utils.access_control import has_access
|
|
|
|
# Module-level logger; its level is taken from the "MODELS" entry of
# SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
|
|
|
|
####################
|
|
# Knowledge DB Schema
|
|
####################
|
|
|
|
|
|
class Knowledge(Base):
    """SQLAlchemy model mapped to the ``knowledge`` table."""

    __tablename__ = "knowledge"

    # Identity: primary key of the row and the id of the owning user.
    id = Column(Text, unique=True, primary_key=True)
    user_id = Column(Text)

    # Human-readable labelling.
    name = Column(Text)
    description = Column(Text)

    # Free-form JSON payloads; both may be NULL.
    data = Column(JSON, nullable=True)
    meta = Column(JSON, nullable=True)

    # Controls data access levels.
    # Defines access control rules for this entry.
    # - `None`: Public access, available to all users with the "user" role.
    # - `{}`: Private access, restricted exclusively to the owner.
    # - Custom permissions: Specific access control for reading and writing;
    #   Can specify group or user-level restrictions:
    #   {
    #      "read": {
    #          "group_ids": ["group_id1", "group_id2"],
    #          "user_ids":  ["user_id1", "user_id2"]
    #      },
    #      "write": {
    #          "group_ids": ["group_id1", "group_id2"],
    #          "user_ids":  ["user_id1", "user_id2"]
    #      }
    #   }
    access_control = Column(JSON, nullable=True)

    # Integer timestamps; this module writes them as int(time.time()).
    created_at = Column(BigInteger)
    updated_at = Column(BigInteger)
|
|
|
|
|
|
# Pydantic counterpart of a `Knowledge` row. `from_attributes=True` lets
# `model_validate` read the fields straight off an ORM object.
# (Kept as comments rather than a docstring so the generated JSON schema
# is unchanged.)
class KnowledgeModel(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    # Row id and owning user.
    id: str
    user_id: str

    name: str
    description: str

    # Optional JSON payloads.
    data: Optional[dict] = None
    meta: Optional[dict] = None

    # Access rules dict; defaults to None when not supplied.
    access_control: Optional[dict] = None

    created_at: int  # timestamp in epoch
    updated_at: int  # timestamp in epoch
|
|
|
|
|
|
####################
|
|
# Forms
|
|
####################
|
|
|
|
|
|
# A KnowledgeModel extended with an optional `user` field holding the
# owner's UserResponse (None when not provided).
class KnowledgeUserModel(KnowledgeModel):
    user: Optional[UserResponse] = None
|
|
|
|
|
|
# A KnowledgeModel plus an optional `files` list; each entry is either a
# FileMetadataResponse or a plain dict.
class KnowledgeResponse(KnowledgeModel):
    files: Optional[list[FileMetadataResponse | dict]] = None
|
|
|
|
|
|
# A KnowledgeUserModel (knowledge fields + `user`) plus an optional `files`
# list; each entry is either a FileMetadataResponse or a plain dict.
class KnowledgeUserResponse(KnowledgeUserModel):
    files: Optional[list[FileMetadataResponse | dict]] = None
|
|
|
|
|
|
# Fields accepted when creating or updating a knowledge base.
# `data` and `access_control` default to None when omitted.
class KnowledgeForm(BaseModel):
    name: str
    description: str
    data: Optional[dict] = None
    access_control: Optional[dict] = None
|
|
|
|
|
|
class KnowledgeTable:
    """Data-access helper for rows of the ``knowledge`` table.

    Each method opens its own session through ``get_db()``. The insert,
    single-row read, update and delete methods log unexpected errors and
    signal failure through their return value (``None`` / ``False``); the
    list methods let exceptions propagate.
    """

    def insert_new_knowledge(
        self, user_id: str, form_data: KnowledgeForm
    ) -> Optional[KnowledgeModel]:
        """Create a knowledge row owned by ``user_id`` from ``form_data``.

        A fresh UUID4 string is used as the id, and ``created_at`` /
        ``updated_at`` are set to the same current epoch second.

        Returns:
            The stored row as a ``KnowledgeModel``, or ``None`` if the
            database write failed (the error is logged).
        """
        with get_db() as db:
            # One clock read so both timestamps of a new row always agree.
            now = int(time.time())
            knowledge = KnowledgeModel(
                **{
                    **form_data.model_dump(),
                    "id": str(uuid.uuid4()),
                    "user_id": user_id,
                    "created_at": now,
                    "updated_at": now,
                }
            )

            try:
                result = Knowledge(**knowledge.model_dump())
                db.add(result)
                db.commit()
                db.refresh(result)
                return KnowledgeModel.model_validate(result)
            except Exception as e:
                log.exception(e)
                return None

    def get_knowledge_bases(self) -> list[KnowledgeUserModel]:
        """Return every knowledge base, most recently updated first.

        Each entry carries the owner's user record under ``user`` (``None``
        when ``Users.get_user_by_id`` finds no user).
        """
        with get_db() as db:
            # Cache owner lookups so each distinct user_id is fetched once
            # instead of once per knowledge row.
            users_by_id = {}
            knowledge_bases = []
            for knowledge in (
                db.query(Knowledge).order_by(Knowledge.updated_at.desc()).all()
            ):
                if knowledge.user_id not in users_by_id:
                    users_by_id[knowledge.user_id] = Users.get_user_by_id(
                        knowledge.user_id
                    )
                user = users_by_id[knowledge.user_id]
                knowledge_bases.append(
                    KnowledgeUserModel.model_validate(
                        {
                            **KnowledgeModel.model_validate(knowledge).model_dump(),
                            "user": user.model_dump() if user else None,
                        }
                    )
                )
            return knowledge_bases

    def get_knowledge_bases_by_user_id(
        self, user_id: str, permission: str = "write"
    ) -> list[KnowledgeUserModel]:
        """Return the knowledge bases ``user_id`` owns or may access.

        A knowledge base is included when the user is its owner, or when
        ``has_access`` grants ``permission`` for its ``access_control``.
        Filtering happens in Python over the full list.
        """
        knowledge_bases = self.get_knowledge_bases()
        return [
            knowledge_base
            for knowledge_base in knowledge_bases
            if knowledge_base.user_id == user_id
            or has_access(user_id, permission, knowledge_base.access_control)
        ]

    def get_knowledge_by_id(self, id: str) -> Optional[KnowledgeModel]:
        """Fetch one knowledge base by id.

        Returns:
            The row as a ``KnowledgeModel``; ``None`` when no row matches
            or when the lookup raised (the error is logged).
        """
        try:
            with get_db() as db:
                knowledge = db.query(Knowledge).filter_by(id=id).first()
                return KnowledgeModel.model_validate(knowledge) if knowledge else None
        except Exception as e:
            log.exception(e)
            return None

    def update_knowledge_by_id(
        self, id: str, form_data: KnowledgeForm, overwrite: bool = False
    ) -> Optional[KnowledgeModel]:
        """Write all fields of ``form_data`` onto the row and bump ``updated_at``.

        Args:
            id: Id of the knowledge base to update.
            form_data: New field values.
            overwrite: Currently unused; kept so existing callers keep working.

        Returns:
            The row re-read after the update (``None`` if it does not
            exist), or ``None`` if the update raised (the error is logged).
        """
        try:
            with get_db() as db:
                # NOTE(review): model_dump() includes fields left at their
                # None default, so omitting `data` or `access_control` in the
                # form stores None for them. Confirm callers rely on this.
                db.query(Knowledge).filter_by(id=id).update(
                    {
                        **form_data.model_dump(),
                        "updated_at": int(time.time()),
                    }
                )
                db.commit()
                return self.get_knowledge_by_id(id=id)
        except Exception as e:
            log.exception(e)
            return None

    def update_knowledge_data_by_id(
        self, id: str, data: dict
    ) -> Optional[KnowledgeModel]:
        """Replace only the ``data`` column of a row and bump ``updated_at``.

        Returns:
            The row re-read after the update (``None`` if it does not
            exist), or ``None`` if the update raised (the error is logged).
        """
        try:
            with get_db() as db:
                db.query(Knowledge).filter_by(id=id).update(
                    {
                        "data": data,
                        "updated_at": int(time.time()),
                    }
                )
                db.commit()
                return self.get_knowledge_by_id(id=id)
        except Exception as e:
            log.exception(e)
            return None

    def delete_knowledge_by_id(self, id: str) -> bool:
        """Delete the knowledge base with the given id.

        Returns:
            ``True`` when the delete statement committed — including when no
            row matched — and ``False`` if it raised (the error is logged).
        """
        try:
            with get_db() as db:
                db.query(Knowledge).filter_by(id=id).delete()
                db.commit()
                return True
        except Exception as e:
            log.exception(e)
            return False

    def delete_all_knowledge(self) -> bool:
        """Delete every row of the ``knowledge`` table.

        Returns:
            ``True`` when the delete committed, ``False`` if it raised
            (the error is logged).
        """
        with get_db() as db:
            try:
                db.query(Knowledge).delete()
                db.commit()

                return True
            except Exception as e:
                log.exception(e)
                return False
|
|
|
|
|
|
# Module-level KnowledgeTable instance, created once at import time.
Knowledges = KnowledgeTable()
|