diff --git a/pipelines/examples/langfuse_filter_pipeline.py b/pipelines/examples/langfuse_filter_pipeline.py
new file mode 100644
index 0000000..af3afb1
--- /dev/null
+++ b/pipelines/examples/langfuse_filter_pipeline.py
@@ -0,0 +1,65 @@
+from typing import List, Optional
+from schemas import OpenAIChatMessage
+import os
+
+from langfuse import Langfuse
+from langfuse.decorators import langfuse_context, observe
+
+
+class Pipeline:
+    def __init__(self):
+        # Pipeline filters are only compatible with Open WebUI
+        # You can think of filter pipeline as a middleware that can be used to edit the form data before it is sent to the OpenAI API.
+        self.type = "filter"
+
+        self.id = "langfuse_filter_pipeline"
+        self.name = "Langfuse Filter"
+
+        # Assign a priority level to the filter pipeline.
+        # The priority level determines the order in which the filter pipelines are executed.
+        # The lower the number, the higher the priority.
+        self.priority = 0
+
+        # List target pipelines (models) that this filter will be connected to.
+        self.pipelines = [
+            {"id": "llama3:latest"},
+        ]
+
+        self.secret_key = os.getenv("LANGFUSE_SECRET_KEY")
+        self.public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
+        self.host = os.getenv("LANGFUSE_HOST")
+
+        self.langfuse = Langfuse(
+            secret_key=self.secret_key,
+            public_key=self.public_key,
+            host=self.host,
+            debug=True,
+        )
+
+        self.langfuse.auth_check()
+        pass
+
+    async def on_startup(self):
+        # This function is called when the server is started.
+        print(f"on_startup:{__name__}")
+        pass
+
+    async def on_shutdown(self):
+        # This function is called when the server is stopped.
+        print(f"on_shutdown:{__name__}")
+        self.langfuse.flush()
+        pass
+
+    async def filter(self, body: dict, user: Optional[dict] = None) -> dict:
+        print(f"filter:{__name__}")
+
+        trace = self.langfuse.trace(
+            name=f"filter:{__name__}",
+            input=body,
+            user_id=user["id"],
+            metadata={"name": user["name"]},
+        )
+
+        print(trace.get_trace_url())
+
+        return body
diff --git a/pipelines/ollama_pipeline.py b/pipelines/ollama_pipeline.py
deleted file mode 100644
index 7524edc..0000000
--- a/pipelines/ollama_pipeline.py
+++ /dev/null
@@ -1,52 +0,0 @@
-from typing import List, Union, Generator, Iterator
-from schemas import OpenAIChatMessage
-import requests
-
-
-class Pipeline:
-    def __init__(self):
-        # Optionally, you can set the id and name of the pipeline.
-        self.id = "ollama_pipeline"
-        self.name = "Ollama Pipeline"
-        pass
-
-    async def on_startup(self):
-        # This function is called when the server is started.
-        print(f"on_startup:{__name__}")
-        pass
-
-    async def on_shutdown(self):
-        # This function is called when the server is stopped.
-        print(f"on_shutdown:{__name__}")
-        pass
-
-    def pipe(
-        self, user_message: str, model_id: str, messages: List[dict], body: dict
-    ) -> Union[str, Generator, Iterator]:
-        # This is where you can add your custom pipelines like RAG.
-        print(f"pipe:{__name__}")
-
-        OLLAMA_BASE_URL = "http://localhost:11434"
-        MODEL = "llama3"
-
-        if "user" in body:
-            print("######################################")
-            print(f'# User: {body["user"]["name"]} ({body["user"]["id"]})')
-            print(f"# Message: {user_message}")
-            print("######################################")
-
-        try:
-            r = requests.post(
-                url=f"{OLLAMA_BASE_URL}/v1/chat/completions",
-                json={**body, "model": MODEL},
-                stream=True,
-            )
-
-            r.raise_for_status()
-
-            if body["stream"]:
-                return r.iter_lines()
-            else:
-                return r.json()
-        except Exception as e:
-            return f"Error: {e}"
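For orientation, here is a minimal driver sketch (not part of the commit) showing how the new filter would be exercised the way the pipelines server does: construct the `Pipeline`, call `on_startup`, pass the chat form data and a user dict through `filter`, then `on_shutdown` to flush Langfuse. It assumes it runs inside the pipelines repo (so `schemas` and the `langfuse` package are importable), that `LANGFUSE_SECRET_KEY`, `LANGFUSE_PUBLIC_KEY`, and `LANGFUSE_HOST` are exported, and the import path shown is hypothetical.

```python
# Sketch only: drives the filter added by this diff; not part of the commit.
import asyncio

# Hypothetical import path for the file added at pipelines/examples/langfuse_filter_pipeline.py.
from examples.langfuse_filter_pipeline import Pipeline


async def main():
    pipeline = Pipeline()          # builds the Langfuse client and runs auth_check()
    await pipeline.on_startup()

    # OpenAI-style chat form data; filter() reads user["id"] and user["name"].
    body = {
        "model": "llama3:latest",
        "stream": False,
        "messages": [{"role": "user", "content": "Hello!"}],
    }
    user = {"id": "user-123", "name": "Ada"}

    filtered = await pipeline.filter(body, user)   # creates a Langfuse trace, prints its URL
    print(filtered is body)                        # the filter returns the body unchanged

    await pipeline.on_shutdown()   # flushes buffered Langfuse events


if __name__ == "__main__":
    asyncio.run(main())
```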