From 6fa2f2c4503bacd877f1015ae2e4141bb99a29cb Mon Sep 17 00:00:00 2001 From: "Timothy J. Baek" Date: Fri, 31 May 2024 19:40:09 -0700 Subject: [PATCH] example: convo turn limit filter --- examples/conversation_turn_limit_filter.py | 63 ++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 examples/conversation_turn_limit_filter.py diff --git a/examples/conversation_turn_limit_filter.py b/examples/conversation_turn_limit_filter.py new file mode 100644 index 0000000..ca6c264 --- /dev/null +++ b/examples/conversation_turn_limit_filter.py @@ -0,0 +1,63 @@ +import os +from typing import List, Optional +from pydantic import BaseModel +from schemas import OpenAIChatMessage +import time + + +class Pipeline: + def __init__(self): + # Pipeline filters are only compatible with Open WebUI + # You can think of filter pipeline as a middleware that can be used to edit the form data before it is sent to the OpenAI API. + self.type = "filter" + + # Assign a unique identifier to the pipeline. + # The identifier must be unique across all pipelines. + # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes. + self.id = "conversation_turn_limit_filter_pipeline" + self.name = "Conversation Turn Limit Filter" + + class Valves(BaseModel): + # List target pipeline ids (models) that this filter will be connected to. + # If you want to connect this filter to all pipelines, you can set pipelines to ["*"] + pipelines: List[str] = [] + + # Assign a priority level to the filter pipeline. + # The priority level determines the order in which the filter pipelines are executed. + # The lower the number, the higher the priority. + priority: int = 0 + + # Valves for conversation turn limiting + target_user_roles: List[str] = ["user"] + max_turns: Optional[int] = None + + self.valves = Valves( + **{ + "pipelines": os.getenv("CONVERSATION_TURN_PIPELINES", "*").split(","), + "max_turns": 10, + } + ) + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup:{__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is stopped. + print(f"on_shutdown:{__name__}") + pass + + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: + print(f"pipe:{__name__}") + print(body) + print(user) + + if user.get("role", "admin") in self.valves.target_user_roles: + messages = body.get("messages", []) + if len(messages) > self.valves.max_turns: + raise Exception( + f"Conversation turn limit exceeded. Max turns: {self.valves.max_turns}" + ) + + return body