Added caching and optimization for Markdown table translation

This commit is contained in:
SimonOriginal 2024-06-28 15:49:10 +02:00
parent 8d3bc097e3
commit 539cbe1a4a

View File

@ -8,13 +8,15 @@ description: This pipeline integrates Google Translate for automatic translation
without requiring an API key. It supports multilingual communication by translating based on specified source
and target languages.
"""
import re
from typing import List, Optional
from schemas import OpenAIChatMessage
from pydantic import BaseModel
import requests
import os
import time
import asyncio
from functools import lru_cache
from utils.pipelines.main import get_last_user_message, get_last_assistant_message
@ -25,36 +27,33 @@ class Pipeline:
source_user: Optional[str] = "auto"
target_user: Optional[str] = "en"
source_assistant: Optional[str] = "en"
target_assistant: Optional[str] = "uk"
target_assistant: Optional[str] = "es"
def __init__(self):
    """Set the pipeline identity, build the default valves and create the cache."""
    # Pipeline type and display name
    self.type, self.name = "filter", "Google Translate Filter"

    # Valves start from their defaults, with only the pipeline selector given
    self.valves = self.Valves(pipelines=["*"])

    # Per-instance store for already translated text
    self.translation_cache = {}
async def on_startup(self):
    """Hook called when the server is started; logs the module name."""
    print(f"on_startup:{__name__}")
async def on_shutdown(self):
    """Hook called when the server is stopped; logs the module name."""
    print(f"on_shutdown:{__name__}")
async def on_valves_updated(self):
    """Hook called when the valves are updated; currently does nothing."""
    return None
@lru_cache(maxsize=128) # LRU cache to store translation results
def translate(self, text: str, source: str, target: str) -> str:
# Function to translate text using Google Translate
url = "https://translate.googleapis.com/translate_a/single"
params = {
"client": "gtx",
@ -65,26 +64,29 @@ class Pipeline:
}
try:
# Make a GET request to Google Translate API
r = requests.get(url, params=params)
r.raise_for_status() # Raise an exception for bad status codes
# Parse the JSON response
r.raise_for_status()
result = r.json()
translated_text = ''.join([sentence[0] for sentence in result[0]]) # Combine all translated sentences into one string
translated_text = ''.join([sentence[0] for sentence in result[0]])
return translated_text
except requests.exceptions.RequestException as e:
# Handle network errors, retrying after a short pause
print(f"Network error: {e}")
time.sleep(1) # Pause before retrying
return self.translate(text, source, target) # Retry translation
time.sleep(1)
return self.translate(text, source, target)
except Exception as e:
# Handle other exceptions
print(f"Error translating text: {e}")
return text # Return original text in case of error
return text
def split_text_around_table(self, text: str) -> List[str]:
    """Split *text* into a leading part and a verbatim remainder.

    The pattern looks for a run of consecutive lines that each contain a
    ``|`` and that is followed by a blank line and then a line which
    contains a ``|`` but starts with neither ``|`` nor whitespace.

    Args:
        text: The full message text to inspect.

    Returns:
        A two-element list ``[before, rest]``. ``before`` is everything
        ahead of the first matched block. ``rest`` is the matched block
        together with all text that follows it, so ``before + rest``
        always equals ``text``. When nothing matches, the result is
        ``[text, ""]``.
    """
    table_regex = r'((?:^.*?\|.*?\n)+)(?=\n[^\|\s].*?\|)'
    # With a capturing group, re.split yields
    # [before, block, after, block, after, ...].
    pieces = re.split(table_regex, text, flags=re.MULTILINE)
    if len(pieces) == 1:
        return [text, ""]
    # Keep everything after the prefix; returning only pieces[1] would
    # silently drop whatever follows the matched block.
    return [pieces[0], "".join(pieces[1:])]
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
# Function to process incoming messages from the user
print(f"inlet:{__name__}")
messages = body["messages"]
@ -92,16 +94,23 @@ class Pipeline:
print(f"User message: {user_message}")
# Translate user message
translated_user_message = self.translate(
user_message,
self.valves.source_user,
self.valves.target_user,
)
parts = self.split_text_around_table(user_message)
text_before_table, table_text = parts
# Check translation cache for text before table
translated_before_table = self.translation_cache.get(text_before_table)
if translated_before_table is None:
translated_before_table = self.translate(
text_before_table,
self.valves.source_user,
self.valves.target_user,
)
self.translation_cache[text_before_table] = translated_before_table
translated_user_message = translated_before_table + table_text
print(f"Translated user message: {translated_user_message}")
# Update the translated message in the messages list
for message in reversed(messages):
if message["role"] == "user":
message["content"] = translated_user_message
@ -111,7 +120,6 @@ class Pipeline:
return body
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
# Function to process outgoing messages from the assistant
print(f"outlet:{__name__}")
messages = body["messages"]
@ -119,16 +127,23 @@ class Pipeline:
print(f"Assistant message: {assistant_message}")
# Translate assistant message
translated_assistant_message = self.translate(
assistant_message,
self.valves.source_assistant,
self.valves.target_assistant,
)
parts = self.split_text_around_table(assistant_message)
text_before_table, table_text = parts
# Check translation cache for text before table
translated_before_table = self.translation_cache.get(text_before_table)
if translated_before_table is None:
translated_before_table = self.translate(
text_before_table,
self.valves.source_assistant,
self.valves.target_assistant,
)
self.translation_cache[text_before_table] = translated_before_table
translated_assistant_message = translated_before_table + table_text
print(f"Translated assistant message: {translated_assistant_message}")
# Update the translated message in the messages list
for message in reversed(messages):
if message["role"] == "assistant":
message["content"] = translated_assistant_message