mirror of
https://github.com/open-webui/open-webui
synced 2024-12-29 07:12:07 +00:00
348 lines
15 KiB
Python
348 lines
15 KiB
Python
import base64
|
|
import logging
|
|
import mimetypes
|
|
import uuid
|
|
|
|
import aiohttp
|
|
from authlib.integrations.starlette_client import OAuth
|
|
from authlib.oidc.core import UserInfo
|
|
from fastapi import (
|
|
HTTPException,
|
|
status,
|
|
)
|
|
from starlette.responses import RedirectResponse
|
|
|
|
from open_webui.models.auths import Auths
|
|
from open_webui.models.users import Users
|
|
from open_webui.models.groups import Groups, GroupModel, GroupUpdateForm
|
|
from open_webui.config import (
|
|
DEFAULT_USER_ROLE,
|
|
ENABLE_OAUTH_SIGNUP,
|
|
OAUTH_MERGE_ACCOUNTS_BY_EMAIL,
|
|
OAUTH_PROVIDERS,
|
|
ENABLE_OAUTH_ROLE_MANAGEMENT,
|
|
ENABLE_OAUTH_GROUP_MANAGEMENT,
|
|
OAUTH_ROLES_CLAIM,
|
|
OAUTH_GROUPS_CLAIM,
|
|
OAUTH_EMAIL_CLAIM,
|
|
OAUTH_PICTURE_CLAIM,
|
|
OAUTH_USERNAME_CLAIM,
|
|
OAUTH_ALLOWED_ROLES,
|
|
OAUTH_ADMIN_ROLES,
|
|
OAUTH_ALLOWED_DOMAINS,
|
|
WEBHOOK_URL,
|
|
JWT_EXPIRES_IN,
|
|
AppConfig,
|
|
)
|
|
from open_webui.constants import ERROR_MESSAGES
|
|
from open_webui.env import WEBUI_SESSION_COOKIE_SAME_SITE, WEBUI_SESSION_COOKIE_SECURE
|
|
from open_webui.utils.misc import parse_duration
|
|
from open_webui.utils.auth import get_password_hash, create_token
|
|
from open_webui.utils.webhook import post_webhook
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
auth_manager_config = AppConfig()
|
|
auth_manager_config.DEFAULT_USER_ROLE = DEFAULT_USER_ROLE
|
|
auth_manager_config.ENABLE_OAUTH_SIGNUP = ENABLE_OAUTH_SIGNUP
|
|
auth_manager_config.OAUTH_MERGE_ACCOUNTS_BY_EMAIL = OAUTH_MERGE_ACCOUNTS_BY_EMAIL
|
|
auth_manager_config.ENABLE_OAUTH_ROLE_MANAGEMENT = ENABLE_OAUTH_ROLE_MANAGEMENT
|
|
auth_manager_config.ENABLE_OAUTH_GROUP_MANAGEMENT = ENABLE_OAUTH_GROUP_MANAGEMENT
|
|
auth_manager_config.OAUTH_ROLES_CLAIM = OAUTH_ROLES_CLAIM
|
|
auth_manager_config.OAUTH_GROUPS_CLAIM = OAUTH_GROUPS_CLAIM
|
|
auth_manager_config.OAUTH_EMAIL_CLAIM = OAUTH_EMAIL_CLAIM
|
|
auth_manager_config.OAUTH_PICTURE_CLAIM = OAUTH_PICTURE_CLAIM
|
|
auth_manager_config.OAUTH_USERNAME_CLAIM = OAUTH_USERNAME_CLAIM
|
|
auth_manager_config.OAUTH_ALLOWED_ROLES = OAUTH_ALLOWED_ROLES
|
|
auth_manager_config.OAUTH_ADMIN_ROLES = OAUTH_ADMIN_ROLES
|
|
auth_manager_config.OAUTH_ALLOWED_DOMAINS = OAUTH_ALLOWED_DOMAINS
|
|
auth_manager_config.WEBHOOK_URL = WEBHOOK_URL
|
|
auth_manager_config.JWT_EXPIRES_IN = JWT_EXPIRES_IN
|
|
|
|
|
|
class OAuthManager:
|
|
def __init__(self):
|
|
self.oauth = OAuth()
|
|
for provider_name, provider_config in OAUTH_PROVIDERS.items():
|
|
self.oauth.register(
|
|
name=provider_name,
|
|
client_id=provider_config["client_id"],
|
|
client_secret=provider_config["client_secret"],
|
|
server_metadata_url=provider_config["server_metadata_url"],
|
|
client_kwargs={
|
|
"scope": provider_config["scope"],
|
|
},
|
|
redirect_uri=provider_config["redirect_uri"],
|
|
)
|
|
|
|
def get_client(self, provider_name):
|
|
return self.oauth.create_client(provider_name)
|
|
|
|
def get_user_role(self, user, user_data):
|
|
if user and Users.get_num_users() == 1:
|
|
# If the user is the only user, assign the role "admin" - actually repairs role for single user on login
|
|
return "admin"
|
|
if not user and Users.get_num_users() == 0:
|
|
# If there are no users, assign the role "admin", as the first user will be an admin
|
|
return "admin"
|
|
|
|
if auth_manager_config.ENABLE_OAUTH_ROLE_MANAGEMENT:
|
|
oauth_claim = auth_manager_config.OAUTH_ROLES_CLAIM
|
|
oauth_allowed_roles = auth_manager_config.OAUTH_ALLOWED_ROLES
|
|
oauth_admin_roles = auth_manager_config.OAUTH_ADMIN_ROLES
|
|
oauth_roles = None
|
|
role = "pending" # Default/fallback role if no matching roles are found
|
|
|
|
# Next block extracts the roles from the user data, accepting nested claims of any depth
|
|
if oauth_claim and oauth_allowed_roles and oauth_admin_roles:
|
|
claim_data = user_data
|
|
nested_claims = oauth_claim.split(".")
|
|
for nested_claim in nested_claims:
|
|
claim_data = claim_data.get(nested_claim, {})
|
|
oauth_roles = claim_data if isinstance(claim_data, list) else None
|
|
|
|
# If any roles are found, check if they match the allowed or admin roles
|
|
if oauth_roles:
|
|
# If role management is enabled, and matching roles are provided, use the roles
|
|
for allowed_role in oauth_allowed_roles:
|
|
# If the user has any of the allowed roles, assign the role "user"
|
|
if allowed_role in oauth_roles:
|
|
role = "user"
|
|
break
|
|
for admin_role in oauth_admin_roles:
|
|
# If the user has any of the admin roles, assign the role "admin"
|
|
if admin_role in oauth_roles:
|
|
role = "admin"
|
|
break
|
|
else:
|
|
if not user:
|
|
# If role management is disabled, use the default role for new users
|
|
role = auth_manager_config.DEFAULT_USER_ROLE
|
|
else:
|
|
# If role management is disabled, use the existing role for existing users
|
|
role = user.role
|
|
|
|
return role
|
|
|
|
def update_user_groups(self, user, user_data, default_permissions):
|
|
oauth_claim = auth_manager_config.OAUTH_GROUPS_CLAIM
|
|
|
|
user_oauth_groups: list[str] = user_data.get(oauth_claim, list())
|
|
user_current_groups: list[GroupModel] = Groups.get_groups_by_member_id(user.id)
|
|
all_available_groups: list[GroupModel] = Groups.get_groups()
|
|
|
|
# Remove groups that user is no longer a part of
|
|
for group_model in user_current_groups:
|
|
if group_model.name not in user_oauth_groups:
|
|
# Remove group from user
|
|
|
|
user_ids = group_model.user_ids
|
|
user_ids = [i for i in user_ids if i != user.id]
|
|
|
|
# In case a group is created, but perms are never assigned to the group by hitting "save"
|
|
group_permissions = group_model.permissions
|
|
if not group_permissions:
|
|
group_permissions = default_permissions
|
|
|
|
update_form = GroupUpdateForm(
|
|
name=group_model.name,
|
|
description=group_model.description,
|
|
permissions=group_permissions,
|
|
user_ids=user_ids,
|
|
)
|
|
Groups.update_group_by_id(
|
|
id=group_model.id, form_data=update_form, overwrite=False
|
|
)
|
|
|
|
# Add user to new groups
|
|
for group_model in all_available_groups:
|
|
if group_model.name in user_oauth_groups and not any(
|
|
gm.name == group_model.name for gm in user_current_groups
|
|
):
|
|
# Add user to group
|
|
|
|
user_ids = group_model.user_ids
|
|
user_ids.append(user.id)
|
|
|
|
# In case a group is created, but perms are never assigned to the group by hitting "save"
|
|
group_permissions = group_model.permissions
|
|
if not group_permissions:
|
|
group_permissions = default_permissions
|
|
|
|
update_form = GroupUpdateForm(
|
|
name=group_model.name,
|
|
description=group_model.description,
|
|
permissions=group_permissions,
|
|
user_ids=user_ids,
|
|
)
|
|
Groups.update_group_by_id(
|
|
id=group_model.id, form_data=update_form, overwrite=False
|
|
)
|
|
|
|
async def handle_login(self, provider, request):
|
|
if provider not in OAUTH_PROVIDERS:
|
|
raise HTTPException(404)
|
|
# If the provider has a custom redirect URL, use that, otherwise automatically generate one
|
|
redirect_uri = OAUTH_PROVIDERS[provider].get("redirect_uri") or request.url_for(
|
|
"oauth_callback", provider=provider
|
|
)
|
|
client = self.get_client(provider)
|
|
if client is None:
|
|
raise HTTPException(404)
|
|
return await client.authorize_redirect(request, redirect_uri)
|
|
|
|
async def handle_callback(self, provider, request, response):
|
|
if provider not in OAUTH_PROVIDERS:
|
|
raise HTTPException(404)
|
|
client = self.get_client(provider)
|
|
try:
|
|
token = await client.authorize_access_token(request)
|
|
except Exception as e:
|
|
log.warning(f"OAuth callback error: {e}")
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
|
|
user_data: UserInfo = token["userinfo"]
|
|
if not user_data:
|
|
user_data: UserInfo = await client.userinfo(token=token)
|
|
if not user_data:
|
|
log.warning(f"OAuth callback failed, user data is missing: {token}")
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
|
|
|
|
sub = user_data.get("sub")
|
|
if not sub:
|
|
log.warning(f"OAuth callback failed, sub is missing: {user_data}")
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
|
|
provider_sub = f"{provider}@{sub}"
|
|
email_claim = auth_manager_config.OAUTH_EMAIL_CLAIM
|
|
email = user_data.get(email_claim, "").lower()
|
|
# We currently mandate that email addresses are provided
|
|
if not email:
|
|
log.warning(f"OAuth callback failed, email is missing: {user_data}")
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
|
|
if (
|
|
"*" not in auth_manager_config.OAUTH_ALLOWED_DOMAINS
|
|
and email.split("@")[-1] not in auth_manager_config.OAUTH_ALLOWED_DOMAINS
|
|
):
|
|
log.warning(
|
|
f"OAuth callback failed, e-mail domain is not in the list of allowed domains: {user_data}"
|
|
)
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.INVALID_CRED)
|
|
|
|
# Check if the user exists
|
|
user = Users.get_user_by_oauth_sub(provider_sub)
|
|
|
|
if not user:
|
|
# If the user does not exist, check if merging is enabled
|
|
if auth_manager_config.OAUTH_MERGE_ACCOUNTS_BY_EMAIL:
|
|
# Check if the user exists by email
|
|
user = Users.get_user_by_email(email)
|
|
if user:
|
|
# Update the user with the new oauth sub
|
|
Users.update_user_oauth_sub_by_id(user.id, provider_sub)
|
|
|
|
if user:
|
|
determined_role = self.get_user_role(user, user_data)
|
|
if user.role != determined_role:
|
|
Users.update_user_role_by_id(user.id, determined_role)
|
|
|
|
if not user:
|
|
# If the user does not exist, check if signups are enabled
|
|
if auth_manager_config.ENABLE_OAUTH_SIGNUP:
|
|
# Check if an existing user with the same email already exists
|
|
existing_user = Users.get_user_by_email(
|
|
user_data.get("email", "").lower()
|
|
)
|
|
if existing_user:
|
|
raise HTTPException(400, detail=ERROR_MESSAGES.EMAIL_TAKEN)
|
|
|
|
picture_claim = auth_manager_config.OAUTH_PICTURE_CLAIM
|
|
picture_url = user_data.get(picture_claim, "")
|
|
if picture_url:
|
|
# Download the profile image into a base64 string
|
|
try:
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(picture_url) as resp:
|
|
picture = await resp.read()
|
|
base64_encoded_picture = base64.b64encode(
|
|
picture
|
|
).decode("utf-8")
|
|
guessed_mime_type = mimetypes.guess_type(picture_url)[0]
|
|
if guessed_mime_type is None:
|
|
# assume JPG, browsers are tolerant enough of image formats
|
|
guessed_mime_type = "image/jpeg"
|
|
picture_url = f"data:{guessed_mime_type};base64,{base64_encoded_picture}"
|
|
except Exception as e:
|
|
log.error(
|
|
f"Error downloading profile image '{picture_url}': {e}"
|
|
)
|
|
picture_url = ""
|
|
if not picture_url:
|
|
picture_url = "/user.png"
|
|
username_claim = auth_manager_config.OAUTH_USERNAME_CLAIM
|
|
|
|
role = self.get_user_role(None, user_data)
|
|
|
|
user = Auths.insert_new_auth(
|
|
email=email,
|
|
password=get_password_hash(
|
|
str(uuid.uuid4())
|
|
), # Random password, not used
|
|
name=user_data.get(username_claim, "User"),
|
|
profile_image_url=picture_url,
|
|
role=role,
|
|
oauth_sub=provider_sub,
|
|
)
|
|
|
|
if auth_manager_config.WEBHOOK_URL:
|
|
post_webhook(
|
|
auth_manager_config.WEBHOOK_URL,
|
|
auth_manager_config.WEBHOOK_MESSAGES.USER_SIGNUP(user.name),
|
|
{
|
|
"action": "signup",
|
|
"message": auth_manager_config.WEBHOOK_MESSAGES.USER_SIGNUP(
|
|
user.name
|
|
),
|
|
"user": user.model_dump_json(exclude_none=True),
|
|
},
|
|
)
|
|
else:
|
|
raise HTTPException(
|
|
status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACCESS_PROHIBITED
|
|
)
|
|
|
|
jwt_token = create_token(
|
|
data={"id": user.id},
|
|
expires_delta=parse_duration(auth_manager_config.JWT_EXPIRES_IN),
|
|
)
|
|
|
|
if auth_manager_config.ENABLE_OAUTH_GROUP_MANAGEMENT:
|
|
self.update_user_groups(
|
|
user=user,
|
|
user_data=user_data,
|
|
default_permissions=request.app.state.config.USER_PERMISSIONS,
|
|
)
|
|
|
|
# Set the cookie token
|
|
response.set_cookie(
|
|
key="token",
|
|
value=jwt_token,
|
|
httponly=True, # Ensures the cookie is not accessible via JavaScript
|
|
samesite=WEBUI_SESSION_COOKIE_SAME_SITE,
|
|
secure=WEBUI_SESSION_COOKIE_SECURE,
|
|
)
|
|
|
|
if ENABLE_OAUTH_SIGNUP.value:
|
|
oauth_id_token = token.get("id_token")
|
|
response.set_cookie(
|
|
key="oauth_id_token",
|
|
value=oauth_id_token,
|
|
httponly=True,
|
|
samesite=WEBUI_SESSION_COOKIE_SAME_SITE,
|
|
secure=WEBUI_SESSION_COOKIE_SECURE,
|
|
)
|
|
# Redirect back to the frontend with the JWT token
|
|
redirect_url = f"{request.base_url}auth#token={jwt_token}"
|
|
return RedirectResponse(url=redirect_url, headers=response.headers)
|
|
|
|
|
|
oauth_manager = OAuthManager()
|