From 69be71ea4c6544c8b8e2797bdacaabba198c0b6c Mon Sep 17 00:00:00 2001 From: "Timothy J. Baek" Date: Tue, 28 May 2024 18:17:49 -0700 Subject: [PATCH] fix: langfuse --- .../examples/langfuse_filter_pipeline.py | 53 +++++++---- pipelines/ollama_manifold_pipeline.py | 89 ------------------- 2 files changed, 37 insertions(+), 105 deletions(-) delete mode 100644 pipelines/ollama_manifold_pipeline.py diff --git a/pipelines/examples/langfuse_filter_pipeline.py b/pipelines/examples/langfuse_filter_pipeline.py index c107f39..88667c4 100644 --- a/pipelines/examples/langfuse_filter_pipeline.py +++ b/pipelines/examples/langfuse_filter_pipeline.py @@ -2,6 +2,8 @@ from typing import List, Optional from schemas import OpenAIChatMessage import os + +from pydantic import BaseModel from langfuse import Langfuse from langfuse.decorators import langfuse_context, observe @@ -19,32 +21,38 @@ class Pipeline: self.id = "langfuse_filter_pipeline" self.name = "Langfuse Filter" - # Assign a priority level to the filter pipeline. - # The priority level determines the order in which the filter pipelines are executed. - # The lower the number, the higher the priority. - self.priority = 0 + class Valves(BaseModel): + # List target pipeline ids (models) that this filter will be connected to. + # If you want to connect this filter to all pipelines, you can set pipelines to ["*"] + pipelines: List[str] = [] - # List target pipelines (models) that this filter will be connected to. - # If you want to connect this filter to all pipelines, you can set pipelines to ["*"] - self.pipelines = ["*"] + # Assign a priority level to the filter pipeline. + # The priority level determines the order in which the filter pipelines are executed. + # The lower the number, the higher the priority. + priority: int = 0 - self.secret_key = os.getenv("LANGFUSE_SECRET_KEY") - self.public_key = os.getenv("LANGFUSE_PUBLIC_KEY") - self.host = os.getenv("LANGFUSE_HOST") + # Valves + secret_key: str + public_key: str + host: str - self.langfuse = Langfuse( - secret_key=self.secret_key, - public_key=self.public_key, - host=self.host, - debug=True, + # Initialize + self.valves = Valves( + **{ + "pipelines": ["*"], # Connect to all pipelines + "secret_key": os.getenv("LANGFUSE_SECRET_KEY"), + "public_key": os.getenv("LANGFUSE_PUBLIC_KEY"), + "host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"), + } ) - self.langfuse.auth_check() + self.langfuse = None pass async def on_startup(self): # This function is called when the server is started or after valves are updated. print(f"on_startup:{__name__}") + self.set_langfuse() pass async def on_shutdown(self): @@ -53,6 +61,19 @@ class Pipeline: self.langfuse.flush() pass + async def on_valves_update(self): + self.set_langfuse() + pass + + def set_langfuse(self): + self.langfuse = Langfuse( + secret_key=self.valves.secret_key, + public_key=self.valves.public_key, + host=self.valves.host, + debug=True, + ) + self.langfuse.auth_check() + async def filter(self, body: dict, user: Optional[dict] = None) -> dict: print(f"filter:{__name__}") diff --git a/pipelines/ollama_manifold_pipeline.py b/pipelines/ollama_manifold_pipeline.py deleted file mode 100644 index 0c58cbb..0000000 --- a/pipelines/ollama_manifold_pipeline.py +++ /dev/null @@ -1,89 +0,0 @@ -from typing import List, Union, Generator, Iterator -from schemas import OpenAIChatMessage -from pydantic import BaseModel -import requests - - -class Pipeline: - def __init__(self): - # You can also set the pipelines that are available in this pipeline. - # Set manifold to True if you want to use this pipeline as a manifold. - # Manifold pipelines can have multiple pipelines. - self.type = "manifold" - - # Optionally, you can set the id and name of the pipeline. - # Assign a unique identifier to the pipeline. - # The identifier must be unique across all pipelines. - # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes. - self.id = "ollama_manifold" - - # Optionally, you can set the name of the manifold pipeline. - self.name = "Ollama: " - - class Valves(BaseModel): - OLLAMA_BASE_URL: str - - self.valves = Valves(**{"OLLAMA_BASE_URL": "http://localhost:11434"}) - pass - - async def on_startup(self): - # This function is called when the server is started or after valves are updated. - print(f"on_startup:{__name__}") - pass - - async def on_shutdown(self): - # This function is called when the server is stopped or before valves are updated. - print(f"on_shutdown:{__name__}") - pass - - def get_ollama_models(self): - if self.valves.OLLAMA_BASE_URL: - try: - r = requests.get(f"{self.valves.OLLAMA_BASE_URL}/api/tags") - models = r.json() - return [ - {"id": model["model"], "name": model["name"]} - for model in models["models"] - ] - except Exception as e: - print(f"Error: {e}") - return [ - { - "id": self.id, - "name": "Could not fetch models from Ollama, please update the URL in the valves.", - }, - ] - else: - return [] - - # Pipelines are the models that are available in the manifold. - # It can be a list or a function that returns a list. - def pipelines(self) -> List[dict]: - return self.get_ollama_models() - - def pipe( - self, user_message: str, model_id: str, messages: List[dict], body: dict - ) -> Union[str, Generator, Iterator]: - # This is where you can add your custom pipelines like RAG.' - - if "user" in body: - print("######################################") - print(f'# User: {body["user"]["name"]} ({body["user"]["id"]})') - print(f"# Message: {user_message}") - print("######################################") - - try: - r = requests.post( - url=f"{self.OLLAMA_BASE_URL}/v1/chat/completions", - json={**body, "model": model_id}, - stream=True, - ) - - r.raise_for_status() - - if body["stream"]: - return r.iter_lines() - else: - return r.json() - except Exception as e: - return f"Error: {e}"