from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi import HTTPException, status, Depends, Request from sqlalchemy.orm import Session from apps.webui.models.users import Users from pydantic import BaseModel from typing import Union, Optional from constants import ERROR_MESSAGES from passlib.context import CryptContext from datetime import datetime, timedelta import requests import jwt import uuid import logging import config logging.getLogger("passlib").setLevel(logging.ERROR) SESSION_SECRET = config.WEBUI_SECRET_KEY ALGORITHM = "HS256" ############## # Auth Utils ############## bearer_security = HTTPBearer(auto_error=False) pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def verify_password(plain_password, hashed_password): return ( pwd_context.verify(plain_password, hashed_password) if hashed_password else None ) def get_password_hash(password): return pwd_context.hash(password) def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str: payload = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta payload.update({"exp": expire}) encoded_jwt = jwt.encode(payload, SESSION_SECRET, algorithm=ALGORITHM) return encoded_jwt def decode_token(token: str) -> Optional[dict]: try: decoded = jwt.decode(token, SESSION_SECRET, algorithms=[ALGORITHM]) return decoded except Exception as e: return None def extract_token_from_auth_header(auth_header: str): return auth_header[len("Bearer ") :] def create_api_key(): key = str(uuid.uuid4()).replace("-", "") return f"sk-{key}" def get_http_authorization_cred(auth_header: str): try: scheme, credentials = auth_header.split(" ") return HTTPAuthorizationCredentials(scheme=scheme, credentials=credentials) except: raise ValueError(ERROR_MESSAGES.INVALID_TOKEN) def get_current_user( request: Request, auth_token: HTTPAuthorizationCredentials = Depends(bearer_security), ): token = None if auth_token is not None: token = auth_token.credentials if token is None and "token" in request.cookies: token = request.cookies.get("token") if token is None: raise HTTPException(status_code=403, detail="Not authenticated") # auth by api key if token.startswith("sk-"): return get_current_user_by_api_key(token) # auth by jwt token data = decode_token(token) if data != None and "id" in data: user = Users.get_user_by_id(data["id"]) if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=ERROR_MESSAGES.INVALID_TOKEN, ) else: Users.update_user_last_active_by_id(user.id) return user else: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=ERROR_MESSAGES.UNAUTHORIZED, ) def get_current_user_by_api_key(db: Session, api_key: str): user = Users.get_user_by_api_key(db, api_key) if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=ERROR_MESSAGES.INVALID_TOKEN, ) else: Users.update_user_last_active_by_id(db, user.id) return user def get_verified_user(user=Depends(get_current_user)): if user.role not in {"user", "admin"}: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=ERROR_MESSAGES.ACCESS_PROHIBITED, ) return user def get_admin_user(user=Depends(get_current_user)): if user.role != "admin": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=ERROR_MESSAGES.ACCESS_PROHIBITED, ) return user