diff --git a/backend/open_webui/routers/pipelines.py b/backend/open_webui/routers/pipelines.py index 7a42acffc..20fcd75ee 100644 --- a/backend/open_webui/routers/pipelines.py +++ b/backend/open_webui/routers/pipelines.py @@ -13,7 +13,6 @@ import aiohttp import os import logging import shutil -import requests from pydantic import BaseModel from starlette.responses import FileResponse from typing import Optional @@ -217,7 +216,7 @@ async def upload_pipeline( os.makedirs(upload_folder, exist_ok=True) file_path = os.path.join(upload_folder, filename) - r = None + response = None try: # Save the uploaded file with open(file_path, "wb") as buffer: @@ -226,16 +225,25 @@ async def upload_pipeline( url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - with open(file_path, "rb") as f: - files = {"file": f} - r = requests.post( - f"{url}/pipelines/upload", - headers={"Authorization": f"Bearer {key}"}, - files=files, + headers = {"Authorization": f"Bearer {key}"} + + async with aiohttp.ClientSession(trust_env=True) as session: + form_data = aiohttp.FormData() + form_data.add_field( + "file", + open(file_path, "rb"), + filename=filename, + content_type="application/octet-stream", ) - r.raise_for_status() - data = r.json() + async with session.post( + f"{url}/pipelines/upload", + headers=headers, + data=form_data, + ssl=AIOHTTP_CLIENT_SESSION_SSL, + ) as response: + response.raise_for_status() + data = await response.json() return {**data} except Exception as e: @@ -244,10 +252,10 @@ async def upload_pipeline( detail = None status_code = status.HTTP_404_NOT_FOUND - if r is not None: - status_code = r.status_code + if response is not None: + status_code = response.status try: - res = r.json() + res = await response.json() if "detail" in res: detail = res["detail"] except Exception: @@ -272,21 +280,22 @@ class AddPipelineForm(BaseModel): async def add_pipeline( request: Request, form_data: AddPipelineForm, user=Depends(get_admin_user) ): - r = None + response = None try: urlIdx = form_data.urlIdx url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - r = requests.post( - f"{url}/pipelines/add", - headers={"Authorization": f"Bearer {key}"}, - json={"url": form_data.url}, - ) - - r.raise_for_status() - data = r.json() + async with aiohttp.ClientSession(trust_env=True) as session: + async with session.post( + f"{url}/pipelines/add", + headers={"Authorization": f"Bearer {key}"}, + json={"url": form_data.url}, + ssl=AIOHTTP_CLIENT_SESSION_SSL, + ) as response: + response.raise_for_status() + data = await response.json() return {**data} except Exception as e: @@ -294,16 +303,18 @@ async def add_pipeline( log.exception(f"Connection error: {e}") detail = None - if r is not None: + if response is not None: try: - res = r.json() + res = await response.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( - status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), + status_code=( + response.status if response is not None else status.HTTP_404_NOT_FOUND + ), detail=detail if detail else "Pipeline not found", ) @@ -317,21 +328,22 @@ class DeletePipelineForm(BaseModel): async def delete_pipeline( request: Request, form_data: DeletePipelineForm, user=Depends(get_admin_user) ): - r = None + response = None try: urlIdx = form_data.urlIdx url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - r = requests.delete( - f"{url}/pipelines/delete", - headers={"Authorization": f"Bearer {key}"}, - json={"id": form_data.id}, - ) - - r.raise_for_status() - data = r.json() + async with aiohttp.ClientSession(trust_env=True) as session: + async with session.delete( + f"{url}/pipelines/delete", + headers={"Authorization": f"Bearer {key}"}, + json={"id": form_data.id}, + ssl=AIOHTTP_CLIENT_SESSION_SSL, + ) as response: + response.raise_for_status() + data = await response.json() return {**data} except Exception as e: @@ -339,16 +351,18 @@ async def delete_pipeline( log.exception(f"Connection error: {e}") detail = None - if r is not None: + if response is not None: try: - res = r.json() + res = await response.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( - status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), + status_code=( + response.status if response is not None else status.HTTP_404_NOT_FOUND + ), detail=detail if detail else "Pipeline not found", ) @@ -357,15 +371,19 @@ async def delete_pipeline( async def get_pipelines( request: Request, urlIdx: Optional[int] = None, user=Depends(get_admin_user) ): - r = None + response = None try: url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - r = requests.get(f"{url}/pipelines", headers={"Authorization": f"Bearer {key}"}) - - r.raise_for_status() - data = r.json() + async with aiohttp.ClientSession(trust_env=True) as session: + async with session.get( + f"{url}/pipelines", + headers={"Authorization": f"Bearer {key}"}, + ssl=AIOHTTP_CLIENT_SESSION_SSL, + ) as response: + response.raise_for_status() + data = await response.json() return {**data} except Exception as e: @@ -373,16 +391,18 @@ async def get_pipelines( log.exception(f"Connection error: {e}") detail = None - if r is not None: + if response is not None: try: - res = r.json() + res = await response.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( - status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), + status_code=( + response.status if response is not None else status.HTTP_404_NOT_FOUND + ), detail=detail if detail else "Pipeline not found", ) @@ -394,17 +414,19 @@ async def get_pipeline_valves( pipeline_id: str, user=Depends(get_admin_user), ): - r = None + response = None try: url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - r = requests.get( - f"{url}/{pipeline_id}/valves", headers={"Authorization": f"Bearer {key}"} - ) - - r.raise_for_status() - data = r.json() + async with aiohttp.ClientSession(trust_env=True) as session: + async with session.get( + f"{url}/{pipeline_id}/valves", + headers={"Authorization": f"Bearer {key}"}, + ssl=AIOHTTP_CLIENT_SESSION_SSL, + ) as response: + response.raise_for_status() + data = await response.json() return {**data} except Exception as e: @@ -412,16 +434,18 @@ async def get_pipeline_valves( log.exception(f"Connection error: {e}") detail = None - if r is not None: + if response is not None: try: - res = r.json() + res = await response.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( - status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), + status_code=( + response.status if response is not None else status.HTTP_404_NOT_FOUND + ), detail=detail if detail else "Pipeline not found", ) @@ -433,18 +457,19 @@ async def get_pipeline_valves_spec( pipeline_id: str, user=Depends(get_admin_user), ): - r = None + response = None try: url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - r = requests.get( - f"{url}/{pipeline_id}/valves/spec", - headers={"Authorization": f"Bearer {key}"}, - ) - - r.raise_for_status() - data = r.json() + async with aiohttp.ClientSession(trust_env=True) as session: + async with session.get( + f"{url}/{pipeline_id}/valves/spec", + headers={"Authorization": f"Bearer {key}"}, + ssl=AIOHTTP_CLIENT_SESSION_SSL, + ) as response: + response.raise_for_status() + data = await response.json() return {**data} except Exception as e: @@ -452,16 +477,18 @@ async def get_pipeline_valves_spec( log.exception(f"Connection error: {e}") detail = None - if r is not None: + if response is not None: try: - res = r.json() + res = await response.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( - status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), + status_code=( + response.status if response is not None else status.HTTP_404_NOT_FOUND + ), detail=detail if detail else "Pipeline not found", ) @@ -474,19 +501,20 @@ async def update_pipeline_valves( form_data: dict, user=Depends(get_admin_user), ): - r = None + response = None try: url = request.app.state.config.OPENAI_API_BASE_URLS[urlIdx] key = request.app.state.config.OPENAI_API_KEYS[urlIdx] - r = requests.post( - f"{url}/{pipeline_id}/valves/update", - headers={"Authorization": f"Bearer {key}"}, - json={**form_data}, - ) - - r.raise_for_status() - data = r.json() + async with aiohttp.ClientSession(trust_env=True) as session: + async with session.post( + f"{url}/{pipeline_id}/valves/update", + headers={"Authorization": f"Bearer {key}"}, + json={**form_data}, + ssl=AIOHTTP_CLIENT_SESSION_SSL, + ) as response: + response.raise_for_status() + data = await response.json() return {**data} except Exception as e: @@ -495,15 +523,17 @@ async def update_pipeline_valves( detail = None - if r is not None: + if response is not None: try: - res = r.json() + res = await response.json() if "detail" in res: detail = res["detail"] except Exception: pass raise HTTPException( - status_code=(r.status_code if r is not None else status.HTTP_404_NOT_FOUND), + status_code=( + response.status if response is not None else status.HTTP_404_NOT_FOUND + ), detail=detail if detail else "Pipeline not found", )