fix: langfuse

Timothy J. Baek 2024-05-28 18:17:49 -07:00
parent a917293db5
commit 69be71ea4c
2 changed files with 37 additions and 105 deletions

File 1 of 2: Langfuse filter pipeline (modified)

@@ -2,6 +2,8 @@ from typing import List, Optional
 from schemas import OpenAIChatMessage
 import os
 
+from pydantic import BaseModel
+
 from langfuse import Langfuse
 from langfuse.decorators import langfuse_context, observe
@@ -19,32 +21,38 @@ class Pipeline:
         self.id = "langfuse_filter_pipeline"
         self.name = "Langfuse Filter"
 
-        # Assign a priority level to the filter pipeline.
-        # The priority level determines the order in which the filter pipelines are executed.
-        # The lower the number, the higher the priority.
-        self.priority = 0
-
-        # List target pipelines (models) that this filter will be connected to.
-        # If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
-        self.pipelines = ["*"]
-
-        self.secret_key = os.getenv("LANGFUSE_SECRET_KEY")
-        self.public_key = os.getenv("LANGFUSE_PUBLIC_KEY")
-        self.host = os.getenv("LANGFUSE_HOST")
-
-        self.langfuse = Langfuse(
-            secret_key=self.secret_key,
-            public_key=self.public_key,
-            host=self.host,
-            debug=True,
+        class Valves(BaseModel):
+            # List target pipeline ids (models) that this filter will be connected to.
+            # If you want to connect this filter to all pipelines, you can set pipelines to ["*"]
+            pipelines: List[str] = []
+
+            # Assign a priority level to the filter pipeline.
+            # The priority level determines the order in which the filter pipelines are executed.
+            # The lower the number, the higher the priority.
+            priority: int = 0
+
+            # Valves
+            secret_key: str
+            public_key: str
+            host: str
+
+        # Initialize
+        self.valves = Valves(
+            **{
+                "pipelines": ["*"],  # Connect to all pipelines
+                "secret_key": os.getenv("LANGFUSE_SECRET_KEY"),
+                "public_key": os.getenv("LANGFUSE_PUBLIC_KEY"),
+                "host": os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),
+            }
         )
-        self.langfuse.auth_check()
+
+        self.langfuse = None
         pass
 
     async def on_startup(self):
         # This function is called when the server is started or after valves are updated.
         print(f"on_startup:{__name__}")
+
+        self.set_langfuse()
         pass
 
     async def on_shutdown(self):
@@ -53,6 +61,19 @@ class Pipeline:
         self.langfuse.flush()
         pass
 
+    async def on_valves_update(self):
+        self.set_langfuse()
+        pass
+
+    def set_langfuse(self):
+        self.langfuse = Langfuse(
+            secret_key=self.valves.secret_key,
+            public_key=self.valves.public_key,
+            host=self.valves.host,
+            debug=True,
+        )
+        self.langfuse.auth_check()
+
     async def filter(self, body: dict, user: Optional[dict] = None) -> dict:
         print(f"filter:{__name__}")

File 2 of 2: Ollama manifold pipeline (deleted)

@@ -1,89 +0,0 @@
-from typing import List, Union, Generator, Iterator
-from schemas import OpenAIChatMessage
-from pydantic import BaseModel
-
-import requests
-
-
-class Pipeline:
-    def __init__(self):
-        # You can also set the pipelines that are available in this pipeline.
-        # Set manifold to True if you want to use this pipeline as a manifold.
-        # Manifold pipelines can have multiple pipelines.
-        self.type = "manifold"
-
-        # Optionally, you can set the id and name of the pipeline.
-        # Assign a unique identifier to the pipeline.
-        # The identifier must be unique across all pipelines.
-        # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes.
-        self.id = "ollama_manifold"
-
-        # Optionally, you can set the name of the manifold pipeline.
-        self.name = "Ollama: "
-
-        class Valves(BaseModel):
-            OLLAMA_BASE_URL: str
-
-        self.valves = Valves(**{"OLLAMA_BASE_URL": "http://localhost:11434"})
-        pass
-
-    async def on_startup(self):
-        # This function is called when the server is started or after valves are updated.
-        print(f"on_startup:{__name__}")
-        pass
-
-    async def on_shutdown(self):
-        # This function is called when the server is stopped or before valves are updated.
-        print(f"on_shutdown:{__name__}")
-        pass
-
-    def get_ollama_models(self):
-        if self.valves.OLLAMA_BASE_URL:
-            try:
-                r = requests.get(f"{self.valves.OLLAMA_BASE_URL}/api/tags")
-                models = r.json()
-
-                return [
-                    {"id": model["model"], "name": model["name"]}
-                    for model in models["models"]
-                ]
-            except Exception as e:
-                print(f"Error: {e}")
-                return [
-                    {
-                        "id": self.id,
-                        "name": "Could not fetch models from Ollama, please update the URL in the valves.",
-                    },
-                ]
-        else:
-            return []
-
-    # Pipelines are the models that are available in the manifold.
-    # It can be a list or a function that returns a list.
-    def pipelines(self) -> List[dict]:
-        return self.get_ollama_models()
-
-    def pipe(
-        self, user_message: str, model_id: str, messages: List[dict], body: dict
-    ) -> Union[str, Generator, Iterator]:
-        # This is where you can add your custom pipelines like RAG.
-        if "user" in body:
-            print("######################################")
-            print(f'# User: {body["user"]["name"]} ({body["user"]["id"]})')
-            print(f"# Message: {user_message}")
-            print("######################################")
-
-        try:
-            r = requests.post(
-                url=f"{self.OLLAMA_BASE_URL}/v1/chat/completions",
-                json={**body, "model": model_id},
-                stream=True,
-            )
-            r.raise_for_status()
-
-            if body["stream"]:
-                return r.iter_lines()
-            else:
-                return r.json()
-        except Exception as e:
-            return f"Error: {e}"