feat: channel webhooks

This commit is contained in:
Timothy Jaeryang Baek
2026-01-09 02:30:15 +04:00
parent 48bdb3f266
commit cd296fcf0d
10 changed files with 1012 additions and 12 deletions

View File

@@ -1,4 +1,5 @@
import json
import secrets
import time
import uuid
from typing import Optional
@@ -245,6 +246,11 @@ class CreateChannelForm(ChannelForm):
type: Optional[str] = None
class ChannelWebhookForm(BaseModel):
name: str
profile_image_url: Optional[str] = None
class ChannelTable:
def _collect_unique_user_ids(
@@ -945,5 +951,115 @@ class ChannelTable:
db.commit()
return True
####################
# Webhook Methods
####################
def insert_webhook(
self,
channel_id: str,
user_id: str,
form_data: ChannelWebhookForm,
db: Optional[Session] = None,
) -> Optional[ChannelWebhookModel]:
with get_db_context(db) as db:
webhook = ChannelWebhookModel(
id=str(uuid.uuid4()),
channel_id=channel_id,
user_id=user_id,
name=form_data.name,
profile_image_url=form_data.profile_image_url,
token=secrets.token_urlsafe(32),
last_used_at=None,
created_at=int(time.time_ns()),
updated_at=int(time.time_ns()),
)
db.add(ChannelWebhook(**webhook.model_dump()))
db.commit()
return webhook
def get_webhooks_by_channel_id(
self, channel_id: str, db: Optional[Session] = None
) -> list[ChannelWebhookModel]:
with get_db_context(db) as db:
webhooks = (
db.query(ChannelWebhook)
.filter(ChannelWebhook.channel_id == channel_id)
.all()
)
return [ChannelWebhookModel.model_validate(w) for w in webhooks]
def get_webhook_by_id(
self, webhook_id: str, db: Optional[Session] = None
) -> Optional[ChannelWebhookModel]:
with get_db_context(db) as db:
webhook = (
db.query(ChannelWebhook)
.filter(ChannelWebhook.id == webhook_id)
.first()
)
return ChannelWebhookModel.model_validate(webhook) if webhook else None
def get_webhook_by_id_and_token(
self, webhook_id: str, token: str, db: Optional[Session] = None
) -> Optional[ChannelWebhookModel]:
with get_db_context(db) as db:
webhook = (
db.query(ChannelWebhook)
.filter(
ChannelWebhook.id == webhook_id,
ChannelWebhook.token == token,
)
.first()
)
return ChannelWebhookModel.model_validate(webhook) if webhook else None
def update_webhook_by_id(
self,
webhook_id: str,
form_data: ChannelWebhookForm,
db: Optional[Session] = None,
) -> Optional[ChannelWebhookModel]:
with get_db_context(db) as db:
webhook = (
db.query(ChannelWebhook)
.filter(ChannelWebhook.id == webhook_id)
.first()
)
if not webhook:
return None
webhook.name = form_data.name
webhook.profile_image_url = form_data.profile_image_url
webhook.updated_at = int(time.time_ns())
db.commit()
return ChannelWebhookModel.model_validate(webhook)
def update_webhook_last_used_at(
self, webhook_id: str, db: Optional[Session] = None
) -> bool:
with get_db_context(db) as db:
webhook = (
db.query(ChannelWebhook)
.filter(ChannelWebhook.id == webhook_id)
.first()
)
if not webhook:
return False
webhook.last_used_at = int(time.time_ns())
db.commit()
return True
def delete_webhook_by_id(
self, webhook_id: str, db: Optional[Session] = None
) -> bool:
with get_db_context(db) as db:
result = (
db.query(ChannelWebhook)
.filter(ChannelWebhook.id == webhook_id)
.delete()
)
db.commit()
return result > 0
Channels = ChannelTable()

View File

@@ -199,11 +199,32 @@ class MessageTable:
if include_thread_replies:
thread_replies = self.get_thread_replies_by_message_id(id, db=db)
user = Users.get_user_by_id(message.user_id, db=db)
# Check if message was sent by webhook (webhook info in meta takes precedence)
webhook_info = message.meta.get("webhook") if message.meta else None
if webhook_info and webhook_info.get("id"):
# Look up webhook by ID to get current name
webhook = Channels.get_webhook_by_id(webhook_info.get("id"), db=db)
if webhook:
user_info = {
"id": webhook.id,
"name": webhook.name,
"role": "webhook",
}
else:
# Webhook was deleted, use placeholder
user_info = {
"id": webhook_info.get("id"),
"name": "Deleted Webhook",
"role": "webhook",
}
else:
user = Users.get_user_by_id(message.user_id, db=db)
user_info = user.model_dump() if user else None
return MessageResponse.model_validate(
{
**MessageModel.model_validate(message).model_dump(),
"user": user.model_dump() if user else None,
"user": user_info,
"reply_to_message": (
reply_to_message.model_dump() if reply_to_message else None
),
@@ -235,10 +256,29 @@ class MessageTable:
if message.reply_to_id
else None
)
webhook_info = message.meta.get("webhook") if message.meta else None
user_info = None
if webhook_info and webhook_info.get("id"):
webhook = Channels.get_webhook_by_id(webhook_info.get("id"), db=db)
if webhook:
user_info = {
"id": webhook.id,
"name": webhook.name,
"role": "webhook",
}
else:
user_info = {
"id": webhook_info.get("id"),
"name": "Deleted Webhook",
"role": "webhook",
}
messages.append(
MessageReplyToResponse.model_validate(
{
**MessageModel.model_validate(message).model_dump(),
"user": user_info,
"reply_to_message": (
reply_to_message.model_dump()
if reply_to_message
@@ -284,10 +324,29 @@ class MessageTable:
if message.reply_to_id
else None
)
webhook_info = message.meta.get("webhook") if message.meta else None
user_info = None
if webhook_info and webhook_info.get("id"):
webhook = Channels.get_webhook_by_id(webhook_info.get("id"), db=db)
if webhook:
user_info = {
"id": webhook.id,
"name": webhook.name,
"role": "webhook",
}
else:
user_info = {
"id": webhook_info.get("id"),
"name": "Deleted Webhook",
"role": "webhook",
}
messages.append(
MessageReplyToResponse.model_validate(
{
**MessageModel.model_validate(message).model_dump(),
"user": user_info,
"reply_to_message": (
reply_to_message.model_dump()
if reply_to_message
@@ -334,10 +393,29 @@ class MessageTable:
if message.reply_to_id
else None
)
webhook_info = message.meta.get("webhook") if message.meta else None
user_info = None
if webhook_info and webhook_info.get("id"):
webhook = Channels.get_webhook_by_id(webhook_info.get("id"), db=db)
if webhook:
user_info = {
"id": webhook.id,
"name": webhook.name,
"role": "webhook",
}
else:
user_info = {
"id": webhook_info.get("id"),
"name": "Deleted Webhook",
"role": "webhook",
}
messages.append(
MessageReplyToResponse.model_validate(
{
**MessageModel.model_validate(message).model_dump(),
"user": user_info,
"reply_to_message": (
reply_to_message.model_dump()
if reply_to_message

View File

@@ -1,9 +1,12 @@
import json
import logging
import base64
import io
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Request, status, BackgroundTasks
from fastapi.responses import Response, StreamingResponse, FileResponse
from pydantic import BaseModel
from pydantic import field_validator
@@ -29,6 +32,8 @@ from open_webui.models.channels import (
ChannelForm,
ChannelResponse,
CreateChannelForm,
ChannelWebhookModel,
ChannelWebhookForm,
)
from open_webui.models.messages import (
Messages,
@@ -43,6 +48,7 @@ from open_webui.utils.files import get_image_base64_from_file_id
from open_webui.config import ENABLE_ADMIN_CHAT_ACCESS, ENABLE_ADMIN_EXPORT
from open_webui.constants import ERROR_MESSAGES
from open_webui.env import STATIC_DIR
from open_webui.utils.models import (
@@ -822,6 +828,11 @@ async def get_channel_messages(
thread_replies[0].created_at if thread_replies else None
)
# Use message.user if present (for webhooks), otherwise look up by user_id
user_info = message.user
if user_info is None and message.user_id in users:
user_info = UserNameResponse(**users[message.user_id].model_dump())
messages.append(
MessageUserResponse(
**{
@@ -831,7 +842,7 @@ async def get_channel_messages(
"reactions": Messages.get_reactions_by_message_id(
message.id, db=db
),
"user": UserNameResponse(**users[message.user_id].model_dump()),
"user": user_info,
}
)
)
@@ -889,6 +900,19 @@ async def get_pinned_channel_messages(
messages = []
for message in message_list:
# Check for webhook identity in meta
webhook_info = message.meta.get("webhook") if message.meta else None
if webhook_info:
user_info = UserNameResponse(
id=webhook_info.get("id"),
name=webhook_info.get("name"),
role="webhook",
)
elif message.user_id in users:
user_info = UserNameResponse(**users[message.user_id].model_dump())
else:
user_info = None
messages.append(
MessageWithReactionsResponse(
**{
@@ -896,7 +920,7 @@ async def get_pinned_channel_messages(
"reactions": Messages.get_reactions_by_message_id(
message.id, db=db
),
"user": UserNameResponse(**users[message.user_id].model_dump()),
"user": user_info,
}
)
)
@@ -1476,6 +1500,11 @@ async def get_channel_thread_messages(
messages = []
for message in message_list:
# Use message.user if present (for webhooks), otherwise look up by user_id
user_info = message.user
if user_info is None and message.user_id in users:
user_info = UserNameResponse(**users[message.user_id].model_dump())
messages.append(
MessageUserResponse(
**{
@@ -1485,7 +1514,7 @@ async def get_channel_thread_messages(
"reactions": Messages.get_reactions_by_message_id(
message.id, db=db
),
"user": UserNameResponse(**users[message.user_id].model_dump()),
"user": user_info,
}
)
)
@@ -1835,3 +1864,262 @@ async def delete_message_by_id(
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
############################
# Webhooks
############################
@router.get("/webhooks/{webhook_id}/profile/image")
async def get_webhook_profile_image(
webhook_id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
"""Get webhook profile image by webhook ID."""
webhook = Channels.get_webhook_by_id(webhook_id, db=db)
if not webhook:
# Return default favicon if webhook not found
return FileResponse(f"{STATIC_DIR}/favicon.png")
if webhook.profile_image_url:
# Check if it's url or base64
if webhook.profile_image_url.startswith("http"):
return Response(
status_code=status.HTTP_302_FOUND,
headers={"Location": webhook.profile_image_url},
)
elif webhook.profile_image_url.startswith("data:image"):
try:
header, base64_data = webhook.profile_image_url.split(",", 1)
image_data = base64.b64decode(base64_data)
image_buffer = io.BytesIO(image_data)
media_type = header.split(";")[0].lstrip("data:")
return StreamingResponse(
image_buffer,
media_type=media_type,
headers={"Content-Disposition": "inline"},
)
except Exception as e:
pass
# Return default favicon if no profile image
return FileResponse(f"{STATIC_DIR}/favicon.png")
@router.get("/{id}/webhooks", response_model=list[ChannelWebhookModel])
async def get_channel_webhooks(
request: Request,
id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
# Only channel managers can view webhooks
if (
not Channels.is_user_channel_manager(channel.id, user.id, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED
)
return Channels.get_webhooks_by_channel_id(id, db=db)
@router.post("/{id}/webhooks/create", response_model=ChannelWebhookModel)
async def create_channel_webhook(
request: Request,
id: str,
form_data: ChannelWebhookForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
# Only channel managers can create webhooks
if (
not Channels.is_user_channel_manager(channel.id, user.id, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED
)
webhook = Channels.insert_webhook(id, user.id, form_data, db=db)
if not webhook:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
return webhook
@router.post("/{id}/webhooks/{webhook_id}/update", response_model=ChannelWebhookModel)
async def update_channel_webhook(
request: Request,
id: str,
webhook_id: str,
form_data: ChannelWebhookForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
# Only channel managers can update webhooks
if (
not Channels.is_user_channel_manager(channel.id, user.id, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED
)
webhook = Channels.get_webhook_by_id(webhook_id, db=db)
if not webhook or webhook.channel_id != id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
updated = Channels.update_webhook_by_id(webhook_id, form_data, db=db)
if not updated:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT()
)
return updated
@router.delete("/{id}/webhooks/{webhook_id}/delete", response_model=bool)
async def delete_channel_webhook(
request: Request,
id: str,
webhook_id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
check_channels_access(request)
channel = Channels.get_channel_by_id(id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
# Only channel managers can delete webhooks
if (
not Channels.is_user_channel_manager(channel.id, user.id, db=db)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.UNAUTHORIZED
)
webhook = Channels.get_webhook_by_id(webhook_id, db=db)
if not webhook or webhook.channel_id != id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
return Channels.delete_webhook_by_id(webhook_id, db=db)
############################
# Public Webhook Endpoint
############################
class WebhookMessageForm(BaseModel):
content: str
@router.post("/webhooks/{webhook_id}/{token}")
async def post_webhook_message(
request: Request,
webhook_id: str,
token: str,
form_data: WebhookMessageForm,
db: Session = Depends(get_session),
):
"""Public endpoint to post messages via webhook. No authentication required."""
check_channels_access(request)
# Validate webhook
webhook = Channels.get_webhook_by_id_and_token(webhook_id, token, db=db)
if not webhook:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid webhook URL",
)
channel = Channels.get_channel_by_id(webhook.channel_id, db=db)
if not channel:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND
)
# Create message with webhook identity stored in meta
message = Messages.insert_new_message(
MessageForm(content=form_data.content, meta={"webhook": {"id": webhook.id}}),
webhook.channel_id,
webhook.user_id, # Required for DB but webhook info in meta takes precedence
db=db,
)
if not message:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Failed to create message",
)
# Update last_used_at
Channels.update_webhook_last_used_at(webhook_id, db=db)
# Get full message and emit event
message = Messages.get_message_by_id(message.id, db=db)
event_data = {
"channel_id": channel.id,
"message_id": message.id,
"data": {
"type": "message",
"data": {
**message.model_dump(),
"user": {
"id": webhook.id,
"name": webhook.name,
"role": "webhook",
},
},
},
"user": {
"id": webhook.id,
"name": webhook.name,
"role": "webhook",
},
"channel": channel.model_dump(),
}
await sio.emit(
"events:channel",
event_data,
to=f"channel:{channel.id}",
)
return {"success": True, "message_id": message.id}