import json
import logging
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Request, status, BackgroundTasks
from pydantic import BaseModel

from open_webui.socket.main import sio, get_user_ids_from_room
from open_webui.models.users import Users, UserNameResponse
from open_webui.models.channels import Channels, ChannelModel, ChannelForm
from open_webui.models.messages import (
    Messages,
    MessageModel,
    MessageResponse,
    MessageForm,
)

from open_webui.config import ENABLE_ADMIN_CHAT_ACCESS, ENABLE_ADMIN_EXPORT
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import SRC_LOG_LEVELS

from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.utils.access_control import has_access, get_users_with_access
from open_webui.utils.webhook import post_webhook

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["MODELS"])

router = APIRouter()


############################
# Shared helpers
############################


def _get_channel_or_404(channel_id: str):
    """Return the channel with ``channel_id`` or raise HTTP 404."""
    channel = Channels.get_channel_by_id(channel_id)
    if not channel:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
        )
    return channel


def _get_readable_channel(channel_id: str, user):
    """Return the channel if ``user`` may read it.

    Raises HTTP 404 when the channel does not exist and HTTP 403 when the
    user is not an admin and ``has_access(..., type="read", ...)`` is false.
    """
    channel = _get_channel_or_404(channel_id)
    if user.role != "admin" and not has_access(
        user.id, type="read", access_control=channel.access_control
    ):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT()
        )
    return channel


def _get_message_in_channel(channel_id: str, message_id: str):
    """Return the message, ensuring it belongs to ``channel_id``.

    Raises HTTP 404 when the message does not exist and HTTP 400 when its
    ``channel_id`` differs from the one in the request path.
    """
    message = Messages.get_message_by_id(message_id)
    if not message:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
        )
    if message.channel_id != channel_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
        )
    return message


def _bad_request() -> HTTPException:
    """Build the generic HTTP 400 error raised when an operation fails."""
    return HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
    )


async def _emit_channel_event(channel, user, message_id, event_type: str, data):
    """Emit a ``channel-events`` socket event to the channel's room.

    ``user`` is the acting (authenticated) user; ``data`` is the
    already-serialised payload nested under ``data.data``.
    """
    await sio.emit(
        "channel-events",
        {
            "channel_id": channel.id,
            "message_id": message_id,
            "data": {"type": event_type, "data": data},
            "user": UserNameResponse(**user.model_dump()).model_dump(),
            "channel": channel.model_dump(),
        },
        to=f"channel:{channel.id}",
    )


############################
# GetChannels
############################


@router.get("/", response_model=list[ChannelModel])
async def get_channels(user=Depends(get_verified_user)):
    """List channels: all of them for admins, otherwise the user's own."""
    if user.role == "admin":
        return Channels.get_channels()
    return Channels.get_channels_by_user_id(user.id)


############################
# CreateNewChannel
############################


@router.post("/create", response_model=Optional[ChannelModel])
async def create_new_channel(form_data: ChannelForm, user=Depends(get_admin_user)):
    """Create a channel (admin only); any failure is reported as HTTP 400."""
    try:
        channel = Channels.insert_new_channel(None, form_data, user.id)
        return ChannelModel(**channel.model_dump())
    except Exception as e:
        log.exception(e)
        raise _bad_request()


############################
# GetChannelById
############################


@router.get("/{id}", response_model=Optional[ChannelModel])
async def get_channel_by_id(id: str, user=Depends(get_verified_user)):
    """Return one channel; 404 if missing, 403 without read access."""
    channel = _get_readable_channel(id, user)
    return ChannelModel(**channel.model_dump())


############################
# UpdateChannelById
############################


@router.post("/{id}/update", response_model=Optional[ChannelModel])
async def update_channel_by_id(
    id: str, form_data: ChannelForm, user=Depends(get_admin_user)
):
    """Update a channel (admin only); 404 if missing, 400 on failure."""
    _get_channel_or_404(id)

    try:
        channel = Channels.update_channel_by_id(id, form_data)
        return ChannelModel(**channel.model_dump())
    except Exception as e:
        log.exception(e)
        raise _bad_request()


############################
# DeleteChannelById
############################


@router.delete("/{id}/delete", response_model=bool)
async def delete_channel_by_id(id: str, user=Depends(get_admin_user)):
    """Delete a channel (admin only); 404 if missing, 400 on failure."""
    _get_channel_or_404(id)

    try:
        Channels.delete_channel_by_id(id)
        return True
    except Exception as e:
        log.exception(e)
        raise _bad_request()


############################
# GetChannelMessages
############################


class MessageUserResponse(MessageResponse):
    # Name-level details of the user attached to the message.
    user: UserNameResponse


def _message_user_response(message, author, **extra) -> MessageUserResponse:
    """Build a ``MessageUserResponse`` from ``message`` plus ``author``.

    ``extra`` overrides/adds fields such as ``reply_count`` or ``reactions``.
    NOTE(review): ``author`` is dereferenced unconditionally, so a ``None``
    author (e.g. the account no longer exists) raises here -- confirm whether
    callers need a fallback.
    """
    return MessageUserResponse(
        **{
            **message.model_dump(),
            **extra,
            "user": UserNameResponse(**author.model_dump()),
        }
    )


async def _emit_parent_reply_event(channel, user, parent_id):
    """Emit ``message:reply`` for the parent of a reply, if the parent exists."""
    parent_message = Messages.get_message_by_id(parent_id)
    if not parent_message:
        return

    parent_author = Users.get_user_by_id(parent_message.user_id)
    await _emit_channel_event(
        channel,
        user,
        parent_message.id,
        "message:reply",
        _message_user_response(parent_message, parent_author).model_dump(),
    )


@router.get("/{id}/messages", response_model=list[MessageUserResponse])
async def get_channel_messages(
    id: str, skip: int = 0, limit: int = 50, user=Depends(get_verified_user)
):
    """Return a page of channel messages with author, reply and reaction data.

    Raises 404 if the channel is missing and 403 without read access.
    """
    _get_readable_channel(id, user)

    message_list = Messages.get_messages_by_channel_id(id, skip, limit)

    # Cache authors so each user is looked up once per request. The cache is
    # filled directly rather than via a local named ``user``, which previously
    # shadowed the authenticated user.
    authors = {}
    messages = []
    for message in message_list:
        if message.user_id not in authors:
            authors[message.user_id] = Users.get_user_by_id(message.user_id)

        replies = Messages.get_replies_by_message_id(message.id)
        # presumably replies are ordered newest-first -- verify against
        # Messages.get_replies_by_message_id
        latest_reply_at = replies[0].created_at if replies else None

        messages.append(
            _message_user_response(
                message,
                authors[message.user_id],
                reply_count=len(replies),
                latest_reply_at=latest_reply_at,
                reactions=Messages.get_reactions_by_message_id(message.id),
            )
        )

    return messages


############################
# PostNewMessage
############################


async def send_notification(webui_url, channel, message, active_user_ids):
    """Post a webhook notification to readers who are not in the channel room.

    Users whose id is in ``active_user_ids`` are skipped, as are users with
    no settings or no configured ``notifications.webhook_url``.
    """
    users = get_users_with_access("read", channel.access_control)

    for user in users:
        if user.id in active_user_ids or not user.settings:
            continue

        # Guard against ``ui`` / ``notifications`` being None so one user's
        # incomplete settings cannot abort notifications for everyone else.
        ui_settings = user.settings.ui or {}
        notifications = ui_settings.get("notifications") or {}
        webhook_url = notifications.get("webhook_url", None)
        if not webhook_url:
            continue

        post_webhook(
            webhook_url,
            f"#{channel.name} - {webui_url}/channels/{channel.id}\n\n{message.content}",
            {
                "action": "channel",
                "message": message.content,
                "title": channel.name,
                "url": f"{webui_url}/channels/{channel.id}",
            },
        )


@router.post("/{id}/messages/post", response_model=Optional[MessageModel])
async def post_new_message(
    request: Request,
    id: str,
    form_data: MessageForm,
    background_tasks: BackgroundTasks,
    user=Depends(get_verified_user),
):
    """Insert a message, broadcast it, and schedule webhook notifications.

    Raises 404 if the channel is missing, 403 without read access, and 400
    if inserting or broadcasting fails.
    """
    channel = _get_readable_channel(id, user)

    try:
        message = Messages.insert_new_message(form_data, channel.id, user.id)

        if message:
            await _emit_channel_event(
                channel,
                user,
                message.id,
                "message",
                _message_user_response(
                    message,
                    user,
                    reply_count=0,
                    latest_reply_at=None,
                    reactions=Messages.get_reactions_by_message_id(message.id),
                ).model_dump(),
            )

            if message.parent_id:
                # If this message is a reply, emit to the parent message as well
                await _emit_parent_reply_event(channel, user, message.parent_id)

            active_user_ids = get_user_ids_from_room(f"channel:{channel.id}")

            background_tasks.add_task(
                send_notification,
                request.app.state.config.WEBUI_URL,
                channel,
                message,
                active_user_ids,
            )

        # A falsy ``message`` fails here and is reported as HTTP 400 below.
        return MessageModel(**message.model_dump())
    except Exception as e:
        log.exception(e)
        raise _bad_request()


############################
# GetChannelMessage
############################


@router.get("/{id}/messages/{message_id}", response_model=Optional[MessageUserResponse])
async def get_channel_message(
    id: str, message_id: str, user=Depends(get_verified_user)
):
    """Return a single message of the channel together with its author.

    Raises 404 if the channel or message is missing, 403 without read
    access, and 400 if the message belongs to a different channel.
    """
    _get_readable_channel(id, user)
    message = _get_message_in_channel(id, message_id)

    return _message_user_response(message, Users.get_user_by_id(message.user_id))


############################
# GetChannelThreadMessages
############################


@router.get(
    "/{id}/messages/{message_id}/thread", response_model=list[MessageUserResponse]
)
async def get_channel_thread_messages(
    id: str,
    message_id: str,
    skip: int = 0,
    limit: int = 50,
    user=Depends(get_verified_user),
):
    """Return a page of replies to ``message_id`` with author and reactions.

    Raises 404 if the channel is missing and 403 without read access.
    """
    _get_readable_channel(id, user)

    message_list = Messages.get_messages_by_parent_id(id, message_id, skip, limit)

    # Per-request author cache; see get_channel_messages for why the
    # authenticated ``user`` is no longer rebound here.
    authors = {}
    messages = []
    for message in message_list:
        if message.user_id not in authors:
            authors[message.user_id] = Users.get_user_by_id(message.user_id)

        messages.append(
            _message_user_response(
                message,
                authors[message.user_id],
                reply_count=0,
                latest_reply_at=None,
                reactions=Messages.get_reactions_by_message_id(message.id),
            )
        )

    return messages


############################
# UpdateMessageById
############################


@router.post(
    "/{id}/messages/{message_id}/update", response_model=Optional[MessageModel]
)
async def update_message_by_id(
    id: str, message_id: str, form_data: MessageForm, user=Depends(get_verified_user)
):
    """Update a message and broadcast ``message:update``.

    Raises 404 if the channel or message is missing, 403 without read
    access, and 400 on a channel mismatch or if the update fails.

    NOTE(review): only channel read access is checked; there is no check
    that ``user`` authored the message -- confirm this is intended.
    """
    channel = _get_readable_channel(id, user)
    _get_message_in_channel(id, message_id)

    try:
        # The return value is not used; the stored message is re-read below.
        Messages.update_message_by_id(message_id, form_data)
        message = Messages.get_message_by_id(message_id)

        if message:
            # NOTE(review): the acting user (not necessarily the author) is
            # reported as the message's ``user`` -- confirm this is intended.
            await _emit_channel_event(
                channel,
                user,
                message.id,
                "message:update",
                _message_user_response(message, user).model_dump(),
            )

        # A falsy ``message`` fails here and is reported as HTTP 400 below.
        return MessageModel(**message.model_dump())
    except Exception as e:
        log.exception(e)
        raise _bad_request()


############################
# AddReactionToMessage
############################


class ReactionForm(BaseModel):
    # Reaction name to add or remove.
    name: str


@router.post("/{id}/messages/{message_id}/reactions/add", response_model=bool)
async def add_reaction_to_message(
    id: str, message_id: str, form_data: ReactionForm, user=Depends(get_verified_user)
):
    """Add the user's reaction to a message and broadcast the change.

    Raises 404 if the channel or message is missing, 403 without read
    access, and 400 on a channel mismatch or if the operation fails.
    """
    channel = _get_readable_channel(id, user)
    _get_message_in_channel(id, message_id)

    try:
        Messages.add_reaction_to_message(message_id, user.id, form_data.name)
        message = Messages.get_message_by_id(message_id)

        author = Users.get_user_by_id(message.user_id)
        await _emit_channel_event(
            channel,
            user,
            message.id,
            "message:reaction:add",
            {
                **message.model_dump(),
                "user": UserNameResponse(**author.model_dump()).model_dump(),
                "name": form_data.name,
            },
        )

        return True
    except Exception as e:
        log.exception(e)
        raise _bad_request()


############################
# RemoveReactionById
############################


@router.post("/{id}/messages/{message_id}/reactions/remove", response_model=bool)
async def remove_reaction_by_id_and_user_id_and_name(
    id: str, message_id: str, form_data: ReactionForm, user=Depends(get_verified_user)
):
    """Remove the user's reaction from a message and broadcast the change.

    Raises 404 if the channel or message is missing, 403 without read
    access, and 400 on a channel mismatch or if the operation fails.
    """
    channel = _get_readable_channel(id, user)
    _get_message_in_channel(id, message_id)

    try:
        Messages.remove_reaction_by_id_and_user_id_and_name(
            message_id, user.id, form_data.name
        )
        message = Messages.get_message_by_id(message_id)

        author = Users.get_user_by_id(message.user_id)
        await _emit_channel_event(
            channel,
            user,
            message.id,
            "message:reaction:remove",
            {
                **message.model_dump(),
                "user": UserNameResponse(**author.model_dump()).model_dump(),
                "name": form_data.name,
            },
        )

        return True
    except Exception as e:
        log.exception(e)
        raise _bad_request()


############################
# DeleteMessageById
############################


@router.delete("/{id}/messages/{message_id}/delete", response_model=bool)
async def delete_message_by_id(
    id: str, message_id: str, user=Depends(get_verified_user)
):
    """Delete a message and broadcast ``message:delete``.

    If the message was a reply, a ``message:reply`` event is also emitted
    for its parent. Raises 404 if the channel or message is missing, 403
    without read access, and 400 on a channel mismatch or if deletion fails.

    NOTE(review): only channel read access is checked; there is no check
    that ``user`` authored the message -- confirm this is intended.
    """
    channel = _get_readable_channel(id, user)
    message = _get_message_in_channel(id, message_id)

    try:
        Messages.delete_message_by_id(message_id)

        # ``message`` is the copy fetched before deletion.
        # NOTE(review): the acting user is reported as the message's ``user``.
        await _emit_channel_event(
            channel,
            user,
            message.id,
            "message:delete",
            {
                **message.model_dump(),
                "user": UserNameResponse(**user.model_dump()).model_dump(),
            },
        )

        if message.parent_id:
            # If this message is a reply, emit to the parent message as well
            await _emit_parent_reply_event(channel, user, message.parent_id)

        return True
    except Exception as e:
        log.exception(e)
        raise _bad_request()