Update mistral.py

This commit is contained in:
PVBLIC Foundation 2025-05-23 10:00:19 -07:00 committed by GitHub
parent 86e24bb4aa
commit bf193dfb5d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,8 +1,12 @@
import requests import requests
import aiohttp
import asyncio
import logging import logging
import os import os
import sys import sys
import time
from typing import List, Dict, Any from typing import List, Dict, Any
from contextlib import asynccontextmanager
from langchain_core.documents import Document from langchain_core.documents import Document
from open_webui.env import SRC_LOG_LEVELS, GLOBAL_LOG_LEVEL from open_webui.env import SRC_LOG_LEVELS, GLOBAL_LOG_LEVEL
@ -14,18 +18,29 @@ log.setLevel(SRC_LOG_LEVELS["RAG"])
class MistralLoader: class MistralLoader:
""" """
Enhanced Mistral OCR loader with both sync and async support.
Loads documents by processing them through the Mistral OCR API. Loads documents by processing them through the Mistral OCR API.
""" """
BASE_API_URL = "https://api.mistral.ai/v1" BASE_API_URL = "https://api.mistral.ai/v1"
def __init__(self, api_key: str, file_path: str): def __init__(
self,
api_key: str,
file_path: str,
timeout: int = 300, # 5 minutes default
max_retries: int = 3,
enable_debug_logging: bool = False
):
""" """
Initializes the loader. Initializes the loader with enhanced features.
Args: Args:
api_key: Your Mistral API key. api_key: Your Mistral API key.
file_path: The local path to the PDF file to process. file_path: The local path to the PDF file to process.
timeout: Request timeout in seconds.
max_retries: Maximum number of retry attempts.
enable_debug_logging: Enable detailed debug logs.
""" """
if not api_key: if not api_key:
raise ValueError("API key cannot be empty.") raise ValueError("API key cannot be empty.")
@ -34,7 +49,23 @@ class MistralLoader:
self.api_key = api_key self.api_key = api_key
self.file_path = file_path self.file_path = file_path
self.headers = {"Authorization": f"Bearer {self.api_key}"} self.timeout = timeout
self.max_retries = max_retries
self.debug = enable_debug_logging
# Pre-compute file info for performance
self.file_name = os.path.basename(file_path)
self.file_size = os.path.getsize(file_path)
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"User-Agent": "OpenWebUI-MistralLoader/2.0"
}
def _debug_log(self, message: str, *args) -> None:
"""Conditional debug logging for performance."""
if self.debug:
log.debug(message, *args)
def _handle_response(self, response: requests.Response) -> Dict[str, Any]: def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
"""Checks response status and returns JSON content.""" """Checks response status and returns JSON content."""
@ -54,24 +85,81 @@ class MistralLoader:
log.error(f"JSON decode error: {json_err} - Response: {response.text}") log.error(f"JSON decode error: {json_err} - Response: {response.text}")
raise # Re-raise after logging raise # Re-raise after logging
async def _handle_response_async(self, response: aiohttp.ClientResponse) -> Dict[str, Any]:
"""Async version of response handling with better error info."""
try:
response.raise_for_status()
# Check content type
content_type = response.headers.get('content-type', '')
if 'application/json' not in content_type:
if response.status == 204:
return {}
text = await response.text()
raise ValueError(f"Unexpected content type: {content_type}, body: {text[:200]}...")
return await response.json()
except aiohttp.ClientResponseError as e:
error_text = await response.text() if response else "No response"
log.error(f"HTTP {e.status}: {e.message} - Response: {error_text[:500]}")
raise
except aiohttp.ClientError as e:
log.error(f"Client error: {e}")
raise
except Exception as e:
log.error(f"Unexpected error processing response: {e}")
raise
def _retry_request_sync(self, request_func, *args, **kwargs):
"""Synchronous retry logic with exponential backoff."""
for attempt in range(self.max_retries):
try:
return request_func(*args, **kwargs)
except (requests.exceptions.RequestException, Exception) as e:
if attempt == self.max_retries - 1:
raise
wait_time = (2 ** attempt) + 0.5
log.warning(f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}. Retrying in {wait_time}s...")
time.sleep(wait_time)
async def _retry_request_async(self, request_func, *args, **kwargs):
"""Async retry logic with exponential backoff."""
for attempt in range(self.max_retries):
try:
return await request_func(*args, **kwargs)
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt == self.max_retries - 1:
raise
wait_time = (2 ** attempt) + 0.5
log.warning(f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}. Retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
def _upload_file(self) -> str: def _upload_file(self) -> str:
"""Uploads the file to Mistral for OCR processing.""" """Uploads the file to Mistral for OCR processing (sync version)."""
log.info("Uploading file to Mistral API") log.info("Uploading file to Mistral API")
url = f"{self.BASE_API_URL}/files" url = f"{self.BASE_API_URL}/files"
file_name = os.path.basename(self.file_path) file_name = os.path.basename(self.file_path)
try: def upload_request():
with open(self.file_path, "rb") as f: with open(self.file_path, "rb") as f:
files = {"file": (file_name, f, "application/pdf")} files = {"file": (file_name, f, "application/pdf")}
data = {"purpose": "ocr"} data = {"purpose": "ocr"}
upload_headers = self.headers.copy() # Avoid modifying self.headers
response = requests.post( response = requests.post(
url, headers=upload_headers, files=files, data=data url,
headers=self.headers,
files=files,
data=data,
timeout=self.timeout
) )
response_data = self._handle_response(response) return self._handle_response(response)
try:
response_data = self._retry_request_sync(upload_request)
file_id = response_data.get("id") file_id = response_data.get("id")
if not file_id: if not file_id:
raise ValueError("File ID not found in upload response.") raise ValueError("File ID not found in upload response.")
@ -81,16 +169,63 @@ class MistralLoader:
log.error(f"Failed to upload file: {e}") log.error(f"Failed to upload file: {e}")
raise raise
async def _upload_file_async(self, session: aiohttp.ClientSession) -> str:
"""Async file upload with streaming for better memory efficiency."""
url = f"{self.BASE_API_URL}/files"
async def upload_request():
# Create multipart writer for streaming upload
writer = aiohttp.MultipartWriter('form-data')
# Add purpose field
purpose_part = writer.append('ocr')
purpose_part.set_content_disposition('form-data', name='purpose')
# Add file part with streaming
file_part = writer.append_payload(aiohttp.streams.FilePayload(
self.file_path,
filename=self.file_name,
content_type='application/pdf'
))
file_part.set_content_disposition('form-data', name='file', filename=self.file_name)
self._debug_log(f"Uploading file: {self.file_name} ({self.file_size:,} bytes)")
async with session.post(
url,
data=writer,
headers=self.headers,
timeout=aiohttp.ClientTimeout(total=self.timeout)
) as response:
return await self._handle_response_async(response)
response_data = await self._retry_request_async(upload_request)
file_id = response_data.get("id")
if not file_id:
raise ValueError("File ID not found in upload response.")
log.info(f"File uploaded successfully. File ID: {file_id}")
return file_id
def _get_signed_url(self, file_id: str) -> str: def _get_signed_url(self, file_id: str) -> str:
"""Retrieves a temporary signed URL for the uploaded file.""" """Retrieves a temporary signed URL for the uploaded file (sync version)."""
log.info(f"Getting signed URL for file ID: {file_id}") log.info(f"Getting signed URL for file ID: {file_id}")
url = f"{self.BASE_API_URL}/files/{file_id}/url" url = f"{self.BASE_API_URL}/files/{file_id}/url"
params = {"expiry": 1} params = {"expiry": 1}
signed_url_headers = {**self.headers, "Accept": "application/json"} signed_url_headers = {**self.headers, "Accept": "application/json"}
def url_request():
response = requests.get(
url,
headers=signed_url_headers,
params=params,
timeout=self.timeout
)
return self._handle_response(response)
try: try:
response = requests.get(url, headers=signed_url_headers, params=params) response_data = self._retry_request_sync(url_request)
response_data = self._handle_response(response)
signed_url = response_data.get("url") signed_url = response_data.get("url")
if not signed_url: if not signed_url:
raise ValueError("Signed URL not found in response.") raise ValueError("Signed URL not found in response.")
@ -100,8 +235,37 @@ class MistralLoader:
log.error(f"Failed to get signed URL: {e}") log.error(f"Failed to get signed URL: {e}")
raise raise
async def _get_signed_url_async(self, session: aiohttp.ClientSession, file_id: str) -> str:
"""Async signed URL retrieval."""
url = f"{self.BASE_API_URL}/files/{file_id}/url"
params = {"expiry": 1}
headers = {
**self.headers,
"Accept": "application/json"
}
async def url_request():
self._debug_log(f"Getting signed URL for file ID: {file_id}")
async with session.get(
url,
headers=headers,
params=params,
timeout=aiohttp.ClientTimeout(total=self.timeout)
) as response:
return await self._handle_response_async(response)
response_data = await self._retry_request_async(url_request)
signed_url = response_data.get("url")
if not signed_url:
raise ValueError("Signed URL not found in response.")
self._debug_log("Signed URL received successfully")
return signed_url
def _process_ocr(self, signed_url: str) -> Dict[str, Any]: def _process_ocr(self, signed_url: str) -> Dict[str, Any]:
"""Sends the signed URL to the OCR endpoint for processing.""" """Sends the signed URL to the OCR endpoint for processing (sync version)."""
log.info("Processing OCR via Mistral API") log.info("Processing OCR via Mistral API")
url = f"{self.BASE_API_URL}/ocr" url = f"{self.BASE_API_URL}/ocr"
ocr_headers = { ocr_headers = {
@ -118,43 +282,179 @@ class MistralLoader:
"include_image_base64": False, "include_image_base64": False,
} }
def ocr_request():
response = requests.post(
url,
headers=ocr_headers,
json=payload,
timeout=self.timeout
)
return self._handle_response(response)
try: try:
response = requests.post(url, headers=ocr_headers, json=payload) ocr_response = self._retry_request_sync(ocr_request)
ocr_response = self._handle_response(response)
log.info("OCR processing done.") log.info("OCR processing done.")
log.debug("OCR response: %s", ocr_response) self._debug_log("OCR response: %s", ocr_response)
return ocr_response return ocr_response
except Exception as e: except Exception as e:
log.error(f"Failed during OCR processing: {e}") log.error(f"Failed during OCR processing: {e}")
raise raise
async def _process_ocr_async(self, session: aiohttp.ClientSession, signed_url: str) -> Dict[str, Any]:
"""Async OCR processing with timing metrics."""
url = f"{self.BASE_API_URL}/ocr"
headers = {
**self.headers,
"Content-Type": "application/json",
"Accept": "application/json",
}
payload = {
"model": "mistral-ocr-latest",
"document": {
"type": "document_url",
"document_url": signed_url,
},
"include_image_base64": False,
}
async def ocr_request():
log.info("Starting OCR processing via Mistral API")
start_time = time.time()
async with session.post(
url,
json=payload,
headers=headers,
timeout=aiohttp.ClientTimeout(total=self.timeout)
) as response:
ocr_response = await self._handle_response_async(response)
processing_time = time.time() - start_time
log.info(f"OCR processing completed in {processing_time:.2f}s")
return ocr_response
return await self._retry_request_async(ocr_request)
def _delete_file(self, file_id: str) -> None: def _delete_file(self, file_id: str) -> None:
"""Deletes the file from Mistral storage.""" """Deletes the file from Mistral storage (sync version)."""
log.info(f"Deleting uploaded file ID: {file_id}") log.info(f"Deleting uploaded file ID: {file_id}")
url = f"{self.BASE_API_URL}/files/{file_id}" url = f"{self.BASE_API_URL}/files/{file_id}"
# No specific Accept header needed, default or Authorization is usually sufficient
try: try:
response = requests.delete(url, headers=self.headers) response = requests.delete(url, headers=self.headers, timeout=30)
delete_response = self._handle_response( delete_response = self._handle_response(response)
response log.info(f"File deleted successfully: {delete_response}")
) # Check status, ignore response body unless needed
log.info(
f"File deleted successfully: {delete_response}"
) # Log the response if available
except Exception as e: except Exception as e:
# Log error but don't necessarily halt execution if deletion fails # Log error but don't necessarily halt execution if deletion fails
log.error(f"Failed to delete file ID {file_id}: {e}") log.error(f"Failed to delete file ID {file_id}: {e}")
# Depending on requirements, you might choose to raise the error here
async def _delete_file_async(self, session: aiohttp.ClientSession, file_id: str) -> None:
"""Async file deletion with error tolerance."""
try:
async def delete_request():
self._debug_log(f"Deleting file ID: {file_id}")
async with session.delete(
url=f"{self.BASE_API_URL}/files/{file_id}",
headers=self.headers,
timeout=aiohttp.ClientTimeout(total=30) # Shorter timeout for cleanup
) as response:
return await self._handle_response_async(response)
await self._retry_request_async(delete_request)
self._debug_log(f"File {file_id} deleted successfully")
except Exception as e:
# Don't fail the entire process if cleanup fails
log.warning(f"Failed to delete file ID {file_id}: {e}")
@asynccontextmanager
async def _get_session(self):
"""Context manager for HTTP session with optimized settings."""
connector = aiohttp.TCPConnector(
limit=10, # Total connection limit
limit_per_host=5, # Per-host connection limit
ttl_dns_cache=300, # DNS cache TTL
use_dns_cache=True,
keepalive_timeout=30,
enable_cleanup_closed=True
)
async with aiohttp.ClientSession(
connector=connector,
timeout=aiohttp.ClientTimeout(total=self.timeout),
headers={"User-Agent": "OpenWebUI-MistralLoader/2.0"}
) as session:
yield session
def _process_results(self, ocr_response: Dict[str, Any]) -> List[Document]:
"""Process OCR results into Document objects with enhanced metadata."""
pages_data = ocr_response.get("pages")
if not pages_data:
log.warning("No pages found in OCR response.")
return [Document(page_content="No text content found", metadata={"error": "no_pages"})]
documents = []
total_pages = len(pages_data)
skipped_pages = 0
for page_data in pages_data:
page_content = page_data.get("markdown")
page_index = page_data.get("index") # API uses 0-based index
if page_content is not None and page_index is not None:
# Clean up content efficiently
cleaned_content = page_content.strip() if isinstance(page_content, str) else str(page_content)
if cleaned_content: # Only add non-empty pages
documents.append(
Document(
page_content=cleaned_content,
metadata={
"page": page_index, # 0-based index from API
"page_label": page_index + 1, # 1-based label for convenience
"total_pages": total_pages,
"file_name": self.file_name,
"file_size": self.file_size,
"processing_engine": "mistral-ocr"
},
)
)
else:
skipped_pages += 1
self._debug_log(f"Skipping empty page {page_index}")
else:
skipped_pages += 1
self._debug_log(f"Skipping page due to missing 'markdown' or 'index'. Data: {page_data}")
if skipped_pages > 0:
log.info(f"Processed {len(documents)} pages, skipped {skipped_pages} empty/invalid pages")
if not documents:
# Case where pages existed but none had valid markdown/index
log.warning("OCR response contained pages, but none had valid content/index.")
return [
Document(
page_content="No valid text content found in document",
metadata={"error": "no_valid_pages", "total_pages": total_pages}
)
]
return documents
def load(self) -> List[Document]: def load(self) -> List[Document]:
""" """
Executes the full OCR workflow: upload, get URL, process OCR, delete file. Executes the full OCR workflow: upload, get URL, process OCR, delete file.
Synchronous version for backward compatibility.
Returns: Returns:
A list of Document objects, one for each page processed. A list of Document objects, one for each page processed.
""" """
file_id = None file_id = None
start_time = time.time()
try: try:
# 1. Upload file # 1. Upload file
file_id = self._upload_file() file_id = self._upload_file()
@ -166,53 +466,21 @@ class MistralLoader:
ocr_response = self._process_ocr(signed_url) ocr_response = self._process_ocr(signed_url)
# 4. Process results # 4. Process results
pages_data = ocr_response.get("pages") documents = self._process_results(ocr_response)
if not pages_data:
log.warning("No pages found in OCR response.")
return [Document(page_content="No text content found", metadata={})]
documents = [] total_time = time.time() - start_time
total_pages = len(pages_data) log.info(f"Sync OCR workflow completed in {total_time:.2f}s, produced {len(documents)} documents")
for page_data in pages_data:
page_content = page_data.get("markdown")
page_index = page_data.get("index") # API uses 0-based index
if page_content is not None and page_index is not None:
documents.append(
Document(
page_content=page_content,
metadata={
"page": page_index, # 0-based index from API
"page_label": page_index
+ 1, # 1-based label for convenience
"total_pages": total_pages,
# Add other relevant metadata from page_data if available/needed
# e.g., page_data.get('width'), page_data.get('height')
},
)
)
else:
log.warning(
f"Skipping page due to missing 'markdown' or 'index'. Data: {page_data}"
)
if not documents:
# Case where pages existed but none had valid markdown/index
log.warning(
"OCR response contained pages, but none had valid content/index."
)
return [
Document(
page_content="No text content found in valid pages", metadata={}
)
]
return documents return documents
except Exception as e: except Exception as e:
log.error(f"An error occurred during the loading process: {e}") total_time = time.time() - start_time
# Return an empty list or a specific error document on failure log.error(f"An error occurred during the loading process after {total_time:.2f}s: {e}")
return [Document(page_content=f"Error during processing: {e}", metadata={})] # Return an error document on failure
return [Document(
page_content=f"Error during processing: {e}",
metadata={"error": "processing_failed", "file_name": self.file_name}
)]
finally: finally:
# 5. Delete file (attempt even if prior steps failed after upload) # 5. Delete file (attempt even if prior steps failed after upload)
if file_id: if file_id:
@ -220,6 +488,88 @@ class MistralLoader:
self._delete_file(file_id) self._delete_file(file_id)
except Exception as del_e: except Exception as del_e:
# Log deletion error, but don't overwrite original error if one occurred # Log deletion error, but don't overwrite original error if one occurred
log.error( log.error(f"Cleanup error: Could not delete file ID {file_id}. Reason: {del_e}")
f"Cleanup error: Could not delete file ID {file_id}. Reason: {del_e}"
) async def load_async(self) -> List[Document]:
"""
Asynchronous OCR workflow execution with optimized performance.
Returns:
A list of Document objects, one for each page processed.
"""
file_id = None
start_time = time.time()
try:
async with self._get_session() as session:
# 1. Upload file with streaming
file_id = await self._upload_file_async(session)
# 2. Get signed URL
signed_url = await self._get_signed_url_async(session, file_id)
# 3. Process OCR
ocr_response = await self._process_ocr_async(session, signed_url)
# 4. Process results
documents = self._process_results(ocr_response)
total_time = time.time() - start_time
log.info(f"Async OCR workflow completed in {total_time:.2f}s, produced {len(documents)} documents")
return documents
except Exception as e:
total_time = time.time() - start_time
log.error(f"Async OCR workflow failed after {total_time:.2f}s: {e}")
return [Document(
page_content=f"Error during OCR processing: {e}",
metadata={"error": "processing_failed", "file_name": self.file_name}
)]
finally:
# 5. Cleanup - always attempt file deletion
if file_id:
try:
async with self._get_session() as session:
await self._delete_file_async(session, file_id)
except Exception as cleanup_error:
log.error(f"Cleanup failed for file ID {file_id}: {cleanup_error}")
@staticmethod
async def load_multiple_async(loaders: List['MistralLoader']) -> List[List[Document]]:
"""
Process multiple files concurrently for maximum performance.
Args:
loaders: List of MistralLoader instances
Returns:
List of document lists, one for each loader
"""
if not loaders:
return []
log.info(f"Starting concurrent processing of {len(loaders)} files")
start_time = time.time()
# Process all files concurrently
tasks = [loader.load_async() for loader in loaders]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle any exceptions in results
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
log.error(f"File {i} failed: {result}")
processed_results.append([Document(
page_content=f"Error processing file: {result}",
metadata={"error": "batch_processing_failed", "file_index": i}
)])
else:
processed_results.append(result)
total_time = time.time() - start_time
total_docs = sum(len(docs) for docs in processed_results)
log.info(f"Batch processing completed in {total_time:.2f}s, produced {total_docs} total documents")
return processed_results