refac: folder rename web -> webui

This commit is contained in:
Timothy J. Baek
2024-05-26 01:15:48 -07:00
parent 6e89a481be
commit 1fce466253
35 changed files with 35 additions and 35 deletions

View File

@@ -0,0 +1,197 @@
from pydantic import BaseModel
from typing import List, Union, Optional
import time
import uuid
import logging
from peewee import *
from apps.webui.models.users import UserModel, Users
from utils.utils import verify_password
from apps.webui.internal.db import DB
from config import SRC_LOG_LEVELS
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
####################
# DB MODEL
####################
class Auth(Model):
id = CharField(unique=True)
email = CharField()
password = TextField()
active = BooleanField()
class Meta:
database = DB
class AuthModel(BaseModel):
id: str
email: str
password: str
active: bool = True
####################
# Forms
####################
class Token(BaseModel):
token: str
token_type: str
class ApiKey(BaseModel):
api_key: Optional[str] = None
class UserResponse(BaseModel):
id: str
email: str
name: str
role: str
profile_image_url: str
class SigninResponse(Token, UserResponse):
pass
class SigninForm(BaseModel):
email: str
password: str
class ProfileImageUrlForm(BaseModel):
profile_image_url: str
class UpdateProfileForm(BaseModel):
profile_image_url: str
name: str
class UpdatePasswordForm(BaseModel):
password: str
new_password: str
class SignupForm(BaseModel):
name: str
email: str
password: str
profile_image_url: Optional[str] = "/user.png"
class AddUserForm(SignupForm):
role: Optional[str] = "pending"
class AuthsTable:
def __init__(self, db):
self.db = db
self.db.create_tables([Auth])
def insert_new_auth(
self,
email: str,
password: str,
name: str,
profile_image_url: str = "/user.png",
role: str = "pending",
) -> Optional[UserModel]:
log.info("insert_new_auth")
id = str(uuid.uuid4())
auth = AuthModel(
**{"id": id, "email": email, "password": password, "active": True}
)
result = Auth.create(**auth.model_dump())
user = Users.insert_new_user(id, name, email, profile_image_url, role)
if result and user:
return user
else:
return None
def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
log.info(f"authenticate_user: {email}")
try:
auth = Auth.get(Auth.email == email, Auth.active == True)
if auth:
if verify_password(password, auth.password):
user = Users.get_user_by_id(auth.id)
return user
else:
return None
else:
return None
except:
return None
def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
log.info(f"authenticate_user_by_api_key: {api_key}")
# if no api_key, return None
if not api_key:
return None
try:
user = Users.get_user_by_api_key(api_key)
return user if user else None
except:
return False
def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
log.info(f"authenticate_user_by_trusted_header: {email}")
try:
auth = Auth.get(Auth.email == email, Auth.active == True)
if auth:
user = Users.get_user_by_id(auth.id)
return user
except:
return None
def update_user_password_by_id(self, id: str, new_password: str) -> bool:
try:
query = Auth.update(password=new_password).where(Auth.id == id)
result = query.execute()
return True if result == 1 else False
except:
return False
def update_email_by_id(self, id: str, email: str) -> bool:
try:
query = Auth.update(email=email).where(Auth.id == id)
result = query.execute()
return True if result == 1 else False
except:
return False
def delete_auth_by_id(self, id: str) -> bool:
try:
# Delete User
result = Users.delete_user_by_id(id)
if result:
# Delete Auth
query = Auth.delete().where(Auth.id == id)
query.execute() # Remove the rows, return number of rows removed.
return True
else:
return False
except:
return False
Auths = AuthsTable(DB)

View File

@@ -0,0 +1,318 @@
from pydantic import BaseModel
from typing import List, Union, Optional
from peewee import *
from playhouse.shortcuts import model_to_dict
import json
import uuid
import time
from apps.webui.internal.db import DB
####################
# Chat DB Schema
####################
class Chat(Model):
id = CharField(unique=True)
user_id = CharField()
title = TextField()
chat = TextField() # Save Chat JSON as Text
created_at = BigIntegerField()
updated_at = BigIntegerField()
share_id = CharField(null=True, unique=True)
archived = BooleanField(default=False)
class Meta:
database = DB
class ChatModel(BaseModel):
id: str
user_id: str
title: str
chat: str
created_at: int # timestamp in epoch
updated_at: int # timestamp in epoch
share_id: Optional[str] = None
archived: bool = False
####################
# Forms
####################
class ChatForm(BaseModel):
chat: dict
class ChatTitleForm(BaseModel):
title: str
class ChatResponse(BaseModel):
id: str
user_id: str
title: str
chat: dict
updated_at: int # timestamp in epoch
created_at: int # timestamp in epoch
share_id: Optional[str] = None # id of the chat to be shared
archived: bool
class ChatTitleIdResponse(BaseModel):
id: str
title: str
updated_at: int
created_at: int
class ChatTable:
def __init__(self, db):
self.db = db
db.create_tables([Chat])
def insert_new_chat(self, user_id: str, form_data: ChatForm) -> Optional[ChatModel]:
id = str(uuid.uuid4())
chat = ChatModel(
**{
"id": id,
"user_id": user_id,
"title": (
form_data.chat["title"] if "title" in form_data.chat else "New Chat"
),
"chat": json.dumps(form_data.chat),
"created_at": int(time.time()),
"updated_at": int(time.time()),
}
)
result = Chat.create(**chat.model_dump())
return chat if result else None
def update_chat_by_id(self, id: str, chat: dict) -> Optional[ChatModel]:
try:
query = Chat.update(
chat=json.dumps(chat),
title=chat["title"] if "title" in chat else "New Chat",
updated_at=int(time.time()),
).where(Chat.id == id)
query.execute()
chat = Chat.get(Chat.id == id)
return ChatModel(**model_to_dict(chat))
except:
return None
def insert_shared_chat_by_chat_id(self, chat_id: str) -> Optional[ChatModel]:
# Get the existing chat to share
chat = Chat.get(Chat.id == chat_id)
# Check if the chat is already shared
if chat.share_id:
return self.get_chat_by_id_and_user_id(chat.share_id, "shared")
# Create a new chat with the same data, but with a new ID
shared_chat = ChatModel(
**{
"id": str(uuid.uuid4()),
"user_id": f"shared-{chat_id}",
"title": chat.title,
"chat": chat.chat,
"created_at": chat.created_at,
"updated_at": int(time.time()),
}
)
shared_result = Chat.create(**shared_chat.model_dump())
# Update the original chat with the share_id
result = (
Chat.update(share_id=shared_chat.id).where(Chat.id == chat_id).execute()
)
return shared_chat if (shared_result and result) else None
def update_shared_chat_by_chat_id(self, chat_id: str) -> Optional[ChatModel]:
try:
print("update_shared_chat_by_id")
chat = Chat.get(Chat.id == chat_id)
print(chat)
query = Chat.update(
title=chat.title,
chat=chat.chat,
).where(Chat.id == chat.share_id)
query.execute()
chat = Chat.get(Chat.id == chat.share_id)
return ChatModel(**model_to_dict(chat))
except:
return None
def delete_shared_chat_by_chat_id(self, chat_id: str) -> bool:
try:
query = Chat.delete().where(Chat.user_id == f"shared-{chat_id}")
query.execute() # Remove the rows, return number of rows removed.
return True
except:
return False
def update_chat_share_id_by_id(
self, id: str, share_id: Optional[str]
) -> Optional[ChatModel]:
try:
query = Chat.update(
share_id=share_id,
).where(Chat.id == id)
query.execute()
chat = Chat.get(Chat.id == id)
return ChatModel(**model_to_dict(chat))
except:
return None
def toggle_chat_archive_by_id(self, id: str) -> Optional[ChatModel]:
try:
chat = self.get_chat_by_id(id)
query = Chat.update(
archived=(not chat.archived),
).where(Chat.id == id)
query.execute()
chat = Chat.get(Chat.id == id)
return ChatModel(**model_to_dict(chat))
except:
return None
def get_archived_chat_list_by_user_id(
self, user_id: str, skip: int = 0, limit: int = 50
) -> List[ChatModel]:
return [
ChatModel(**model_to_dict(chat))
for chat in Chat.select()
.where(Chat.archived == True)
.where(Chat.user_id == user_id)
.order_by(Chat.updated_at.desc())
# .limit(limit)
# .offset(skip)
]
def get_chat_list_by_user_id(
self, user_id: str, skip: int = 0, limit: int = 50
) -> List[ChatModel]:
return [
ChatModel(**model_to_dict(chat))
for chat in Chat.select()
.where(Chat.archived == False)
.where(Chat.user_id == user_id)
.order_by(Chat.updated_at.desc())
# .limit(limit)
# .offset(skip)
]
def get_chat_list_by_chat_ids(
self, chat_ids: List[str], skip: int = 0, limit: int = 50
) -> List[ChatModel]:
return [
ChatModel(**model_to_dict(chat))
for chat in Chat.select()
.where(Chat.archived == False)
.where(Chat.id.in_(chat_ids))
.order_by(Chat.updated_at.desc())
]
def get_chat_by_id(self, id: str) -> Optional[ChatModel]:
try:
chat = Chat.get(Chat.id == id)
return ChatModel(**model_to_dict(chat))
except:
return None
def get_chat_by_share_id(self, id: str) -> Optional[ChatModel]:
try:
chat = Chat.get(Chat.share_id == id)
if chat:
chat = Chat.get(Chat.id == id)
return ChatModel(**model_to_dict(chat))
else:
return None
except:
return None
def get_chat_by_id_and_user_id(self, id: str, user_id: str) -> Optional[ChatModel]:
try:
chat = Chat.get(Chat.id == id, Chat.user_id == user_id)
return ChatModel(**model_to_dict(chat))
except:
return None
def get_chats(self, skip: int = 0, limit: int = 50) -> List[ChatModel]:
return [
ChatModel(**model_to_dict(chat))
for chat in Chat.select().order_by(Chat.updated_at.desc())
# .limit(limit).offset(skip)
]
def get_chats_by_user_id(self, user_id: str) -> List[ChatModel]:
return [
ChatModel(**model_to_dict(chat))
for chat in Chat.select()
.where(Chat.user_id == user_id)
.order_by(Chat.updated_at.desc())
# .limit(limit).offset(skip)
]
def delete_chat_by_id(self, id: str) -> bool:
try:
query = Chat.delete().where((Chat.id == id))
query.execute() # Remove the rows, return number of rows removed.
return True and self.delete_shared_chat_by_chat_id(id)
except:
return False
def delete_chat_by_id_and_user_id(self, id: str, user_id: str) -> bool:
try:
query = Chat.delete().where((Chat.id == id) & (Chat.user_id == user_id))
query.execute() # Remove the rows, return number of rows removed.
return True and self.delete_shared_chat_by_chat_id(id)
except:
return False
def delete_chats_by_user_id(self, user_id: str) -> bool:
try:
self.delete_shared_chats_by_user_id(user_id)
query = Chat.delete().where(Chat.user_id == user_id)
query.execute() # Remove the rows, return number of rows removed.
return True
except:
return False
def delete_shared_chats_by_user_id(self, user_id: str) -> bool:
try:
shared_chat_ids = [
f"shared-{chat.id}"
for chat in Chat.select().where(Chat.user_id == user_id)
]
query = Chat.delete().where(Chat.user_id << shared_chat_ids)
query.execute() # Remove the rows, return number of rows removed.
return True
except:
return False
Chats = ChatTable(DB)

View File

@@ -0,0 +1,160 @@
from pydantic import BaseModel
from peewee import *
from playhouse.shortcuts import model_to_dict
from typing import List, Union, Optional
import time
import logging
from utils.utils import decode_token
from utils.misc import get_gravatar_url
from apps.webui.internal.db import DB
import json
from config import SRC_LOG_LEVELS
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
####################
# Documents DB Schema
####################
class Document(Model):
collection_name = CharField(unique=True)
name = CharField(unique=True)
title = TextField()
filename = TextField()
content = TextField(null=True)
user_id = CharField()
timestamp = BigIntegerField()
class Meta:
database = DB
class DocumentModel(BaseModel):
collection_name: str
name: str
title: str
filename: str
content: Optional[str] = None
user_id: str
timestamp: int # timestamp in epoch
####################
# Forms
####################
class DocumentResponse(BaseModel):
collection_name: str
name: str
title: str
filename: str
content: Optional[dict] = None
user_id: str
timestamp: int # timestamp in epoch
class DocumentUpdateForm(BaseModel):
name: str
title: str
class DocumentForm(DocumentUpdateForm):
collection_name: str
filename: str
content: Optional[str] = None
class DocumentsTable:
def __init__(self, db):
self.db = db
self.db.create_tables([Document])
def insert_new_doc(
self, user_id: str, form_data: DocumentForm
) -> Optional[DocumentModel]:
document = DocumentModel(
**{
**form_data.model_dump(),
"user_id": user_id,
"timestamp": int(time.time()),
}
)
try:
result = Document.create(**document.model_dump())
if result:
return document
else:
return None
except:
return None
def get_doc_by_name(self, name: str) -> Optional[DocumentModel]:
try:
document = Document.get(Document.name == name)
return DocumentModel(**model_to_dict(document))
except:
return None
def get_docs(self) -> List[DocumentModel]:
return [
DocumentModel(**model_to_dict(doc))
for doc in Document.select()
# .limit(limit).offset(skip)
]
def update_doc_by_name(
self, name: str, form_data: DocumentUpdateForm
) -> Optional[DocumentModel]:
try:
query = Document.update(
title=form_data.title,
name=form_data.name,
timestamp=int(time.time()),
).where(Document.name == name)
query.execute()
doc = Document.get(Document.name == form_data.name)
return DocumentModel(**model_to_dict(doc))
except Exception as e:
log.exception(e)
return None
def update_doc_content_by_name(
self, name: str, updated: dict
) -> Optional[DocumentModel]:
try:
doc = self.get_doc_by_name(name)
doc_content = json.loads(doc.content if doc.content else "{}")
doc_content = {**doc_content, **updated}
query = Document.update(
content=json.dumps(doc_content),
timestamp=int(time.time()),
).where(Document.name == name)
query.execute()
doc = Document.get(Document.name == name)
return DocumentModel(**model_to_dict(doc))
except Exception as e:
log.exception(e)
return None
def delete_doc_by_name(self, name: str) -> bool:
try:
query = Document.delete().where((Document.name == name))
query.execute() # Remove the rows, return number of rows removed.
return True
except:
return False
Documents = DocumentsTable(DB)

View File

@@ -0,0 +1,118 @@
from pydantic import BaseModel
from peewee import *
from playhouse.shortcuts import model_to_dict
from typing import List, Union, Optional
from apps.webui.internal.db import DB
from apps.webui.models.chats import Chats
import time
import uuid
####################
# Memory DB Schema
####################
class Memory(Model):
id = CharField(unique=True)
user_id = CharField()
content = TextField()
updated_at = BigIntegerField()
created_at = BigIntegerField()
class Meta:
database = DB
class MemoryModel(BaseModel):
id: str
user_id: str
content: str
updated_at: int # timestamp in epoch
created_at: int # timestamp in epoch
####################
# Forms
####################
class MemoriesTable:
def __init__(self, db):
self.db = db
self.db.create_tables([Memory])
def insert_new_memory(
self,
user_id: str,
content: str,
) -> Optional[MemoryModel]:
id = str(uuid.uuid4())
memory = MemoryModel(
**{
"id": id,
"user_id": user_id,
"content": content,
"created_at": int(time.time()),
"updated_at": int(time.time()),
}
)
result = Memory.create(**memory.model_dump())
if result:
return memory
else:
return None
def get_memories(self) -> List[MemoryModel]:
try:
memories = Memory.select()
return [MemoryModel(**model_to_dict(memory)) for memory in memories]
except:
return None
def get_memories_by_user_id(self, user_id: str) -> List[MemoryModel]:
try:
memories = Memory.select().where(Memory.user_id == user_id)
return [MemoryModel(**model_to_dict(memory)) for memory in memories]
except:
return None
def get_memory_by_id(self, id) -> Optional[MemoryModel]:
try:
memory = Memory.get(Memory.id == id)
return MemoryModel(**model_to_dict(memory))
except:
return None
def delete_memory_by_id(self, id: str) -> bool:
try:
query = Memory.delete().where(Memory.id == id)
query.execute() # Remove the rows, return number of rows removed.
return True
except:
return False
def delete_memories_by_user_id(self, user_id: str) -> bool:
try:
query = Memory.delete().where(Memory.user_id == user_id)
query.execute()
return True
except:
return False
def delete_memory_by_id_and_user_id(self, id: str, user_id: str) -> bool:
try:
query = Memory.delete().where(Memory.id == id, Memory.user_id == user_id)
query.execute()
return True
except:
return False
Memories = MemoriesTable(DB)

View File

@@ -0,0 +1,179 @@
import json
import logging
from typing import Optional
import peewee as pw
from peewee import *
from playhouse.shortcuts import model_to_dict
from pydantic import BaseModel, ConfigDict
from apps.webui.internal.db import DB, JSONField
from typing import List, Union, Optional
from config import SRC_LOG_LEVELS
import time
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
####################
# Models DB Schema
####################
# ModelParams is a model for the data stored in the params field of the Model table
class ModelParams(BaseModel):
model_config = ConfigDict(extra="allow")
pass
# ModelMeta is a model for the data stored in the meta field of the Model table
class ModelMeta(BaseModel):
profile_image_url: Optional[str] = "/favicon.png"
description: Optional[str] = None
"""
User-facing description of the model.
"""
capabilities: Optional[dict] = None
model_config = ConfigDict(extra="allow")
pass
class Model(pw.Model):
id = pw.TextField(unique=True)
"""
The model's id as used in the API. If set to an existing model, it will override the model.
"""
user_id = pw.TextField()
base_model_id = pw.TextField(null=True)
"""
An optional pointer to the actual model that should be used when proxying requests.
"""
name = pw.TextField()
"""
The human-readable display name of the model.
"""
params = JSONField()
"""
Holds a JSON encoded blob of parameters, see `ModelParams`.
"""
meta = JSONField()
"""
Holds a JSON encoded blob of metadata, see `ModelMeta`.
"""
updated_at = BigIntegerField()
created_at = BigIntegerField()
class Meta:
database = DB
class ModelModel(BaseModel):
id: str
user_id: str
base_model_id: Optional[str] = None
name: str
params: ModelParams
meta: ModelMeta
updated_at: int # timestamp in epoch
created_at: int # timestamp in epoch
####################
# Forms
####################
class ModelResponse(BaseModel):
id: str
name: str
meta: ModelMeta
updated_at: int # timestamp in epoch
created_at: int # timestamp in epoch
class ModelForm(BaseModel):
id: str
base_model_id: Optional[str] = None
name: str
meta: ModelMeta
params: ModelParams
class ModelsTable:
def __init__(
self,
db: pw.SqliteDatabase | pw.PostgresqlDatabase,
):
self.db = db
self.db.create_tables([Model])
def insert_new_model(
self, form_data: ModelForm, user_id: str
) -> Optional[ModelModel]:
model = ModelModel(
**{
**form_data.model_dump(),
"user_id": user_id,
"created_at": int(time.time()),
"updated_at": int(time.time()),
}
)
try:
result = Model.create(**model.model_dump())
if result:
return model
else:
return None
except Exception as e:
print(e)
return None
def get_all_models(self) -> List[ModelModel]:
return [ModelModel(**model_to_dict(model)) for model in Model.select()]
def get_model_by_id(self, id: str) -> Optional[ModelModel]:
try:
model = Model.get(Model.id == id)
return ModelModel(**model_to_dict(model))
except:
return None
def update_model_by_id(self, id: str, model: ModelForm) -> Optional[ModelModel]:
try:
# update only the fields that are present in the model
query = Model.update(**model.model_dump()).where(Model.id == id)
query.execute()
model = Model.get(Model.id == id)
return ModelModel(**model_to_dict(model))
except Exception as e:
print(e)
return None
def delete_model_by_id(self, id: str) -> bool:
try:
query = Model.delete().where(Model.id == id)
query.execute()
return True
except:
return False
Models = ModelsTable(DB)

View File

@@ -0,0 +1,118 @@
from pydantic import BaseModel
from peewee import *
from playhouse.shortcuts import model_to_dict
from typing import List, Union, Optional
import time
from utils.utils import decode_token
from utils.misc import get_gravatar_url
from apps.webui.internal.db import DB
import json
####################
# Prompts DB Schema
####################
class Prompt(Model):
command = CharField(unique=True)
user_id = CharField()
title = TextField()
content = TextField()
timestamp = BigIntegerField()
class Meta:
database = DB
class PromptModel(BaseModel):
command: str
user_id: str
title: str
content: str
timestamp: int # timestamp in epoch
####################
# Forms
####################
class PromptForm(BaseModel):
command: str
title: str
content: str
class PromptsTable:
def __init__(self, db):
self.db = db
self.db.create_tables([Prompt])
def insert_new_prompt(
self, user_id: str, form_data: PromptForm
) -> Optional[PromptModel]:
prompt = PromptModel(
**{
"user_id": user_id,
"command": form_data.command,
"title": form_data.title,
"content": form_data.content,
"timestamp": int(time.time()),
}
)
try:
result = Prompt.create(**prompt.model_dump())
if result:
return prompt
else:
return None
except:
return None
def get_prompt_by_command(self, command: str) -> Optional[PromptModel]:
try:
prompt = Prompt.get(Prompt.command == command)
return PromptModel(**model_to_dict(prompt))
except:
return None
def get_prompts(self) -> List[PromptModel]:
return [
PromptModel(**model_to_dict(prompt))
for prompt in Prompt.select()
# .limit(limit).offset(skip)
]
def update_prompt_by_command(
self, command: str, form_data: PromptForm
) -> Optional[PromptModel]:
try:
query = Prompt.update(
title=form_data.title,
content=form_data.content,
timestamp=int(time.time()),
).where(Prompt.command == command)
query.execute()
prompt = Prompt.get(Prompt.command == command)
return PromptModel(**model_to_dict(prompt))
except:
return None
def delete_prompt_by_command(self, command: str) -> bool:
try:
query = Prompt.delete().where((Prompt.command == command))
query.execute() # Remove the rows, return number of rows removed.
return True
except:
return False
Prompts = PromptsTable(DB)

View File

@@ -0,0 +1,237 @@
from pydantic import BaseModel
from typing import List, Union, Optional
from peewee import *
from playhouse.shortcuts import model_to_dict
import json
import uuid
import time
import logging
from apps.webui.internal.db import DB
from config import SRC_LOG_LEVELS
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
####################
# Tag DB Schema
####################
class Tag(Model):
id = CharField(unique=True)
name = CharField()
user_id = CharField()
data = TextField(null=True)
class Meta:
database = DB
class ChatIdTag(Model):
id = CharField(unique=True)
tag_name = CharField()
chat_id = CharField()
user_id = CharField()
timestamp = BigIntegerField()
class Meta:
database = DB
class TagModel(BaseModel):
id: str
name: str
user_id: str
data: Optional[str] = None
class ChatIdTagModel(BaseModel):
id: str
tag_name: str
chat_id: str
user_id: str
timestamp: int
####################
# Forms
####################
class ChatIdTagForm(BaseModel):
tag_name: str
chat_id: str
class TagChatIdsResponse(BaseModel):
chat_ids: List[str]
class ChatTagsResponse(BaseModel):
tags: List[str]
class TagTable:
def __init__(self, db):
self.db = db
db.create_tables([Tag, ChatIdTag])
def insert_new_tag(self, name: str, user_id: str) -> Optional[TagModel]:
id = str(uuid.uuid4())
tag = TagModel(**{"id": id, "user_id": user_id, "name": name})
try:
result = Tag.create(**tag.model_dump())
if result:
return tag
else:
return None
except Exception as e:
return None
def get_tag_by_name_and_user_id(
self, name: str, user_id: str
) -> Optional[TagModel]:
try:
tag = Tag.get(Tag.name == name, Tag.user_id == user_id)
return TagModel(**model_to_dict(tag))
except Exception as e:
return None
def add_tag_to_chat(
self, user_id: str, form_data: ChatIdTagForm
) -> Optional[ChatIdTagModel]:
tag = self.get_tag_by_name_and_user_id(form_data.tag_name, user_id)
if tag == None:
tag = self.insert_new_tag(form_data.tag_name, user_id)
id = str(uuid.uuid4())
chatIdTag = ChatIdTagModel(
**{
"id": id,
"user_id": user_id,
"chat_id": form_data.chat_id,
"tag_name": tag.name,
"timestamp": int(time.time()),
}
)
try:
result = ChatIdTag.create(**chatIdTag.model_dump())
if result:
return chatIdTag
else:
return None
except:
return None
def get_tags_by_user_id(self, user_id: str) -> List[TagModel]:
tag_names = [
ChatIdTagModel(**model_to_dict(chat_id_tag)).tag_name
for chat_id_tag in ChatIdTag.select()
.where(ChatIdTag.user_id == user_id)
.order_by(ChatIdTag.timestamp.desc())
]
return [
TagModel(**model_to_dict(tag))
for tag in Tag.select()
.where(Tag.user_id == user_id)
.where(Tag.name.in_(tag_names))
]
def get_tags_by_chat_id_and_user_id(
self, chat_id: str, user_id: str
) -> List[TagModel]:
tag_names = [
ChatIdTagModel(**model_to_dict(chat_id_tag)).tag_name
for chat_id_tag in ChatIdTag.select()
.where((ChatIdTag.user_id == user_id) & (ChatIdTag.chat_id == chat_id))
.order_by(ChatIdTag.timestamp.desc())
]
return [
TagModel(**model_to_dict(tag))
for tag in Tag.select()
.where(Tag.user_id == user_id)
.where(Tag.name.in_(tag_names))
]
def get_chat_ids_by_tag_name_and_user_id(
self, tag_name: str, user_id: str
) -> Optional[ChatIdTagModel]:
return [
ChatIdTagModel(**model_to_dict(chat_id_tag))
for chat_id_tag in ChatIdTag.select()
.where((ChatIdTag.user_id == user_id) & (ChatIdTag.tag_name == tag_name))
.order_by(ChatIdTag.timestamp.desc())
]
def count_chat_ids_by_tag_name_and_user_id(
self, tag_name: str, user_id: str
) -> int:
return (
ChatIdTag.select()
.where((ChatIdTag.tag_name == tag_name) & (ChatIdTag.user_id == user_id))
.count()
)
def delete_tag_by_tag_name_and_user_id(self, tag_name: str, user_id: str) -> bool:
try:
query = ChatIdTag.delete().where(
(ChatIdTag.tag_name == tag_name) & (ChatIdTag.user_id == user_id)
)
res = query.execute() # Remove the rows, return number of rows removed.
log.debug(f"res: {res}")
tag_count = self.count_chat_ids_by_tag_name_and_user_id(tag_name, user_id)
if tag_count == 0:
# Remove tag item from Tag col as well
query = Tag.delete().where(
(Tag.name == tag_name) & (Tag.user_id == user_id)
)
query.execute() # Remove the rows, return number of rows removed.
return True
except Exception as e:
log.error(f"delete_tag: {e}")
return False
def delete_tag_by_tag_name_and_chat_id_and_user_id(
self, tag_name: str, chat_id: str, user_id: str
) -> bool:
try:
query = ChatIdTag.delete().where(
(ChatIdTag.tag_name == tag_name)
& (ChatIdTag.chat_id == chat_id)
& (ChatIdTag.user_id == user_id)
)
res = query.execute() # Remove the rows, return number of rows removed.
log.debug(f"res: {res}")
tag_count = self.count_chat_ids_by_tag_name_and_user_id(tag_name, user_id)
if tag_count == 0:
# Remove tag item from Tag col as well
query = Tag.delete().where(
(Tag.name == tag_name) & (Tag.user_id == user_id)
)
query.execute() # Remove the rows, return number of rows removed.
return True
except Exception as e:
log.error(f"delete_tag: {e}")
return False
def delete_tags_by_chat_id_and_user_id(self, chat_id: str, user_id: str) -> bool:
tags = self.get_tags_by_chat_id_and_user_id(chat_id, user_id)
for tag in tags:
self.delete_tag_by_tag_name_and_chat_id_and_user_id(
tag.tag_name, chat_id, user_id
)
return True
Tags = TagTable(DB)

View File

@@ -0,0 +1,210 @@
from pydantic import BaseModel
from peewee import *
from playhouse.shortcuts import model_to_dict
from typing import List, Union, Optional
import time
from utils.misc import get_gravatar_url
from apps.webui.internal.db import DB
from apps.webui.models.chats import Chats
####################
# User DB Schema
####################
class User(Model):
id = CharField(unique=True)
name = CharField()
email = CharField()
role = CharField()
profile_image_url = TextField()
last_active_at = BigIntegerField()
updated_at = BigIntegerField()
created_at = BigIntegerField()
api_key = CharField(null=True, unique=True)
class Meta:
database = DB
class UserModel(BaseModel):
id: str
name: str
email: str
role: str = "pending"
profile_image_url: str
last_active_at: int # timestamp in epoch
updated_at: int # timestamp in epoch
created_at: int # timestamp in epoch
api_key: Optional[str] = None
####################
# Forms
####################
class UserRoleUpdateForm(BaseModel):
id: str
role: str
class UserUpdateForm(BaseModel):
name: str
email: str
profile_image_url: str
password: Optional[str] = None
class UsersTable:
def __init__(self, db):
self.db = db
self.db.create_tables([User])
def insert_new_user(
self,
id: str,
name: str,
email: str,
profile_image_url: str = "/user.png",
role: str = "pending",
) -> Optional[UserModel]:
user = UserModel(
**{
"id": id,
"name": name,
"email": email,
"role": role,
"profile_image_url": profile_image_url,
"last_active_at": int(time.time()),
"created_at": int(time.time()),
"updated_at": int(time.time()),
}
)
result = User.create(**user.model_dump())
if result:
return user
else:
return None
def get_user_by_id(self, id: str) -> Optional[UserModel]:
try:
user = User.get(User.id == id)
return UserModel(**model_to_dict(user))
except:
return None
def get_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
try:
user = User.get(User.api_key == api_key)
return UserModel(**model_to_dict(user))
except:
return None
def get_user_by_email(self, email: str) -> Optional[UserModel]:
try:
user = User.get(User.email == email)
return UserModel(**model_to_dict(user))
except:
return None
def get_users(self, skip: int = 0, limit: int = 50) -> List[UserModel]:
return [
UserModel(**model_to_dict(user))
for user in User.select()
# .limit(limit).offset(skip)
]
def get_num_users(self) -> Optional[int]:
return User.select().count()
def get_first_user(self) -> UserModel:
try:
user = User.select().order_by(User.created_at).first()
return UserModel(**model_to_dict(user))
except:
return None
def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
try:
query = User.update(role=role).where(User.id == id)
query.execute()
user = User.get(User.id == id)
return UserModel(**model_to_dict(user))
except:
return None
def update_user_profile_image_url_by_id(
self, id: str, profile_image_url: str
) -> Optional[UserModel]:
try:
query = User.update(profile_image_url=profile_image_url).where(
User.id == id
)
query.execute()
user = User.get(User.id == id)
return UserModel(**model_to_dict(user))
except:
return None
def update_user_last_active_by_id(self, id: str) -> Optional[UserModel]:
try:
query = User.update(last_active_at=int(time.time())).where(User.id == id)
query.execute()
user = User.get(User.id == id)
return UserModel(**model_to_dict(user))
except:
return None
def update_user_by_id(self, id: str, updated: dict) -> Optional[UserModel]:
try:
query = User.update(**updated).where(User.id == id)
query.execute()
user = User.get(User.id == id)
return UserModel(**model_to_dict(user))
except:
return None
def delete_user_by_id(self, id: str) -> bool:
try:
# Delete User Chats
result = Chats.delete_chats_by_user_id(id)
if result:
# Delete User
query = User.delete().where(User.id == id)
query.execute() # Remove the rows, return number of rows removed.
return True
else:
return False
except:
return False
def update_user_api_key_by_id(self, id: str, api_key: str) -> str:
try:
query = User.update(api_key=api_key).where(User.id == id)
result = query.execute()
return True if result == 1 else False
except:
return False
def get_user_api_key_by_id(self, id: str) -> Optional[str]:
try:
user = User.get(User.id == id)
return user.api_key
except:
return None
Users = UsersTable(DB)