pipelines/utils/pipelines/auth.py
Timothy J. Baek dd98a71d0b refac
2024-06-02 11:43:41 -07:00

66 lines
1.5 KiB
Python

import logging
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

import jwt
import requests
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from passlib.context import CryptContext
from pydantic import BaseModel
# Secret used to sign and verify tokens in create_token / decode_token, read
# from the SESSION_SECRET environment variable.
# NOTE(review): the fallback is a single space, i.e. a guessable signing key
# whenever the variable is unset - confirm deployments always set it.
SESSION_SECRET = os.getenv("SESSION_SECRET", " ")
# Algorithm name handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
##############
# Auth Utils
##############
# Bearer-token security scheme; injected into get_current_user via Depends.
bearer_security = HTTPBearer()
# Password hashing context configured for bcrypt; used by verify_password
# and get_password_hash.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: The candidate password to check.
        hashed_password: The stored hash to compare against; may be empty
            or ``None``.

    Returns:
        The result of ``pwd_context.verify`` when a hash is supplied,
        otherwise ``None`` (there is nothing to verify against).
    """
    # Guard first: a missing/empty hash never reaches the hashing context.
    if not hashed_password:
        return None
    return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
    """Hash a password with the module-level ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        Whatever ``pwd_context.hash`` produces for ``password``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
    """Encode ``data`` as a JWT signed with ``SESSION_SECRET``.

    Args:
        data: Claims to embed in the token. A shallow copy is made, so the
            caller's dict is not modified.
        expires_delta: Lifetime of the token. When given, an ``exp`` claim
            of "current UTC time + expires_delta" is set (replacing any
            ``exp`` already present in ``data``). When ``None``, no ``exp``
            claim is added here.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    payload = data.copy()
    # Compare against None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently issue a token with no expiry at all.
    if expires_delta is not None:
        # Timezone-aware "now"; the naive datetime.utcnow() is deprecated.
        payload["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(payload, SESSION_SECRET, algorithm=ALGORITHM)
def decode_token(token: str) -> Optional[dict]:
    """Decode and verify a JWT created with ``SESSION_SECRET``.

    Args:
        token: The encoded token to decode.

    Returns:
        The decoded payload returned by ``jwt.decode``, or ``None`` if
        decoding raised for any reason. This function never raises.
    """
    try:
        return jwt.decode(token, SESSION_SECRET, algorithms=[ALGORITHM])
    except Exception as exc:
        # Deliberately broad: callers rely on "None means not usable".
        # Record the cause instead of discarding it so failures can be
        # diagnosed; the token itself is not logged.
        logging.getLogger(__name__).debug("Token decode failed: %s", exc)
        return None
def extract_token_from_auth_header(auth_header: str):
    """Return the part of ``auth_header`` that follows the ``"Bearer "`` prefix.

    The header is not validated: the first ``len("Bearer ")`` characters are
    dropped regardless of what they actually are.

    Args:
        auth_header: The raw Authorization header value.

    Returns:
        The remainder of the string after the prefix-length slice.
    """
    prefix_length = len("Bearer ")
    token = auth_header[prefix_length:]
    return token
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_security),
) -> str:
    """Return the bearer token carried by the request's credentials.

    Despite the name, no user lookup or token decoding happens here: the
    value of ``credentials.credentials`` is returned as-is.

    Args:
        credentials: Credentials resolved by the ``bearer_security``
            dependency.

    Returns:
        The raw token string from ``credentials.credentials``.
    """
    return credentials.credentials