from pydantic import BaseModel, ConfigDict
from peewee import *
from playhouse.shortcuts import model_to_dict
from typing import List, Union, Optional
import time

from utils.misc import get_gravatar_url

from apps.webui.internal.db import DB, JSONField
from apps.webui.models.chats import Chats

####################
# User DB Schema
####################


class User(Model):
    """Peewee model for a row of the user table, bound to ``DB``."""

    id = CharField(unique=True)
    name = CharField()
    email = CharField()

    role = CharField()
    profile_image_url = TextField()

    # The three timestamps below are stored as integers; UserModel documents
    # them as epoch timestamps.
    last_active_at = BigIntegerField()
    updated_at = BigIntegerField()
    created_at = BigIntegerField()

    api_key = CharField(null=True, unique=True)
    settings = JSONField(null=True)
    info = JSONField(null=True)

    oauth_sub = TextField(null=True, unique=True)

    class Meta:
        database = DB


class UserSettings(BaseModel):
    """Per-user settings; unknown extra keys are accepted (``extra="allow"``)."""

    ui: Optional[dict] = {}
    model_config = ConfigDict(extra="allow")


class UserModel(BaseModel):
    """Pydantic representation of a ``User`` row."""

    id: str
    name: str
    email: str

    role: str = "pending"
    profile_image_url: str

    last_active_at: int  # timestamp in epoch
    updated_at: int  # timestamp in epoch
    created_at: int  # timestamp in epoch

    api_key: Optional[str] = None
    settings: Optional[UserSettings] = None
    info: Optional[dict] = None

    oauth_sub: Optional[str] = None


####################
# Forms
####################


class UserRoleUpdateForm(BaseModel):
    """Request payload carrying a user id and the role to assign."""

    id: str
    role: str


class UserUpdateForm(BaseModel):
    """Request payload for editing a user's profile fields."""

    name: str
    email: str
    profile_image_url: str
    password: Optional[str] = None


class UsersTable:
    """Data-access helpers around the ``User`` table.

    Most methods are best-effort: instead of propagating a database or
    validation error they return ``None`` (or ``False``), so callers must
    check the result. ``insert_new_user`` and ``get_num_users`` do not catch
    exceptions.
    """

    def __init__(self, db):
        """Keep a handle on ``db`` and create the ``User`` table via it."""
        self.db = db
        self.db.create_tables([User])

    @staticmethod
    def _fetch_by_id(id: str) -> UserModel:
        """Load the row whose id equals ``id`` and convert it to a UserModel.

        Exceptions from the lookup or the conversion propagate to the caller.
        """
        return UserModel(**model_to_dict(User.get(User.id == id)))

    def insert_new_user(
        self,
        id: str,
        name: str,
        email: str,
        profile_image_url: str = "/user.png",
        role: str = "pending",
        oauth_sub: Optional[str] = None,
    ) -> Optional[UserModel]:
        """Create a user row and return it as a UserModel.

        ``last_active_at``, ``created_at`` and ``updated_at`` are all set to
        the current epoch second. Exceptions raised by ``User.create`` are
        not caught here.
        """
        # Read the clock once so the three timestamps are guaranteed equal.
        now = int(time.time())
        user = UserModel(
            **{
                "id": id,
                "name": name,
                "email": email,
                "role": role,
                "profile_image_url": profile_image_url,
                "last_active_at": now,
                "created_at": now,
                "updated_at": now,
                "oauth_sub": oauth_sub,
            }
        )
        result = User.create(**user.model_dump())
        return user if result else None

    def get_user_by_id(self, id: str) -> Optional[UserModel]:
        """Return the user with the given id, or None if the lookup fails."""
        try:
            return self._fetch_by_id(id)
        except Exception:
            return None

    def get_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
        """Return the user owning ``api_key``, or None if the lookup fails."""
        try:
            user = User.get(User.api_key == api_key)
            return UserModel(**model_to_dict(user))
        except Exception:
            return None

    def get_user_by_email(
        self, email: str, oauth_user: bool = False
    ) -> Optional[UserModel]:
        """Return the user with the given email, or None if the lookup fails.

        When ``oauth_user`` is False only rows whose ``oauth_sub`` is NULL are
        considered; when True the match is on email alone.
        """
        try:
            # Always build a list so it can be star-unpacked into User.get();
            # a lone parenthesised expression is not a sequence of conditions.
            conditions = [User.email == email]
            if not oauth_user:
                conditions.append(User.oauth_sub.is_null())
            user = User.get(*conditions)
            return UserModel(**model_to_dict(user))
        except Exception:
            return None

    def get_user_by_oauth_sub(self, sub: str) -> Optional[UserModel]:
        """Return the user whose ``oauth_sub`` equals ``sub``, or None."""
        try:
            user = User.get(User.oauth_sub == sub)
            return UserModel(**model_to_dict(user))
        except Exception:
            return None

    def get_users(self, skip: int = 0, limit: int = 50) -> List[UserModel]:
        """Return every user as a UserModel.

        NOTE: ``skip`` and ``limit`` are accepted but not applied to the
        query; all rows are returned.
        """
        return [UserModel(**model_to_dict(user)) for user in User.select()]

    def get_num_users(self) -> Optional[int]:
        """Return the number of rows in the user table."""
        return User.select().count()

    def get_first_user(self) -> Optional[UserModel]:
        """Return the user with the smallest ``created_at``, or None.

        None is returned both when the table is empty and when the query or
        conversion fails.
        """
        try:
            user = User.select().order_by(User.created_at).first()
            if user is None:
                return None
            return UserModel(**model_to_dict(user))
        except Exception:
            return None

    def update_user_role_by_id(self, id: str, role: str) -> Optional[UserModel]:
        """Set the user's role and return the refreshed user, or None."""
        try:
            User.update(role=role).where(User.id == id).execute()
            return self._fetch_by_id(id)
        except Exception:
            return None

    def update_user_profile_image_url_by_id(
        self, id: str, profile_image_url: str
    ) -> Optional[UserModel]:
        """Set the user's profile image URL and return the refreshed user."""
        try:
            User.update(profile_image_url=profile_image_url).where(
                User.id == id
            ).execute()
            return self._fetch_by_id(id)
        except Exception:
            return None

    def update_user_last_active_by_id(self, id: str) -> Optional[UserModel]:
        """Stamp ``last_active_at`` with the current epoch second.

        Returns the refreshed user, or None on failure.
        """
        try:
            User.update(last_active_at=int(time.time())).where(
                User.id == id
            ).execute()
            return self._fetch_by_id(id)
        except Exception:
            return None

    def update_user_oauth_sub_by_id(
        self, id: str, oauth_sub: str
    ) -> Optional[UserModel]:
        """Set the user's ``oauth_sub`` and return the refreshed user."""
        try:
            User.update(oauth_sub=oauth_sub).where(User.id == id).execute()
            return self._fetch_by_id(id)
        except Exception:
            return None

    def update_user_by_id(self, id: str, updated: dict) -> Optional[UserModel]:
        """Apply ``updated`` (column name -> value) to the user row.

        Returns the refreshed user, or None on failure.
        """
        try:
            User.update(**updated).where(User.id == id).execute()
            return self._fetch_by_id(id)
        except Exception:
            return None

    def delete_user_by_id(self, id: str) -> bool:
        """Delete the user's chats, then the user row.

        Returns True when the chat deletion reported success and the user
        delete query ran; False when the chat deletion reported failure or
        any step raised.
        """
        try:
            # Delete User Chats
            if not Chats.delete_chats_by_user_id(id):
                return False

            # Delete User
            User.delete().where(User.id == id).execute()
            return True
        except Exception:
            return False

    def update_user_api_key_by_id(self, id: str, api_key: str) -> bool:
        """Store ``api_key`` on the user; True only if exactly one row changed."""
        try:
            result = User.update(api_key=api_key).where(User.id == id).execute()
            return result == 1
        except Exception:
            return False

    def get_user_api_key_by_id(self, id: str) -> Optional[str]:
        """Return the user's API key, or None if unset or the lookup fails."""
        try:
            return User.get(User.id == id).api_key
        except Exception:
            return None


Users = UsersTable(DB)