diff --git a/backend/open_webui/storage/base_storage_provider.py b/backend/open_webui/storage/base_storage_provider.py index 2cf982bd3..08ed2256d 100644 --- a/backend/open_webui/storage/base_storage_provider.py +++ b/backend/open_webui/storage/base_storage_provider.py @@ -1,6 +1,6 @@ import abc import os -from typing import BinaryIO, Iterator, Tuple +from typing import AsyncIterator, BinaryIO, Iterator, Tuple from typing import BinaryIO, Tuple @@ -41,7 +41,7 @@ class StorageProvider(abc.ABC): """Uploads a file to the storage and returns the file content bytes and path.""" @abc.abstractmethod - async def get_file(self, file_path: str) -> Iterator[bytes]: + async def get_file(self, file_path: str) -> AsyncIterator[bytes]: """Downloads file content""" @abc.abstractmethod diff --git a/backend/open_webui/storage/local_storage_provider.py b/backend/open_webui/storage/local_storage_provider.py index a8281b1c9..cdc52e7cd 100644 --- a/backend/open_webui/storage/local_storage_provider.py +++ b/backend/open_webui/storage/local_storage_provider.py @@ -2,13 +2,13 @@ import os import shutil -from typing import AsyncGenerator, AsyncIterator, BinaryIO, Tuple +from typing import AsyncGenerator, AsyncIterator, BinaryIO, Generator, Iterator, Tuple from open_webui.constants import ERROR_MESSAGES from open_webui.config import UPLOAD_DIR from smart_open import open -from openwebui.backend.open_webui.storage.base_storage_provider import LocalFile, StorageProvider +from open_webui.storage.base_storage_provider import LocalFile, StorageProvider class LocalStorageProvider(StorageProvider): async def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]: @@ -22,7 +22,7 @@ class LocalStorageProvider(StorageProvider): f.write(contents) return contents, file_path - async def get_file(self, file_path: str) -> AsyncGenerator[bytes]: + async def get_file(self, file_path: str) -> AsyncIterator[bytes]: chunk_size = 8 * 1024 with open(file_path, 'rb') as file: while True: diff --git a/backend/open_webui/storage/provider.py b/backend/open_webui/storage/provider.py index c878f0dbb..f25a506aa 100644 --- a/backend/open_webui/storage/provider.py +++ b/backend/open_webui/storage/provider.py @@ -1,9 +1,9 @@ from open_webui.constants import ERROR_MESSAGES from open_webui.config import STORAGE_PROVIDER -from openwebui.backend.open_webui.storage.base_storage_provider import StorageProvider -from openwebui.backend.open_webui.storage.local_storage_provider import LocalStorageProvider -from openwebui.backend.open_webui.storage.s3_storage_provider import S3StorageProvider +from open_webui.storage.base_storage_provider import StorageProvider +from open_webui.storage.local_storage_provider import LocalStorageProvider +from open_webui.storage.s3_storage_provider import S3StorageProvider def get_storage_provider() -> StorageProvider: if STORAGE_PROVIDER == "s3": diff --git a/backend/open_webui/storage/s3_storage_provider.py b/backend/open_webui/storage/s3_storage_provider.py index d60cb40d6..cbf62ae59 100644 --- a/backend/open_webui/storage/s3_storage_provider.py +++ b/backend/open_webui/storage/s3_storage_provider.py @@ -1,7 +1,6 @@ import os import aioboto3 - from typing import AsyncGenerator, AsyncIterator, BinaryIO, Iterator, Tuple, Optional from open_webui.constants import ERROR_MESSAGES @@ -15,13 +14,10 @@ from open_webui.config import ( S3_ENDPOINT_URL, UPLOAD_DIR, ) -from mypy_boto3_s3.client import S3Client -from smart_open import open -import boto3 from typing import BinaryIO, Tuple, Optional -from openwebui.backend.open_webui.storage.base_storage_provider import LocalCachedFile, StorageProvider +from open_webui.storage.base_storage_provider import LocalCachedFile, StorageProvider class S3StorageProvider(StorageProvider): def __init__(self): @@ -46,19 +42,21 @@ class S3StorageProvider(StorageProvider): raise ValueError(ERROR_MESSAGES.EMPTY_CONTENT) try: - await self.get_client().put_object(Bucket=self.bucket_name, Key=f"{self.bucket_prefix}/{filename}", Body=contents) + async with self.get_client() as client: + await client.put_object(Bucket=self.bucket_name, Key=f"{self.bucket_prefix}/{filename}", Body=contents) return contents, f"s3://{self.bucket_name}/{self.bucket_prefix}/{filename}" except Exception as e: raise RuntimeError(f"Error uploading file to S3: {e}") - async def get_file(self, file_path: str) : + async def get_file(self, file_path: str) -> AsyncIterator[bytes]: """Downloads a file from S3 and returns the local file path.""" try: bucket_name, key = file_path.split("//")[1].split("/", 1) # local_file_path = f"{S3_LOCAL_CACHE_DIR}/{key}" # os.makedirs(os.path.dirname(local_file_path), exist_ok=True) - response = await self.get_client().get_object(Bucket=bucket_name, Key=key) - return response.get("Body").iter_chunks() + async with self.get_client() as client: + response = await client.get_object(Bucket=bucket_name, Key=key) + return response.get("Body").iter_chunks() except Exception as e: raise RuntimeError(f"Error downloading file {file_path} from S3: {e}") @@ -67,7 +65,8 @@ class S3StorageProvider(StorageProvider): bucket_name, key = file_path.split("//")[1].split("/", 1) local_file_path = f"{S3_LOCAL_CACHE_DIR}/{key}" os.makedirs(os.path.dirname(local_file_path), exist_ok=True) - await self.get_client().download_file(bucket_name, key, local_file_path) + async with self.get_client() as client: + await client.download_file(bucket_name, key, local_file_path) print(f"download s3 file to {local_file_path}") return LocalCachedFile(local_file_path) except Exception as e: @@ -76,18 +75,18 @@ class S3StorageProvider(StorageProvider): async def delete_file(self, filename: str) -> None: """Deletes a file from S3.""" try: - await self.get_client().delete_object(Bucket=self.bucket_name, Key=filename) + async with self.get_client() as client: + await client.delete_object(Bucket=self.bucket_name, Key=filename) except Exception as e: raise RuntimeError(f"Error deleting file {filename} from S3: {e}") async def delete_all_files(self) -> None: """Deletes all files from S3.""" try: - response = await self.get_client().list_objects_v2(Bucket=self.bucket_name, Prefix=self.bucket_prefix) - if "Contents" in response: - for content in response["Contents"]: - await self.get_client().delete_object( - Bucket=self.bucket_name, Key=content["Key"] - ) + async with self.get_client() as client: + response = await client.list_objects_v2(Bucket=self.bucket_name, Prefix=self.bucket_prefix) + if "Contents" in response: + for content in response["Contents"]: + await client.delete_object(Bucket=self.bucket_name, Key=content["Key"]) except Exception as e: raise RuntimeError(f"Error deleting all files from S3: {e}")