open-webui/backend/apps/webui/models/auths.py
2024-06-27 07:48:08 +02:00

204 lines
4.9 KiB
Python

from pydantic import BaseModel
from typing import Optional
import uuid
import logging
from sqlalchemy import String, Column, Boolean
from sqlalchemy.orm import Session
from apps.webui.models.users import UserModel, Users
from utils.utils import verify_password
from apps.webui.internal.db import Base, get_session
from config import SRC_LOG_LEVELS
# Module-level logger; its level is taken from the "MODELS" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])
####################
# DB MODEL
####################
class Auth(Base):
    # ORM mapping for the "auth" table: one credential row per account.
    __tablename__ = "auth"

    # String primary key. AuthsTable.insert_new_auth fills it with a UUID4
    # string and passes the same value to Users.insert_new_user.
    id = Column(String, primary_key=True)

    # Login e-mail used by the authenticate_* lookups.
    email = Column(String)

    # Stored exactly as supplied by the caller; compared through
    # verify_password() during sign-in.
    password = Column(String)

    # The authenticate_* lookups only match rows where this is True.
    active = Column(Boolean)
class AuthModel(BaseModel):
    # Pydantic counterpart of an Auth row. insert_new_auth builds one and
    # feeds its model_dump() into the Auth ORM constructor.
    id: str
    email: str
    password: str
    # Defaults to True when not supplied.
    active: bool = True
####################
# Forms
####################
class Token(BaseModel):
    # A token value together with a label describing its type.
    token: str
    token_type: str
class ApiKey(BaseModel):
    # Wrapper around an optional API key; None when no key is present.
    api_key: Optional[str] = None
class UserResponse(BaseModel):
    # User fields exposed in responses; all five are required strings.
    id: str
    email: str
    name: str
    role: str
    profile_image_url: str
class SigninResponse(Token, UserResponse):
    # Combines the Token fields with the UserResponse fields through
    # multiple inheritance; declares no fields of its own.
    pass
class SigninForm(BaseModel):
    # Sign-in payload: e-mail plus password, both required.
    email: str
    password: str
class ProfileImageUrlForm(BaseModel):
    # Payload carrying only a profile image URL.
    profile_image_url: str
class UpdateProfileForm(BaseModel):
    # Profile update payload: image URL and display name, both required.
    profile_image_url: str
    name: str
class UpdatePasswordForm(BaseModel):
    # Password change payload.
    # presumably `password` is the current password and `new_password` its
    # replacement -- confirm against the route that consumes this form.
    password: str
    new_password: str
class SignupForm(BaseModel):
    # Sign-up payload: name, e-mail and password are required.
    name: str
    email: str
    password: str
    # Falls back to "/user.png" when the client sends no image URL.
    profile_image_url: Optional[str] = "/user.png"
class AddUserForm(SignupForm):
    # SignupForm plus a role, which defaults to "pending" when omitted.
    role: Optional[str] = "pending"
class AuthsTable:
    """Data-access helper for the ``auth`` table (login credentials).

    Each method that touches the table opens its own session through
    ``get_session()``. Apart from ``insert_new_auth``, the methods are
    best-effort: database errors are logged and reported through the return
    value (``None`` or ``False``) instead of being raised.
    """

    def insert_new_auth(
        self,
        email: str,
        password: str,
        name: str,
        profile_image_url: str = "/user.png",
        role: str = "pending",
        oauth_sub: Optional[str] = None,
    ) -> Optional[UserModel]:
        """Create a credential row and the matching user record.

        Args:
            email: Login e-mail, stored on the auth row and passed to the user.
            password: Stored exactly as given in ``Auth.password``; no hashing
                is done here.
            name: Passed through to ``Users.insert_new_user``.
            profile_image_url: Passed through to ``Users.insert_new_user``.
            role: Passed through to ``Users.insert_new_user``.
            oauth_sub: Passed through to ``Users.insert_new_user``.

        Returns:
            The value returned by ``Users.insert_new_user`` when it is truthy,
            otherwise ``None``.
        """
        with get_session() as db:
            log.info("insert_new_auth")

            # The auth row and the user record share one generated id.
            auth_id = str(uuid.uuid4())

            auth = AuthModel(
                **{
                    "id": auth_id,
                    "email": email,
                    "password": password,
                    "active": True,
                }
            )
            result = Auth(**auth.model_dump())
            db.add(result)

            user = Users.insert_new_user(
                auth_id, name, email, profile_image_url, role, oauth_sub
            )

            db.commit()
            db.refresh(result)

            if result and user:
                return user
            return None

    def authenticate_user(self, email: str, password: str) -> Optional[UserModel]:
        """Return the user for ``email`` if ``password`` verifies.

        Only rows with ``active=True`` are considered. Returns ``None`` when
        no such row exists, when ``verify_password`` rejects the password, or
        when the lookup raises.
        """
        log.info(f"authenticate_user: {email}")
        with get_session() as db:
            try:
                auth = db.query(Auth).filter_by(email=email, active=True).first()
                if auth and verify_password(password, auth.password):
                    return Users.get_user_by_id(auth.id)
                return None
            except Exception:
                # Best-effort: a failed lookup is reported as "not authenticated".
                log.exception("authenticate_user failed")
                return None

    def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]:
        """Return the user owning ``api_key``, or ``None``.

        ``None`` is returned for an empty key, an unknown key, or when the
        lookup raises.
        """
        # The key itself is a credential, so it is deliberately not logged.
        log.info("authenticate_user_by_api_key")

        if not api_key:
            return None

        try:
            user = Users.get_user_by_api_key(api_key)
            return user if user else None
        except Exception:
            log.exception("authenticate_user_by_api_key failed")
            return None

    def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]:
        """Return the user whose active auth row matches ``email``.

        No password check happens here, so callers must only pass an e-mail
        they already trust. Returns ``None`` when there is no active row for
        the e-mail or when the lookup raises.
        """
        log.info(f"authenticate_user_by_trusted_header: {email}")
        with get_session() as db:
            try:
                # filter_by() takes keyword criteria; Query.filter() does not.
                auth = db.query(Auth).filter_by(email=email, active=True).first()
                if auth:
                    return Users.get_user_by_id(auth.id)
                return None
            except Exception:
                log.exception("authenticate_user_by_trusted_header failed")
                return None

    def update_user_password_by_id(self, id: str, new_password: str) -> bool:
        """Overwrite the stored password of the auth row ``id``.

        ``new_password`` is written as given. Returns ``True`` only when
        exactly one row was updated; ``False`` otherwise or on error.
        """
        with get_session() as db:
            try:
                result = (
                    db.query(Auth).filter_by(id=id).update({"password": new_password})
                )
                # Commit explicitly, as insert_new_auth does. This is harmless
                # if get_session() also commits on exit.
                db.commit()
                return result == 1
            except Exception:
                log.exception("update_user_password_by_id failed")
                return False

    def update_email_by_id(self, id: str, email: str) -> bool:
        """Change the e-mail stored on the auth row ``id``.

        Returns ``True`` only when exactly one row was updated; ``False``
        otherwise or on error.
        """
        with get_session() as db:
            try:
                result = db.query(Auth).filter_by(id=id).update({"email": email})
                # Commit explicitly, as insert_new_auth does. This is harmless
                # if get_session() also commits on exit.
                db.commit()
                return result == 1
            except Exception:
                log.exception("update_email_by_id failed")
                return False

    def delete_auth_by_id(self, id: str) -> bool:
        """Delete the user ``id`` and then its auth row.

        The auth row is removed only if ``Users.delete_user_by_id`` reports
        success. Returns ``True`` when both steps complete, ``False`` when the
        user deletion reports failure or anything raises.
        """
        with get_session() as db:
            try:
                # Delete the user first; keep the auth row if that fails.
                if not Users.delete_user_by_id(id):
                    return False

                db.query(Auth).filter_by(id=id).delete()
                # Commit explicitly, as insert_new_auth does. This is harmless
                # if get_session() also commits on exit.
                db.commit()
                return True
            except Exception:
                log.exception("delete_auth_by_id failed")
                return False
# Module-level AuthsTable instance, created once at import time.
Auths = AuthsTable()