From dbb117f880183e5f8a78970cfe20fad5583a6016 Mon Sep 17 00:00:00 2001 From: ther3zz <40278044+ther3zz@users.noreply.github.com> Date: Sun, 21 Jul 2024 11:06:54 -0400 Subject: [PATCH] Update langfuse_filter_pipeline.py Sometimes tools/functions break the Langfuse pipeline because chat_id is missing from the request body. This change checks whether chat_id is present and, if it is not, sets it to "SYSTEM MESSAGE <UUID>", where <UUID> is a newly generated UUID. --- examples/filters/langfuse_filter_pipeline.py | 50 +++++++------------- 1 file changed, 18 insertions(+), 32 deletions(-) diff --git a/examples/filters/langfuse_filter_pipeline.py b/examples/filters/langfuse_filter_pipeline.py index 66dfcad..7cc373f 100644 --- a/examples/filters/langfuse_filter_pipeline.py +++ b/examples/filters/langfuse_filter_pipeline.py @@ -11,6 +11,7 @@ requirements: langfuse from typing import List, Optional from schemas import OpenAIChatMessage import os +import uuid from utils.pipelines.main import get_last_user_message, get_last_assistant_message from pydantic import BaseModel @@ -20,64 +21,36 @@ from langfuse.api.resources.commons.errors.unauthorized_error import Unauthorize class Pipeline: class Valves(BaseModel): - # List target pipeline ids (models) that this filter will be connected to. - # If you want to connect this filter to all pipelines, you can set pipelines to ["*"] - # e.g. ["llama3:latest", "gpt-3.5-turbo"] pipelines: List[str] = [] - - # Assign a priority level to the filter pipeline. - # The priority level determines the order in which the filter pipelines are executed. - # The lower the number, the higher the priority. priority: int = 0 - - # Valves secret_key: str public_key: str host: str def __init__(self): - # Pipeline filters are only compatible with Open WebUI - # You can think of filter pipeline as a middleware that can be used to edit the form data before it is sent to the OpenAI API. self.type = "filter" - - # Optionally, you can set the id and name of the pipeline. 
- # Best practice is to not specify the id so that it can be automatically inferred from the filename, so that users can install multiple versions of the same pipeline. - # The identifier must be unique across all pipelines. - # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes. - # self.id = "langfuse_filter_pipeline" self.name = "Langfuse Filter" - - # Initialize self.valves = self.Valves( **{ - "pipelines": ["*"], # Connect to all pipelines + "pipelines": ["*"], "secret_key": os.getenv("LANGFUSE_SECRET_KEY", "your-secret-key-here"), "public_key": os.getenv("LANGFUSE_PUBLIC_KEY", "your-public-key-here"), "host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"), } ) - self.langfuse = None self.chat_generations = {} - pass async def on_startup(self): - # This function is called when the server is started. print(f"on_startup:{__name__}") self.set_langfuse() - pass async def on_shutdown(self): - # This function is called when the server is stopped. print(f"on_shutdown:{__name__}") self.langfuse.flush() - pass async def on_valves_updated(self): - # This function is called when the valves are updated. 
- self.set_langfuse() - pass def set_langfuse(self): try: @@ -97,6 +70,22 @@ class Pipeline: async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: print(f"inlet:{__name__}") + print(f"Received body: {body}") + print(f"User: {user}") + + # Check for presence of required keys and generate chat_id if missing + if "chat_id" not in body: + unique_id = f"SYSTEM MESSAGE {uuid.uuid4()}" + body["chat_id"] = unique_id + print(f"chat_id was missing, set to: {unique_id}") + + required_keys = ["model", "messages"] + missing_keys = [key for key in required_keys if key not in body] + + if missing_keys: + error_message = f"Error: Missing keys in the request body: {', '.join(missing_keys)}" + print(error_message) + raise ValueError(error_message) trace = self.langfuse.trace( name=f"filter:{__name__}", @@ -128,9 +117,6 @@ class Pipeline: user_message = get_last_user_message(body["messages"]) generated_message = get_last_assistant_message(body["messages"]) - # Update usage cost based on the length of the input and output messages - # Below does not reflect the actual cost of the API - # You can adjust the cost based on your requirements generation.end( output=generated_message, usage={