refac: async pipelines requests

This commit is contained in:
Timothy Jaeryang Baek
2026-02-11 18:24:30 -06:00
parent d02e826c9d
commit 2372b70031

View File

@@ -13,7 +13,6 @@ import aiohttp
import os
import logging
import shutil
import requests
from pydantic import BaseModel
from starlette.responses import FileResponse
from typing import Optional
@@ -217,7 +216,7 @@ async def upload_pipeline(
os.makedirs(upload_folder, exist_ok=True)
file_path = os.path.join(upload_folder, filename)
r = None
response = None
try:
# Save the uploaded file
with open(file_path, "wb") as buffer:
@@ -226,16 +225,25 @@ async def upload_pipeline(
url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
with open(file_path, "rb") as f:
files = {"file": f}
r = requests.post(
f"{url}/pipelines/upload",
headers={"Authorization": f"Bearer {key}"},
files=files,
headers = {"Authorization": f"Bearer {key}"}
async with aiohttp.ClientSession(trust_env=True) as session:
form_data = aiohttp.FormData()
form_data.add_field(
"file",
open(file_path, "rb"),
filename=filename,
content_type="application/octet-stream",
)
r.raise_for_status()
data = r.json()
async with session.post(
f"{url}/pipelines/upload",
headers=headers,
data=form_data,
ssl=AIOHTTP_CLIENT_SESSION_SSL,
) as response:
response.raise_for_status()
data = await response.json()
return {**data}
except Exception as e:
@@ -244,10 +252,10 @@ async def upload_pipeline(
detail = None
status_code = status.HTTP_404_NOT_FOUND
if r is not None:
status_code = r.status_code
if response is not None:
status_code = response.status
try:
res = r.json()
res = await response.json()
if "detail" in res:
detail = res["detail"]
except Exception:
@@ -272,21 +280,22 @@ class AddPipelineForm(BaseModel):
async def add_pipeline(
request: Request, form_data: AddPipelineForm, user=Depends(get_admin_user)
):
r = None
response = None
try:
urlIdx = form_data.urlIdx
url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
r = requests.post(
f"{url}/pipelines/add",
headers={"Authorization": f"Bearer {key}"},
json={"url": form_data.url},
)
r.raise_for_status()
data = r.json()
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.post(
f"{url}/pipelines/add",
headers={"Authorization": f"Bearer {key}"},
json={"url": form_data.url},
ssl=AIOHTTP_CLIENT_SESSION_SSL,
) as response:
response.raise_for_status()
data = await response.json()
return {**data}
except Exception as e:
@@ -294,16 +303,18 @@ async def add_pipeline(
log.exception(f"Connection error: {e}")
detail = None
if r is not None:
if response is not None:
try:
res = r.json()
res = await response.json()
if "detail" in res:
detail = res["detail"]
except Exception:
pass
raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
status_code=(
response.status if response is not None else status.HTTP_404_NOT_FOUND
),
detail=detail if detail else "Pipeline not found",
)
@@ -317,21 +328,22 @@ class DeletePipelineForm(BaseModel):
async def delete_pipeline(
request: Request, form_data: DeletePipelineForm, user=Depends(get_admin_user)
):
r = None
response = None
try:
urlIdx = form_data.urlIdx
url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
r = requests.delete(
f"{url}/pipelines/delete",
headers={"Authorization": f"Bearer {key}"},
json={"id": form_data.id},
)
r.raise_for_status()
data = r.json()
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.delete(
f"{url}/pipelines/delete",
headers={"Authorization": f"Bearer {key}"},
json={"id": form_data.id},
ssl=AIOHTTP_CLIENT_SESSION_SSL,
) as response:
response.raise_for_status()
data = await response.json()
return {**data}
except Exception as e:
@@ -339,16 +351,18 @@ async def delete_pipeline(
log.exception(f"Connection error: {e}")
detail = None
if r is not None:
if response is not None:
try:
res = r.json()
res = await response.json()
if "detail" in res:
detail = res["detail"]
except Exception:
pass
raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
status_code=(
response.status if response is not None else status.HTTP_404_NOT_FOUND
),
detail=detail if detail else "Pipeline not found",
)
@@ -357,15 +371,19 @@ async def delete_pipeline(
async def get_pipelines(
request: Request, urlIdx: Optional[int] = None, user=Depends(get_admin_user)
):
r = None
response = None
try:
url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
r = requests.get(f"{url}/pipelines", headers={"Authorization": f"Bearer {key}"})
r.raise_for_status()
data = r.json()
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.get(
f"{url}/pipelines",
headers={"Authorization": f"Bearer {key}"},
ssl=AIOHTTP_CLIENT_SESSION_SSL,
) as response:
response.raise_for_status()
data = await response.json()
return {**data}
except Exception as e:
@@ -373,16 +391,18 @@ async def get_pipelines(
log.exception(f"Connection error: {e}")
detail = None
if r is not None:
if response is not None:
try:
res = r.json()
res = await response.json()
if "detail" in res:
detail = res["detail"]
except Exception:
pass
raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
status_code=(
response.status if response is not None else status.HTTP_404_NOT_FOUND
),
detail=detail if detail else "Pipeline not found",
)
@@ -394,17 +414,19 @@ async def get_pipeline_valves(
pipeline_id: str,
user=Depends(get_admin_user),
):
r = None
response = None
try:
url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
r = requests.get(
f"{url}/{pipeline_id}/valves", headers={"Authorization": f"Bearer {key}"}
)
r.raise_for_status()
data = r.json()
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.get(
f"{url}/{pipeline_id}/valves",
headers={"Authorization": f"Bearer {key}"},
ssl=AIOHTTP_CLIENT_SESSION_SSL,
) as response:
response.raise_for_status()
data = await response.json()
return {**data}
except Exception as e:
@@ -412,16 +434,18 @@ async def get_pipeline_valves(
log.exception(f"Connection error: {e}")
detail = None
if r is not None:
if response is not None:
try:
res = r.json()
res = await response.json()
if "detail" in res:
detail = res["detail"]
except Exception:
pass
raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
status_code=(
response.status if response is not None else status.HTTP_404_NOT_FOUND
),
detail=detail if detail else "Pipeline not found",
)
@@ -433,18 +457,19 @@ async def get_pipeline_valves_spec(
pipeline_id: str,
user=Depends(get_admin_user),
):
r = None
response = None
try:
url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
r = requests.get(
f"{url}/{pipeline_id}/valves/spec",
headers={"Authorization": f"Bearer {key}"},
)
r.raise_for_status()
data = r.json()
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.get(
f"{url}/{pipeline_id}/valves/spec",
headers={"Authorization": f"Bearer {key}"},
ssl=AIOHTTP_CLIENT_SESSION_SSL,
) as response:
response.raise_for_status()
data = await response.json()
return {**data}
except Exception as e:
@@ -452,16 +477,18 @@ async def get_pipeline_valves_spec(
log.exception(f"Connection error: {e}")
detail = None
if r is not None:
if response is not None:
try:
res = r.json()
res = await response.json()
if "detail" in res:
detail = res["detail"]
except Exception:
pass
raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
status_code=(
response.status if response is not None else status.HTTP_404_NOT_FOUND
),
detail=detail if detail else "Pipeline not found",
)
@@ -474,19 +501,20 @@ async def update_pipeline_valves(
form_data: dict,
user=Depends(get_admin_user),
):
r = None
response = None
try:
url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx]
key = request.app.state.config.OPENAI_API_KEYS[urlIdx]
r = requests.post(
f"{url}/{pipeline_id}/valves/update",
headers={"Authorization": f"Bearer {key}"},
json={**form_data},
)
r.raise_for_status()
data = r.json()
async with aiohttp.ClientSession(trust_env=True) as session:
async with session.post(
f"{url}/{pipeline_id}/valves/update",
headers={"Authorization": f"Bearer {key}"},
json={**form_data},
ssl=AIOHTTP_CLIENT_SESSION_SSL,
) as response:
response.raise_for_status()
data = await response.json()
return {**data}
except Exception as e:
@@ -495,15 +523,17 @@ async def update_pipeline_valves(
detail = None
if r is not None:
if response is not None:
try:
res = r.json()
res = await response.json()
if "detail" in res:
detail = res["detail"]
except Exception:
pass
raise HTTPException(
status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND),
status_code=(
response.status if response is not None else status.HTTP_404_NOT_FOUND
),
detail=detail if detail else "Pipeline not found",
)