mirror of
https://github.com/open-webui/open-webui
synced 2025-02-22 21:32:32 +00:00
Use async method from aioboto3
This commit is contained in:
parent
63e996fb9d
commit
15e4ef01a8
@ -1,6 +1,6 @@
|
||||
import abc
|
||||
import os
|
||||
from typing import BinaryIO, Iterator, Tuple
|
||||
from typing import AsyncIterator, BinaryIO, Iterator, Tuple
|
||||
|
||||
from typing import BinaryIO, Tuple
|
||||
|
||||
@ -41,7 +41,7 @@ class StorageProvider(abc.ABC):
|
||||
"""Uploads a file to the storage and returns the file content bytes and path."""
|
||||
|
||||
@abc.abstractmethod
|
||||
async def get_file(self, file_path: str) -> Iterator[bytes]:
|
||||
async def get_file(self, file_path: str) -> AsyncIterator[bytes]:
|
||||
"""Downloads file content"""
|
||||
|
||||
@abc.abstractmethod
|
||||
|
@ -2,13 +2,13 @@ import os
|
||||
import shutil
|
||||
|
||||
|
||||
from typing import AsyncGenerator, AsyncIterator, BinaryIO, Tuple
|
||||
from typing import AsyncGenerator, AsyncIterator, BinaryIO, Generator, Iterator, Tuple
|
||||
|
||||
from open_webui.constants import ERROR_MESSAGES
|
||||
from open_webui.config import UPLOAD_DIR
|
||||
from smart_open import open
|
||||
|
||||
from openwebui.backend.open_webui.storage.base_storage_provider import LocalFile, StorageProvider
|
||||
from open_webui.storage.base_storage_provider import LocalFile, StorageProvider
|
||||
|
||||
class LocalStorageProvider(StorageProvider):
|
||||
async def upload_file(self, file: BinaryIO, filename: str) -> Tuple[bytes, str]:
|
||||
@ -22,7 +22,7 @@ class LocalStorageProvider(StorageProvider):
|
||||
f.write(contents)
|
||||
return contents, file_path
|
||||
|
||||
async def get_file(self, file_path: str) -> AsyncGenerator[bytes]:
|
||||
async def get_file(self, file_path: str) -> AsyncIterator[bytes]:
|
||||
chunk_size = 8 * 1024
|
||||
with open(file_path, 'rb') as file:
|
||||
while True:
|
||||
|
@ -1,9 +1,9 @@
|
||||
from open_webui.constants import ERROR_MESSAGES
|
||||
from open_webui.config import STORAGE_PROVIDER
|
||||
|
||||
from openwebui.backend.open_webui.storage.base_storage_provider import StorageProvider
|
||||
from openwebui.backend.open_webui.storage.local_storage_provider import LocalStorageProvider
|
||||
from openwebui.backend.open_webui.storage.s3_storage_provider import S3StorageProvider
|
||||
from open_webui.storage.base_storage_provider import StorageProvider
|
||||
from open_webui.storage.local_storage_provider import LocalStorageProvider
|
||||
from open_webui.storage.s3_storage_provider import S3StorageProvider
|
||||
|
||||
def get_storage_provider() -> StorageProvider:
|
||||
if STORAGE_PROVIDER == "s3":
|
||||
|
@ -1,7 +1,6 @@
|
||||
import os
|
||||
import aioboto3
|
||||
|
||||
|
||||
from typing import AsyncGenerator, AsyncIterator, BinaryIO, Iterator, Tuple, Optional
|
||||
|
||||
from open_webui.constants import ERROR_MESSAGES
|
||||
@ -15,13 +14,10 @@ from open_webui.config import (
|
||||
S3_ENDPOINT_URL,
|
||||
UPLOAD_DIR,
|
||||
)
|
||||
from mypy_boto3_s3.client import S3Client
|
||||
from smart_open import open
|
||||
import boto3
|
||||
from typing import BinaryIO, Tuple, Optional
|
||||
|
||||
|
||||
from openwebui.backend.open_webui.storage.base_storage_provider import LocalCachedFile, StorageProvider
|
||||
from open_webui.storage.base_storage_provider import LocalCachedFile, StorageProvider
|
||||
|
||||
class S3StorageProvider(StorageProvider):
|
||||
def __init__(self):
|
||||
@ -46,19 +42,21 @@ class S3StorageProvider(StorageProvider):
|
||||
raise ValueError(ERROR_MESSAGES.EMPTY_CONTENT)
|
||||
|
||||
try:
|
||||
await self.get_client().put_object(Bucket=self.bucket_name, Key=f"{self.bucket_prefix}/{filename}", Body=contents)
|
||||
async with self.get_client() as client:
|
||||
await client.put_object(Bucket=self.bucket_name, Key=f"{self.bucket_prefix}/{filename}", Body=contents)
|
||||
return contents, f"s3://{self.bucket_name}/{self.bucket_prefix}/{filename}"
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error uploading file to S3: {e}")
|
||||
|
||||
async def get_file(self, file_path: str) :
|
||||
async def get_file(self, file_path: str) -> AsyncIterator[bytes]:
|
||||
"""Downloads a file from S3 and returns the local file path."""
|
||||
try:
|
||||
bucket_name, key = file_path.split("//")[1].split("/", 1)
|
||||
# local_file_path = f"{S3_LOCAL_CACHE_DIR}/{key}"
|
||||
# os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
|
||||
response = await self.get_client().get_object(Bucket=bucket_name, Key=key)
|
||||
return response.get("Body").iter_chunks()
|
||||
async with self.get_client() as client:
|
||||
response = await client.get_object(Bucket=bucket_name, Key=key)
|
||||
return response.get("Body").iter_chunks()
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error downloading file {file_path} from S3: {e}")
|
||||
|
||||
@ -67,7 +65,8 @@ class S3StorageProvider(StorageProvider):
|
||||
bucket_name, key = file_path.split("//")[1].split("/", 1)
|
||||
local_file_path = f"{S3_LOCAL_CACHE_DIR}/{key}"
|
||||
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
|
||||
await self.get_client().download_file(bucket_name, key, local_file_path)
|
||||
async with self.get_client() as client:
|
||||
await client.download_file(bucket_name, key, local_file_path)
|
||||
print(f"download s3 file to {local_file_path}")
|
||||
return LocalCachedFile(local_file_path)
|
||||
except Exception as e:
|
||||
@ -76,18 +75,18 @@ class S3StorageProvider(StorageProvider):
|
||||
async def delete_file(self, filename: str) -> None:
|
||||
"""Deletes a file from S3."""
|
||||
try:
|
||||
await self.get_client().delete_object(Bucket=self.bucket_name, Key=filename)
|
||||
async with self.get_client() as client:
|
||||
await client.delete_object(Bucket=self.bucket_name, Key=filename)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error deleting file {filename} from S3: {e}")
|
||||
|
||||
async def delete_all_files(self) -> None:
|
||||
"""Deletes all files from S3."""
|
||||
try:
|
||||
response = await self.get_client().list_objects_v2(Bucket=self.bucket_name, Prefix=self.bucket_prefix)
|
||||
if "Contents" in response:
|
||||
for content in response["Contents"]:
|
||||
await self.get_client().delete_object(
|
||||
Bucket=self.bucket_name, Key=content["Key"]
|
||||
)
|
||||
async with self.get_client() as client:
|
||||
response = await client.list_objects_v2(Bucket=self.bucket_name, Prefix=self.bucket_prefix)
|
||||
if "Contents" in response:
|
||||
for content in response["Contents"]:
|
||||
await client.delete_object(Bucket=self.bucket_name, Key=content["Key"])
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Error deleting all files from S3: {e}")
|
||||
|
Loading…
Reference in New Issue
Block a user