diff --git a/examples/filters/google_translation_filter_pipeline.py b/examples/filters/google_translation_filter_pipeline.py index 892c6c3..32b6b7d 100644 --- a/examples/filters/google_translation_filter_pipeline.py +++ b/examples/filters/google_translation_filter_pipeline.py @@ -8,13 +8,15 @@ description: This pipeline integrates Google Translate for automatic translation without requiring an API key. It supports multilingual communication by translating based on specified source and target languages. """ - +import re from typing import List, Optional from schemas import OpenAIChatMessage from pydantic import BaseModel import requests import os import time +import asyncio +from functools import lru_cache from utils.pipelines.main import get_last_user_message, get_last_assistant_message @@ -25,36 +27,33 @@ class Pipeline: source_user: Optional[str] = "auto" target_user: Optional[str] = "en" source_assistant: Optional[str] = "en" - target_assistant: Optional[str] = "uk" + target_assistant: Optional[str] = "es" def __init__(self): - # Initialize the pipeline type and name self.type = "filter" self.name = "Google Translate Filter" - - # Initialize Valves with default values self.valves = self.Valves( **{ "pipelines": ["*"], } ) + # Initialize translation cache + self.translation_cache = {} + async def on_startup(self): - # Function called when the server is started print(f"on_startup:{__name__}") pass async def on_shutdown(self): - # Function called when the server is stopped print(f"on_shutdown:{__name__}") pass async def on_valves_updated(self): - # Function called when the valves are updated pass + @lru_cache(maxsize=128) # LRU cache to store translation results def translate(self, text: str, source: str, target: str) -> str: - # Function to translate text using Google Translate url = "https://translate.googleapis.com/translate_a/single" params = { "client": "gtx", @@ -65,26 +64,29 @@ class Pipeline: } try: - # Make a GET request to Google Translate API r = 
def split_text_around_table(self, text: str) -> List[str]:
    """Split *text* into the prose before the first table and the rest.

    A "table" is what the regex below matches: a run of consecutive lines
    that each contain ``|``, followed by a blank line and then a line that
    starts with a character other than ``|`` or whitespace and contains
    ``|`` later on.

    Args:
        text: The message content to split.

    Returns:
        A two-element list ``[before, remainder]``. ``before`` is the text
        ahead of the first matched table; ``remainder`` is that table plus
        everything that follows it. When nothing matches the result is
        ``[text, ""]``. In both cases ``before + remainder == text``, so
        callers that re-join the two parts never lose content.
    """
    table_regex = r'((?:^.*?\|.*?\n)+)(?=\n[^\|\s].*?\|)'

    # maxsplit=1 stops at the first table; because the pattern has exactly
    # one capture group, a successful split is [before, table, after].
    pieces = re.split(table_regex, text, maxsplit=1, flags=re.MULTILINE)
    if len(pieces) == 1:
        return [text, ""]

    before, table, after = pieces
    # Previously only [before, table] was returned, which silently dropped
    # `after` from the message written back by the caller.
    return [before, table + after]
translated_before_table + table_text print(f"Translated user message: {translated_user_message}") - # Update the translated message in the messages list for message in reversed(messages): if message["role"] == "user": message["content"] = translated_user_message @@ -111,7 +120,6 @@ class Pipeline: return body async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: - # Function to process outgoing messages from the assistant print(f"outlet:{__name__}") messages = body["messages"] @@ -119,16 +127,23 @@ class Pipeline: print(f"Assistant message: {assistant_message}") - # Translate assistant message - translated_assistant_message = self.translate( - assistant_message, - self.valves.source_assistant, - self.valves.target_assistant, - ) + parts = self.split_text_around_table(assistant_message) + text_before_table, table_text = parts + + # Check translation cache for text before table + translated_before_table = self.translation_cache.get(text_before_table) + if translated_before_table is None: + translated_before_table = self.translate( + text_before_table, + self.valves.source_assistant, + self.valves.target_assistant, + ) + self.translation_cache[text_before_table] = translated_before_table + + translated_assistant_message = translated_before_table + table_text print(f"Translated assistant message: {translated_assistant_message}") - # Update the translated message in the messages list for message in reversed(messages): if message["role"] == "assistant": message["content"] = translated_assistant_message