mirror of
https://github.com/open-webui/open-webui
synced 2024-11-06 00:32:05 +00:00
224 lines
5.6 KiB
Python
224 lines
5.6 KiB
Python
import re
|
|
import requests
|
|
from fastapi import (
|
|
FastAPI,
|
|
Request,
|
|
Depends,
|
|
HTTPException,
|
|
status,
|
|
UploadFile,
|
|
File,
|
|
Form,
|
|
)
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from faster_whisper import WhisperModel
|
|
|
|
from constants import ERROR_MESSAGES
|
|
from utils.utils import (
|
|
get_current_user,
|
|
get_admin_user,
|
|
)
|
|
from utils.misc import calculate_sha256
|
|
from typing import Optional
|
|
from pydantic import BaseModel
|
|
from config import AUTOMATIC1111_BASE_URL
|
|
|
|
app = FastAPI()
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
app.state.AUTOMATIC1111_BASE_URL = AUTOMATIC1111_BASE_URL
|
|
app.state.ENABLED = app.state.AUTOMATIC1111_BASE_URL != ""
|
|
app.state.IMAGE_SIZE = "512x512"
|
|
app.state.IMAGE_STEPS = 50
|
|
|
|
|
|
@app.get("/enabled", response_model=bool)
|
|
async def get_enable_status(request: Request, user=Depends(get_admin_user)):
|
|
return app.state.ENABLED
|
|
|
|
|
|
@app.get("/enabled/toggle", response_model=bool)
|
|
async def toggle_enabled(request: Request, user=Depends(get_admin_user)):
|
|
try:
|
|
r = requests.head(app.state.AUTOMATIC1111_BASE_URL)
|
|
app.state.ENABLED = not app.state.ENABLED
|
|
return app.state.ENABLED
|
|
except Exception as e:
|
|
raise HTTPException(status_code=400, detail=ERROR_MESSAGES.DEFAULT(e))
|
|
|
|
|
|
class UrlUpdateForm(BaseModel):
|
|
url: str
|
|
|
|
|
|
@app.get("/url")
|
|
async def get_openai_url(user=Depends(get_admin_user)):
|
|
return {"AUTOMATIC1111_BASE_URL": app.state.AUTOMATIC1111_BASE_URL}
|
|
|
|
|
|
@app.post("/url/update")
|
|
async def update_openai_url(form_data: UrlUpdateForm, user=Depends(get_admin_user)):
|
|
|
|
if form_data.url == "":
|
|
app.state.AUTOMATIC1111_BASE_URL = AUTOMATIC1111_BASE_URL
|
|
else:
|
|
app.state.AUTOMATIC1111_BASE_URL = form_data.url.strip("/")
|
|
|
|
return {
|
|
"AUTOMATIC1111_BASE_URL": app.state.AUTOMATIC1111_BASE_URL,
|
|
"status": True,
|
|
}
|
|
|
|
|
|
class ImageSizeUpdateForm(BaseModel):
|
|
size: str
|
|
|
|
|
|
@app.get("/size")
|
|
async def get_image_size(user=Depends(get_admin_user)):
|
|
return {"IMAGE_SIZE": app.state.IMAGE_SIZE}
|
|
|
|
|
|
@app.post("/size/update")
|
|
async def update_image_size(
|
|
form_data: ImageSizeUpdateForm, user=Depends(get_admin_user)
|
|
):
|
|
pattern = r"^\d+x\d+$" # Regular expression pattern
|
|
if re.match(pattern, form_data.size):
|
|
app.state.IMAGE_SIZE = form_data.size
|
|
return {
|
|
"IMAGE_SIZE": app.state.IMAGE_SIZE,
|
|
"status": True,
|
|
}
|
|
else:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=ERROR_MESSAGES.INCORRECT_FORMAT(" (e.g., 512x512)."),
|
|
)
|
|
|
|
|
|
class ImageStepsUpdateForm(BaseModel):
|
|
steps: int
|
|
|
|
|
|
@app.get("/steps")
|
|
async def get_image_size(user=Depends(get_admin_user)):
|
|
return {"IMAGE_STEPS": app.state.IMAGE_STEPS}
|
|
|
|
|
|
@app.post("/steps/update")
|
|
async def update_image_size(
|
|
form_data: ImageStepsUpdateForm, user=Depends(get_admin_user)
|
|
):
|
|
if form_data.steps >= 0:
|
|
app.state.IMAGE_STEPS = form_data.steps
|
|
return {
|
|
"IMAGE_STEPS": app.state.IMAGE_STEPS,
|
|
"status": True,
|
|
}
|
|
else:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=ERROR_MESSAGES.INCORRECT_FORMAT(" (e.g., 50)."),
|
|
)
|
|
|
|
|
|
@app.get("/models")
|
|
def get_models(user=Depends(get_current_user)):
|
|
try:
|
|
r = requests.get(url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/sd-models")
|
|
models = r.json()
|
|
return models
|
|
except Exception as e:
|
|
raise HTTPException(status_code=400, detail=ERROR_MESSAGES.DEFAULT(e))
|
|
|
|
|
|
@app.get("/models/default")
|
|
async def get_default_model(user=Depends(get_admin_user)):
|
|
try:
|
|
r = requests.get(url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/options")
|
|
options = r.json()
|
|
|
|
return {"model": options["sd_model_checkpoint"]}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=400, detail=ERROR_MESSAGES.DEFAULT(e))
|
|
|
|
|
|
class UpdateModelForm(BaseModel):
|
|
model: str
|
|
|
|
|
|
def set_model_handler(model: str):
|
|
r = requests.get(url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/options")
|
|
options = r.json()
|
|
|
|
if model != options["sd_model_checkpoint"]:
|
|
options["sd_model_checkpoint"] = model
|
|
r = requests.post(
|
|
url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/options", json=options
|
|
)
|
|
|
|
return options
|
|
|
|
|
|
@app.post("/models/default/update")
|
|
def update_default_model(
|
|
form_data: UpdateModelForm,
|
|
user=Depends(get_current_user),
|
|
):
|
|
return set_model_handler(form_data.model)
|
|
|
|
|
|
class GenerateImageForm(BaseModel):
|
|
model: Optional[str] = None
|
|
prompt: str
|
|
n: int = 1
|
|
size: str = "512x512"
|
|
negative_prompt: Optional[str] = None
|
|
|
|
|
|
@app.post("/generations")
|
|
def generate_image(
|
|
form_data: GenerateImageForm,
|
|
user=Depends(get_current_user),
|
|
):
|
|
|
|
print(form_data)
|
|
|
|
try:
|
|
if form_data.model:
|
|
set_model_handler(form_data.model)
|
|
|
|
width, height = tuple(map(int, app.state.IMAGE_SIZE.split("x")))
|
|
|
|
data = {
|
|
"prompt": form_data.prompt,
|
|
"batch_size": form_data.n,
|
|
"width": width,
|
|
"height": height,
|
|
}
|
|
|
|
if app.state.IMAGE_STEPS != None:
|
|
data["steps"] = app.state.IMAGE_STEPS
|
|
|
|
if form_data.negative_prompt != None:
|
|
data["negative_prompt"] = form_data.negative_prompt
|
|
|
|
print(data)
|
|
|
|
r = requests.post(
|
|
url=f"{app.state.AUTOMATIC1111_BASE_URL}/sdapi/v1/txt2img",
|
|
json=data,
|
|
)
|
|
|
|
return r.json()
|
|
except Exception as e:
|
|
print(e)
|
|
raise HTTPException(status_code=400, detail=ERROR_MESSAGES.DEFAULT(e))
|