mirror of
https://github.com/open-webui/pipelines
synced 2025-06-26 18:15:58 +00:00
refac
This commit is contained in:
65
utils/auth.py
Normal file
65
utils/auth.py
Normal file
@@ -0,0 +1,65 @@
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from fastapi import HTTPException, status, Depends
|
||||
|
||||
from pydantic import BaseModel
|
||||
from typing import Union, Optional
|
||||
|
||||
|
||||
from passlib.context import CryptContext
|
||||
from datetime import datetime, timedelta
|
||||
import jwt
|
||||
import logging
|
||||
import os
|
||||
|
||||
import requests
|
||||
import uuid
|
||||
|
||||
SESSION_SECRET = os.getenv("SESSION_SECRET", " ")
|
||||
ALGORITHM = "HS256"
|
||||
|
||||
##############
|
||||
# Auth Utils
|
||||
##############
|
||||
|
||||
bearer_security = HTTPBearer()
|
||||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
|
||||
|
||||
def verify_password(plain_password, hashed_password):
|
||||
return (
|
||||
pwd_context.verify(plain_password, hashed_password) if hashed_password else None
|
||||
)
|
||||
|
||||
|
||||
def get_password_hash(password):
|
||||
return pwd_context.hash(password)
|
||||
|
||||
|
||||
def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
|
||||
payload = data.copy()
|
||||
|
||||
if expires_delta:
|
||||
expire = datetime.utcnow() + expires_delta
|
||||
payload.update({"exp": expire})
|
||||
|
||||
encoded_jwt = jwt.encode(payload, SESSION_SECRET, algorithm=ALGORITHM)
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
def decode_token(token: str) -> Optional[dict]:
|
||||
try:
|
||||
decoded = jwt.decode(token, SESSION_SECRET, algorithms=[ALGORITHM])
|
||||
return decoded
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
|
||||
def extract_token_from_auth_header(auth_header: str):
|
||||
return auth_header[len("Bearer ") :]
|
||||
|
||||
|
||||
def get_current_user(
|
||||
credentials: HTTPAuthorizationCredentials = Depends(bearer_security),
|
||||
) -> Optional[dict]:
|
||||
token = credentials.credentials
|
||||
return token
|
||||
@@ -1,58 +1,29 @@
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from fastapi import HTTPException, status, Depends
|
||||
|
||||
from pydantic import BaseModel
|
||||
from typing import Union, Optional
|
||||
|
||||
|
||||
from passlib.context import CryptContext
|
||||
from datetime import datetime, timedelta
|
||||
import jwt
|
||||
import logging
|
||||
import os
|
||||
|
||||
import requests
|
||||
import uuid
|
||||
import time
|
||||
|
||||
SESSION_SECRET = os.getenv("SESSION_SECRET", " ")
|
||||
ALGORITHM = "HS256"
|
||||
|
||||
##############
|
||||
# Auth Utils
|
||||
##############
|
||||
|
||||
bearer_security = HTTPBearer()
|
||||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
from typing import List
|
||||
from schemas import OpenAIChatMessage
|
||||
|
||||
|
||||
def verify_password(plain_password, hashed_password):
|
||||
return (
|
||||
pwd_context.verify(plain_password, hashed_password) if hashed_password else None
|
||||
)
|
||||
def stream_message_template(model: str, message: str):
|
||||
return {
|
||||
"id": f"{model}-{str(uuid.uuid4())}",
|
||||
"object": "chat.completion.chunk",
|
||||
"created": int(time.time()),
|
||||
"model": model,
|
||||
"choices": [
|
||||
{
|
||||
"index": 0,
|
||||
"delta": {"content": message},
|
||||
"logprobs": None,
|
||||
"finish_reason": None,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def get_password_hash(password):
|
||||
return pwd_context.hash(password)
|
||||
|
||||
|
||||
def create_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
|
||||
payload = data.copy()
|
||||
|
||||
if expires_delta:
|
||||
expire = datetime.utcnow() + expires_delta
|
||||
payload.update({"exp": expire})
|
||||
|
||||
encoded_jwt = jwt.encode(payload, SESSION_SECRET, algorithm=ALGORITHM)
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
def decode_token(token: str) -> Optional[dict]:
|
||||
try:
|
||||
decoded = jwt.decode(token, SESSION_SECRET, algorithms=[ALGORITHM])
|
||||
return decoded
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
|
||||
def extract_token_from_auth_header(auth_header: str):
|
||||
return auth_header[len("Bearer ") :]
|
||||
def get_last_user_message(messages: List[dict]) -> str:
|
||||
for message in reversed(messages):
|
||||
if message.role == "user":
|
||||
return message.content
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user