mirror of
https://github.com/open-webui/open-webui
synced 2024-11-17 14:02:51 +00:00
5d934d7d15
LDAP will be used as default if no other auth form is enabled. LDAP now will work with ENABLE_LOGIN_FORM = false. Fixed exception "User does not match the record." Now LDAP login is case insensitive. Integrated with onboarding feature.
699 lines
24 KiB
Python
699 lines
24 KiB
Python
import re
|
|
import uuid
|
|
import time
|
|
import datetime
|
|
import logging
|
|
|
|
from open_webui.apps.webui.models.auths import (
|
|
AddUserForm,
|
|
ApiKey,
|
|
Auths,
|
|
Token,
|
|
LdapForm,
|
|
SigninForm,
|
|
SigninResponse,
|
|
SignupForm,
|
|
UpdatePasswordForm,
|
|
UpdateProfileForm,
|
|
UserResponse,
|
|
)
|
|
from open_webui.apps.webui.models.users import Users
|
|
from open_webui.config import WEBUI_AUTH
|
|
from open_webui.constants import ERROR_MESSAGES, WEBHOOK_MESSAGES
|
|
from open_webui.env import (
|
|
WEBUI_AUTH_TRUSTED_EMAIL_HEADER,
|
|
WEBUI_AUTH_TRUSTED_NAME_HEADER,
|
|
WEBUI_SESSION_COOKIE_SAME_SITE,
|
|
WEBUI_SESSION_COOKIE_SECURE,
|
|
SRC_LOG_LEVELS,
|
|
)
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
|
from fastapi.responses import Response
|
|
from pydantic import BaseModel
|
|
from open_webui.utils.misc import parse_duration, validate_email_format
|
|
from open_webui.utils.utils import (
|
|
create_api_key,
|
|
create_token,
|
|
get_admin_user,
|
|
get_verified_user,
|
|
get_current_user,
|
|
get_password_hash,
|
|
)
|
|
from open_webui.utils.webhook import post_webhook
|
|
from typing import Optional, List
|
|
|
|
from ldap3 import Server, Connection, ALL, Tls
|
|
from ssl import CERT_REQUIRED, PROTOCOL_TLS
|
|
|
|
# Module logger; its level is taken from the "MAIN" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MAIN"])

# Router on which the auth endpoints defined in this module are registered.
router = APIRouter()
|
|
|
|
############################
|
|
# GetSessionUser
|
|
############################
|
|
|
|
|
|
# Response schema for session endpoints: combines the Token and UserResponse
# fields with an optional expiry value.
class SessionUserResponse(Token, UserResponse):
    # The endpoints fill this with int(time.time()) plus the token lifetime
    # in seconds; it stays None when no lifetime is configured.
    expires_at: Optional[int] = None
|
|
|
|
|
|
@router.get("/", response_model=SessionUserResponse)
async def get_session_user(
    request: Request, response: Response, user=Depends(get_current_user)
):
    """Issue a fresh token for the current user and return the session data.

    A new token is created on every call, written to the ``token`` cookie and
    returned in the body together with the user's id, email, name, role and
    profile image URL.
    """
    lifetime = parse_duration(request.app.state.config.JWT_EXPIRES_IN)

    # No expiry is reported (nor set on the cookie) when the lifetime is falsy.
    if lifetime:
        expires_at = int(time.time()) + int(lifetime.total_seconds())
    else:
        expires_at = None

    token = create_token(data={"id": user.id}, expires_delta=lifetime)

    cookie_expiry = None
    if expires_at:
        cookie_expiry = datetime.datetime.fromtimestamp(
            expires_at, datetime.timezone.utc
        )

    response.set_cookie(
        key="token",
        value=token,
        expires=cookie_expiry,
        httponly=True,  # keep the cookie out of reach of JavaScript
        samesite=WEBUI_SESSION_COOKIE_SAME_SITE,
        secure=WEBUI_SESSION_COOKIE_SECURE,
    )

    session = {"token": token, "token_type": "Bearer", "expires_at": expires_at}
    session.update(
        id=user.id,
        email=user.email,
        name=user.name,
        role=user.role,
        profile_image_url=user.profile_image_url,
    )
    return session
|
|
|
|
|
|
############################
|
|
# Update Profile
|
|
############################
|
|
|
|
|
|
@router.post("/update/profile", response_model=UserResponse)
async def update_profile(
    form_data: UpdateProfileForm, session_user=Depends(get_verified_user)
):
    """Update the name and profile image URL of the signed-in user.

    Raises:
        HTTPException: 400 with ``INVALID_CRED`` when there is no session
            user, 400 with the default error when the update returns nothing.
    """
    if not session_user:
        raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)

    changes = {
        "profile_image_url": form_data.profile_image_url,
        "name": form_data.name,
    }
    updated = Users.update_user_by_id(session_user.id, changes)
    if not updated:
        raise HTTPException(400, detail=ERROR_MESSAGES.DEFAULT())

    return updated
|
|
|
|
|
|
############################
|
|
# Update Password
|
|
############################
|
|
|
|
|
|
@router.post("/update/password", response_model=bool)
async def update_password(
    form_data: UpdatePasswordForm, session_user=Depends(get_current_user)
):
    """Change the signed-in user's password after checking the current one.

    Raises:
        HTTPException: 400 with ``ACTION_PROHIBITED`` when a trusted email
            header is configured, ``INVALID_CRED`` when there is no session
            user, ``INVALID_PASSWORD`` when the current password is rejected.
    """
    # Password changes are refused whenever trusted-header auth is configured.
    if WEBUI_AUTH_TRUSTED_EMAIL_HEADER:
        raise HTTPException(400, detail=ERROR_MESSAGES.ACTION_PROHIBITED)

    if not session_user:
        raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)

    account = Auths.authenticate_user(session_user.email, form_data.password)
    if not account:
        raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_PASSWORD)

    new_hash = get_password_hash(form_data.new_password)
    return Auths.update_user_password_by_id(account.id, new_hash)
|
|
|
|
|
|
############################
|
|
# LDAP Authentication
|
|
############################
|
|
@router.post("/ldap", response_model=SigninResponse)
async def ldap_auth(request: Request, response: Response, form_data: LdapForm):
    """Authenticate a user against the configured LDAP server.

    Flow: bind with the application account, search for the submitted
    username, bind again as the found entry with the submitted password,
    create a local account on first login, then issue a token and set the
    ``token`` cookie.

    Raises:
        HTTPException: 400 when LDAP is disabled, TLS setup fails, a bind or
            the search fails, the entry has no mail value, the found username
            differs from the submitted one, local authentication fails, or
            any other error occurs while talking to the server; 500 when the
            local account cannot be created.
    """
    # ldap3 is already a dependency of this module; the function-scope import
    # keeps this endpoint self-contained.
    from ldap3.utils.conv import escape_filter_chars

    config = request.app.state.config

    if not config.ENABLE_LDAP:
        raise HTTPException(400, detail="LDAP authentication is not enabled")

    server_host = config.LDAP_SERVER_HOST
    server_port = config.LDAP_SERVER_PORT
    username_attribute = config.LDAP_ATTRIBUTE_FOR_USERNAME
    search_base = config.LDAP_SEARCH_BASE
    search_filters = config.LDAP_SEARCH_FILTERS
    app_dn = config.LDAP_APP_DN
    app_password = config.LDAP_APP_PASSWORD
    use_tls = config.LDAP_USE_TLS
    ca_cert_file = config.LDAP_CA_CERT_FILE
    ciphers = config.LDAP_CIPHERS if config.LDAP_CIPHERS else 'ALL'

    try:
        tls = Tls(
            validate=CERT_REQUIRED,
            version=PROTOCOL_TLS,
            ca_certs_file=ca_cert_file,
            ciphers=ciphers,
        )
    except Exception as e:
        log.error(f"An error occurred on TLS: {str(e)}")
        raise HTTPException(400, detail=str(e))

    connection_app = None
    connection_user = None
    try:
        server = Server(
            host=server_host,
            port=server_port,
            get_info=ALL,
            use_ssl=use_tls,
            tls=tls,
        )
        connection_app = Connection(
            server, app_dn, app_password, auto_bind='NONE', authentication='SIMPLE'
        )
        if not connection_app.bind():
            raise HTTPException(400, detail="Application account bind failed")

        submitted_username = form_data.user.lower()
        # The username is untrusted input: escape it so characters such as
        # '*', '(' or ')' cannot alter the search filter.
        safe_username = escape_filter_chars(submitted_username)
        search_success = connection_app.search(
            search_base=search_base,
            search_filter=f'(&({username_attribute}={safe_username}){search_filters})',
            attributes=[f'{username_attribute}', 'mail', 'cn'],
        )

        if not search_success or not connection_app.entries:
            raise HTTPException(400, detail="User not found in the LDAP server")

        entry = connection_app.entries[0]
        found_username = str(entry[f'{username_attribute}'])
        username = found_username.lower()
        mail = str(entry['mail'])
        cn = str(entry['cn'])
        user_dn = entry.entry_dn

        # ldap3 renders an attribute without values as "[]"; such an entry
        # cannot be mapped to a local account.
        if not mail or mail == "[]":
            raise HTTPException(
                400, detail=f"User {form_data.user} does not have mail."
            )

        if username != submitted_username:
            raise HTTPException(
                400,
                detail=f"User {form_data.user} does not match the record. Search result: {found_username}",
            )

        connection_user = Connection(
            server, user_dn, form_data.password, auto_bind='NONE', authentication='SIMPLE'
        )
        if not connection_user.bind():
            raise HTTPException(
                400, detail=f"Authentication failed for {form_data.user}"
            )

        user = Users.get_user_by_email(mail)
        if not user:
            try:
                hashed = get_password_hash(form_data.password)
                user = Auths.insert_new_auth(mail, hashed, cn)

                if not user:
                    raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_USER_ERROR)
            except HTTPException:
                raise
            except Exception as err:
                raise HTTPException(500, detail=ERROR_MESSAGES.DEFAULT(err))

        # NOTE(review): the local record keeps the password hash from the
        # first LDAP login, so this check presumably fails after the password
        # changes on the LDAP side - confirm and consider a trusted lookup.
        user = Auths.authenticate_user(mail, password=str(form_data.password))
        if not user:
            raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)

        expires_delta = parse_duration(config.JWT_EXPIRES_IN)
        expires_at = None
        if expires_delta:
            expires_at = int(time.time()) + int(expires_delta.total_seconds())

        token = create_token(
            data={"id": user.id},
            expires_delta=expires_delta,
        )

        datetime_expires_at = (
            datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc)
            if expires_at
            else None
        )

        # Same cookie attributes as the /signin endpoint.
        response.set_cookie(
            key="token",
            value=token,
            expires=datetime_expires_at,
            httponly=True,  # Ensures the cookie is not accessible via JavaScript
            samesite=WEBUI_SESSION_COOKIE_SAME_SITE,
            secure=WEBUI_SESSION_COOKIE_SECURE,
        )

        return {
            "token": token,
            "token_type": "Bearer",
            "id": user.id,
            "email": user.email,
            "name": user.name,
            "role": user.role,
            "profile_image_url": user.profile_image_url,
        }
    except HTTPException:
        # Keep the status code and detail chosen above instead of re-wrapping
        # them as a generic 400.
        raise
    except Exception as e:
        raise HTTPException(400, detail=str(e))
    finally:
        # Always release the LDAP connections, whatever the outcome.
        for connection in (connection_user, connection_app):
            if connection is None:
                continue
            try:
                connection.unbind()
            except Exception as unbind_error:
                log.debug(f"Failed to unbind LDAP connection: {unbind_error}")
|
|
|
|
|
|
############################
|
|
# SignIn
|
|
############################
|
|
|
|
|
|
@router.post("/signin", response_model=SessionUserResponse)
async def signin(request: Request, response: Response, form_data: SigninForm):
    """Sign a user in and return a token plus the user's public fields.

    Three modes, checked in this order:
      * trusted-header auth: the email (and optionally the name) come from
        request headers; an unknown email is signed up first;
      * auth disabled (``WEBUI_AUTH`` is False): the built-in
        ``admin@localhost`` account is used, created when no user exists;
      * otherwise: the submitted email and password are checked.

    Raises:
        HTTPException: 400 when the trusted header is missing, when auth is
            disabled but other users already exist, or when no user could be
            authenticated.
    """
    if WEBUI_AUTH_TRUSTED_EMAIL_HEADER:
        if WEBUI_AUTH_TRUSTED_EMAIL_HEADER not in request.headers:
            raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_TRUSTED_HEADER)

        trusted_email = request.headers[WEBUI_AUTH_TRUSTED_EMAIL_HEADER].lower()
        if WEBUI_AUTH_TRUSTED_NAME_HEADER:
            trusted_name = request.headers.get(
                WEBUI_AUTH_TRUSTED_NAME_HEADER, trusted_email
            )
        else:
            trusted_name = trusted_email

        if not Users.get_user_by_email(trusted_email):
            # First visit for this email: register it with a random password.
            new_account = SignupForm(
                email=trusted_email, password=str(uuid.uuid4()), name=trusted_name
            )
            await signup(request, response, new_account)

        user = Auths.authenticate_user_by_trusted_header(trusted_email)
    elif WEBUI_AUTH == False:  # noqa: E712 - comparison kept exactly as written
        admin_email = "admin@localhost"
        admin_password = "admin"

        if not Users.get_user_by_email(admin_email):
            if Users.get_num_users() != 0:
                raise HTTPException(400, detail=ERROR_MESSAGES.EXISTING_USERS)

            default_admin = SignupForm(
                email=admin_email, password=admin_password, name="User"
            )
            await signup(request, response, default_admin)

        user = Auths.authenticate_user(admin_email, admin_password)
    else:
        user = Auths.authenticate_user(form_data.email.lower(), form_data.password)

    if not user:
        raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)

    lifetime = parse_duration(request.app.state.config.JWT_EXPIRES_IN)
    expires_at = (
        int(time.time()) + int(lifetime.total_seconds()) if lifetime else None
    )

    token = create_token(data={"id": user.id}, expires_delta=lifetime)

    cookie_expiry = None
    if expires_at:
        cookie_expiry = datetime.datetime.fromtimestamp(
            expires_at, datetime.timezone.utc
        )

    response.set_cookie(
        key="token",
        value=token,
        expires=cookie_expiry,
        httponly=True,  # keep the cookie out of reach of JavaScript
        samesite=WEBUI_SESSION_COOKIE_SAME_SITE,
        secure=WEBUI_SESSION_COOKIE_SECURE,
    )

    session = {"token": token, "token_type": "Bearer", "expires_at": expires_at}
    session.update(
        id=user.id,
        email=user.email,
        name=user.name,
        role=user.role,
        profile_image_url=user.profile_image_url,
    )
    return session
|
|
|
|
|
|
############################
|
|
# SignUp
|
|
############################
|
|
|
|
|
|
@router.post("/signup", response_model=SessionUserResponse)
async def signup(request: Request, response: Response, form_data: SignupForm):
    """Register a new account, sign it in and return the session data.

    The first account ever created gets the ``admin`` role and disables
    further signups; later accounts get ``DEFAULT_USER_ROLE``. When a
    ``WEBHOOK_URL`` is configured, a signup notification is posted to it.

    Raises:
        HTTPException: 403 when signup is not permitted (signup or the login
            form is disabled, or auth is off and users already exist), 400
            for a malformed or already registered email, 500 when the
            account cannot be created.
    """
    config = request.app.state.config

    if WEBUI_AUTH:
        if not config.ENABLE_SIGNUP or not config.ENABLE_LOGIN_FORM:
            raise HTTPException(
                status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACCESS_PROHIBITED
            )
    else:
        if Users.get_num_users() != 0:
            raise HTTPException(
                status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACCESS_PROHIBITED
            )

    email = form_data.email.lower()

    if not validate_email_format(email):
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.INVALID_EMAIL_FORMAT
        )

    if Users.get_user_by_email(email):
        raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN)

    try:
        # Query the user count once so the role and the signup switch are
        # decided from the same observation.
        is_first_user = Users.get_num_users() == 0
        role = "admin" if is_first_user else config.DEFAULT_USER_ROLE

        hashed = get_password_hash(form_data.password)
        user = Auths.insert_new_auth(
            email,
            hashed,
            form_data.name,
            form_data.profile_image_url,
            role,
        )

        if not user:
            raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_USER_ERROR)

        if is_first_user:
            # Disable signup only once the first user really exists, so a
            # failed insert cannot leave the instance with no way to sign up.
            config.ENABLE_SIGNUP = False

        expires_delta = parse_duration(config.JWT_EXPIRES_IN)
        expires_at = None
        if expires_delta:
            expires_at = int(time.time()) + int(expires_delta.total_seconds())

        token = create_token(
            data={"id": user.id},
            expires_delta=expires_delta,
        )

        datetime_expires_at = (
            datetime.datetime.fromtimestamp(expires_at, datetime.timezone.utc)
            if expires_at
            else None
        )

        # Set the cookie token
        response.set_cookie(
            key="token",
            value=token,
            expires=datetime_expires_at,
            httponly=True,  # Ensures the cookie is not accessible via JavaScript
            samesite=WEBUI_SESSION_COOKIE_SAME_SITE,
            secure=WEBUI_SESSION_COOKIE_SECURE,
        )

        if config.WEBHOOK_URL:
            post_webhook(
                config.WEBHOOK_URL,
                WEBHOOK_MESSAGES.USER_SIGNUP(user.name),
                {
                    "action": "signup",
                    "message": WEBHOOK_MESSAGES.USER_SIGNUP(user.name),
                    "user": user.model_dump_json(exclude_none=True),
                },
            )

        return {
            "token": token,
            "token_type": "Bearer",
            "expires_at": expires_at,
            "id": user.id,
            "email": user.email,
            "name": user.name,
            "role": user.role,
            "profile_image_url": user.profile_image_url,
        }
    except HTTPException:
        # Preserve the specific error raised above instead of re-wrapping it.
        raise
    except Exception as err:
        raise HTTPException(500, detail=ERROR_MESSAGES.DEFAULT(err))
|
|
|
|
|
|
@router.get("/signout")
async def signout(response: Response):
    """Remove the ``token`` cookie and report success."""
    cookie_name = "token"
    response.delete_cookie(cookie_name)
    return dict(status=True)
|
|
|
|
|
|
############################
|
|
# AddUser
|
|
############################
|
|
|
|
|
|
@router.post("/add", response_model=SigninResponse)
async def add_user(form_data: AddUserForm, user=Depends(get_admin_user)):
    """Create an account on behalf of an admin and return a token for it.

    Raises:
        HTTPException: 400 for a malformed or already registered email, 500
            when the account cannot be created.
    """
    email = form_data.email.lower()

    if not validate_email_format(email):
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.INVALID_EMAIL_FORMAT
        )

    if Users.get_user_by_email(email):
        raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN)

    try:
        # The form is deliberately not printed or logged: it carries the
        # plaintext password.
        hashed = get_password_hash(form_data.password)
        # Separate name so the admin injected as `user` is not shadowed.
        new_user = Auths.insert_new_auth(
            email,
            hashed,
            form_data.name,
            form_data.profile_image_url,
            form_data.role,
        )

        if not new_user:
            raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_USER_ERROR)

        token = create_token(data={"id": new_user.id})
        return {
            "token": token,
            "token_type": "Bearer",
            "id": new_user.id,
            "email": new_user.email,
            "name": new_user.name,
            "role": new_user.role,
            "profile_image_url": new_user.profile_image_url,
        }
    except HTTPException:
        # Preserve the specific error raised above instead of re-wrapping it.
        raise
    except Exception as err:
        raise HTTPException(500, detail=ERROR_MESSAGES.DEFAULT(err))
|
|
|
|
|
|
############################
|
|
# GetAdminDetails
|
|
############################
|
|
|
|
|
|
@router.get("/admin/details")
async def get_admin_details(request: Request, user=Depends(get_current_user)):
    """Return the name and email of the admin contact.

    When ``ADMIN_EMAIL`` is configured, that email is returned and the name
    is looked up from the matching user (``None`` if there is none).
    Otherwise both values come from ``Users.get_first_user()``.

    Raises:
        HTTPException: 400 with ``ACTION_PROHIBITED`` when
            ``SHOW_ADMIN_DETAILS`` is off.
    """
    config = request.app.state.config

    if not config.SHOW_ADMIN_DETAILS:
        raise HTTPException(400, detail=ERROR_MESSAGES.ACTION_PROHIBITED)

    admin_email = config.ADMIN_EMAIL
    admin_name = None

    # The former debug print of the admin email to stdout was dropped here.
    if admin_email:
        admin = Users.get_user_by_email(admin_email)
        if admin:
            admin_name = admin.name
    else:
        admin = Users.get_first_user()
        if admin:
            admin_email = admin.email
            admin_name = admin.name

    return {
        "name": admin_name,
        "email": admin_email,
    }
|
|
|
|
|
|
############################
|
|
# Admin Config (signup, default role, token lifetime, LDAP)
|
|
############################
|
|
|
|
|
|
@router.get("/admin/config")
async def get_admin_config(request: Request, user=Depends(get_admin_user)):
    """Return the admin-editable settings currently held in the app config."""
    config = request.app.state.config
    exposed = (
        "SHOW_ADMIN_DETAILS",
        "ENABLE_SIGNUP",
        "DEFAULT_USER_ROLE",
        "JWT_EXPIRES_IN",
        "ENABLE_COMMUNITY_SHARING",
        "ENABLE_MESSAGE_RATING",
    )
    return {setting: getattr(config, setting) for setting in exposed}
|
|
|
|
|
|
# Request body accepted by the POST /admin/config endpoint; field names mirror
# the app config attributes they are written to.
class AdminConfig(BaseModel):
    SHOW_ADMIN_DETAILS: bool
    ENABLE_SIGNUP: bool
    # Applied by the update endpoint only for "pending", "user" or "admin".
    DEFAULT_USER_ROLE: str
    # Applied by the update endpoint only when it matches its duration pattern.
    JWT_EXPIRES_IN: str
    ENABLE_COMMUNITY_SHARING: bool
    ENABLE_MESSAGE_RATING: bool
|
|
|
|
|
|
@router.post("/admin/config")
async def update_admin_config(
    request: Request, form_data: AdminConfig, user=Depends(get_admin_user)
):
    """Apply the submitted admin settings and return the resulting values.

    An unknown ``DEFAULT_USER_ROLE`` or a ``JWT_EXPIRES_IN`` that does not
    match the duration pattern is silently ignored; the previous value stays.
    """
    config = request.app.state.config

    config.SHOW_ADMIN_DETAILS = form_data.SHOW_ADMIN_DETAILS
    config.ENABLE_SIGNUP = form_data.ENABLE_SIGNUP

    allowed_roles = {"pending", "user", "admin"}
    if form_data.DEFAULT_USER_ROLE in allowed_roles:
        config.DEFAULT_USER_ROLE = form_data.DEFAULT_USER_ROLE

    # Accepts "-1", "0", or a number followed by one of ms/s/m/h/d/w.
    duration_pattern = r"^(-1|0|(-?\d+(\.\d+)?)(ms|s|m|h|d|w))$"
    if re.match(duration_pattern, form_data.JWT_EXPIRES_IN):
        config.JWT_EXPIRES_IN = form_data.JWT_EXPIRES_IN

    config.ENABLE_COMMUNITY_SHARING = form_data.ENABLE_COMMUNITY_SHARING
    config.ENABLE_MESSAGE_RATING = form_data.ENABLE_MESSAGE_RATING

    reported = (
        "SHOW_ADMIN_DETAILS",
        "ENABLE_SIGNUP",
        "DEFAULT_USER_ROLE",
        "JWT_EXPIRES_IN",
        "ENABLE_COMMUNITY_SHARING",
        "ENABLE_MESSAGE_RATING",
    )
    return {setting: getattr(config, setting) for setting in reported}
|
|
|
|
|
|
# Schema for reading and writing the LDAP server settings through the
# /admin/config/ldap/server endpoints.
class LdapServerConfig(BaseModel):
    label: str
    host: str
    port: Optional[int] = None
    # LDAP attribute matched against the submitted username.
    attribute_for_username: str = 'uid'
    # Credentials of the application account used for the initial bind.
    app_dn: str
    app_dn_password: str
    search_base: str
    # Extra filter text appended to the username filter.
    search_filters: str = ''
    use_tls: bool = True
    # CA certificate file; the update endpoint requires it when use_tls is set.
    certificate_path: Optional[str] = None
    ciphers: Optional[str] = 'ALL'
|
|
|
|
@router.get("/admin/config/ldap/server", response_model=LdapServerConfig)
async def get_ldap_server(
    request: Request, user=Depends(get_admin_user)
):
    """Return the LDAP server settings currently held in the app config."""
    config = request.app.state.config

    # Response field -> app config attribute it is read from.
    field_sources = {
        "label": "LDAP_SERVER_LABEL",
        "host": "LDAP_SERVER_HOST",
        "port": "LDAP_SERVER_PORT",
        "attribute_for_username": "LDAP_ATTRIBUTE_FOR_USERNAME",
        "app_dn": "LDAP_APP_DN",
        "app_dn_password": "LDAP_APP_PASSWORD",
        "search_base": "LDAP_SEARCH_BASE",
        "search_filters": "LDAP_SEARCH_FILTERS",
        "use_tls": "LDAP_USE_TLS",
        "certificate_path": "LDAP_CA_CERT_FILE",
        "ciphers": "LDAP_CIPHERS",
    }
    return {
        field: getattr(config, attribute)
        for field, attribute in field_sources.items()
    }
|
|
|
|
@router.post("/admin/config/ldap/server")
async def update_ldap_server(
    request: Request, form_data: LdapServerConfig, user=Depends(get_admin_user)
):
    """Validate and store the LDAP server settings, then return them.

    Raises:
        HTTPException: 400 when a required field is empty, or when TLS is
            enabled without a certificate path.
    """
    mandatory = (
        'label',
        'host',
        'attribute_for_username',
        'app_dn',
        'app_dn_password',
        'search_base',
    )
    for key in mandatory:
        if not getattr(form_data, key):
            raise HTTPException(400, detail=f"Required field {key} is empty")

    if form_data.use_tls and not form_data.certificate_path:
        raise HTTPException(400, detail="TLS is enabled but certificate file path is missing")

    config = request.app.state.config

    # Form field -> app config attribute it is stored in.
    field_targets = {
        "label": "LDAP_SERVER_LABEL",
        "host": "LDAP_SERVER_HOST",
        "port": "LDAP_SERVER_PORT",
        "attribute_for_username": "LDAP_ATTRIBUTE_FOR_USERNAME",
        "app_dn": "LDAP_APP_DN",
        "app_dn_password": "LDAP_APP_PASSWORD",
        "search_base": "LDAP_SEARCH_BASE",
        "search_filters": "LDAP_SEARCH_FILTERS",
        "use_tls": "LDAP_USE_TLS",
        "certificate_path": "LDAP_CA_CERT_FILE",
        "ciphers": "LDAP_CIPHERS",
    }
    for field, attribute in field_targets.items():
        setattr(config, attribute, getattr(form_data, field))

    # Report what the config now holds rather than echoing the form.
    return {
        field: getattr(config, attribute)
        for field, attribute in field_targets.items()
    }
|
|
|
|
@router.get("/admin/config/ldap")
async def get_ldap_config(request: Request, user=Depends(get_admin_user)):
    """Return whether LDAP authentication is enabled in the app config."""
    ldap_enabled = request.app.state.config.ENABLE_LDAP
    return {"ENABLE_LDAP": ldap_enabled}
|
|
|
|
# Request body for POST /admin/config/ldap.
class LdapConfigForm(BaseModel):
    # Written to ENABLE_LDAP as-is by the update endpoint, including None.
    enable_ldap: Optional[bool] = None
|
|
|
|
@router.post("/admin/config/ldap")
async def update_ldap_config(request: Request, form_data: LdapConfigForm, user=Depends(get_admin_user)):
    """Store the submitted LDAP on/off flag and return the stored value."""
    config = request.app.state.config
    config.ENABLE_LDAP = form_data.enable_ldap
    return {"ENABLE_LDAP": config.ENABLE_LDAP}
|
|
|
|
|
|
############################
|
|
# API Key
|
|
############################
|
|
|
|
|
|
# create api key
|
|
@router.post("/api_key", response_model=ApiKey)
async def create_api_key_(user=Depends(get_current_user)):
    """Generate a new API key, store it for the current user and return it.

    Raises:
        HTTPException: 500 with ``CREATE_API_KEY_ERROR`` when storing fails.
    """
    new_key = create_api_key()
    stored = Users.update_user_api_key_by_id(user.id, new_key)
    if not stored:
        raise HTTPException(500, detail=ERROR_MESSAGES.CREATE_API_KEY_ERROR)

    return {"api_key": new_key}
|
|
|
|
|
|
# delete api key
|
|
@router.delete("/api_key", response_model=bool)
async def delete_api_key(user=Depends(get_current_user)):
    """Clear the current user's API key by storing ``None`` in its place."""
    return Users.update_user_api_key_by_id(user.id, None)
|
|
|
|
|
|
# get api key
|
|
@router.get("/api_key", response_model=ApiKey)
async def get_api_key(user=Depends(get_current_user)):
    """Return the current user's API key.

    Raises:
        HTTPException: 404 with ``API_KEY_NOT_FOUND`` when no key is stored.
    """
    stored_key = Users.get_user_api_key_by_id(user.id)
    if not stored_key:
        raise HTTPException(404, detail=ERROR_MESSAGES.API_KEY_NOT_FOUND)

    return {"api_key": stored_key}
|