From a05c4d5e951d626c0c9400c7f259c56488dbf150 Mon Sep 17 00:00:00 2001 From: "Timothy J. Baek" Date: Tue, 28 May 2024 18:45:29 -0700 Subject: [PATCH] Delete rate_limit_filter_pipeline.py --- pipelines/rate_limit_filter_pipeline.py | 116 ------------------------ 1 file changed, 116 deletions(-) delete mode 100644 pipelines/rate_limit_filter_pipeline.py diff --git a/pipelines/rate_limit_filter_pipeline.py b/pipelines/rate_limit_filter_pipeline.py deleted file mode 100644 index 3422eeb..0000000 --- a/pipelines/rate_limit_filter_pipeline.py +++ /dev/null @@ -1,116 +0,0 @@ -from typing import List, Optional -from pydantic import BaseModel -from schemas import OpenAIChatMessage -import time - - -class Pipeline: - def __init__(self): - # Pipeline filters are only compatible with Open WebUI - # You can think of filter pipeline as a middleware that can be used to edit the form data before it is sent to the OpenAI API. - self.type = "filter" - - # Assign a unique identifier to the pipeline. - # The identifier must be unique across all pipelines. - # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes. - self.id = "rate_limit_filter_pipeline" - self.name = "Rate Limit Filter" - - class Valves(BaseModel): - # List target pipeline ids (models) that this filter will be connected to. - # If you want to connect this filter to all pipelines, you can set pipelines to ["*"] - pipelines: List[str] = [] - - # Assign a priority level to the filter pipeline. - # The priority level determines the order in which the filter pipelines are executed. - # The lower the number, the higher the priority. - priority: int = 0 - - # Valves for rate limiting - requests_per_minute: Optional[int] = None - requests_per_hour: Optional[int] = None - sliding_window_limit: Optional[int] = None - sliding_window_minutes: Optional[int] = None - - # Initialize rate limits - self.valves = Valves( - **{ - "pipelines": ["*"], # Connect to all pipelines - "requests_per_minute": 10, - "requests_per_hour": 1000, - "sliding_window_limit": 100, - "sliding_window_minutes": 15, - } - ) - - # Tracking data - user_id -> (timestamps of requests) - self.user_requests = {} - - async def on_startup(self): - # This function is called when the server is started or after valves are updated. - print(f"on_startup:{__name__}") - pass - - async def on_shutdown(self): - # This function is called when the server is stopped or before valves are updated. - print(f"on_shutdown:{__name__}") - pass - - def prune_requests(self, user_id: str): - """Prune old requests that are outside of the sliding window period.""" - now = time.time() - if user_id in self.user_requests: - self.user_requests[user_id] = [ - req - for req in self.user_requests[user_id] - if ( - (self.valves.requests_per_minute is not None and now - req < 60) - or (self.valves.requests_per_hour is not None and now - req < 3600) - or ( - self.valves.sliding_window_limit is not None - and now - req < self.valves.sliding_window_minutes * 60 - ) - ) - ] - - def log_request(self, user_id: str): - """Log a new request for a user.""" - now = time.time() - if user_id not in self.user_requests: - self.user_requests[user_id] = [] - self.user_requests[user_id].append(now) - - def rate_limited(self, user_id: str) -> bool: - """Check if a user is rate limited.""" - self.prune_requests(user_id) - - user_reqs = self.user_requests.get(user_id, []) - - if self.valves.requests_per_minute is not None: - requests_last_minute = sum(1 for req in user_reqs if time.time() - req < 60) - if requests_last_minute >= self.valves.requests_per_minute: - return True - - if self.valves.requests_per_hour is not None: - requests_last_hour = sum(1 for req in user_reqs if time.time() - req < 3600) - if requests_last_hour >= self.valves.requests_per_hour: - return True - - if self.valves.sliding_window_limit is not None: - requests_in_window = len(user_reqs) - if requests_in_window >= self.valves.sliding_window_limit: - return True - - return False - - async def filter(self, body: dict, user: Optional[dict] = None) -> dict: - print(f"pipe:{__name__}") - print(body) - print(user) - - user_id = user["id"] if user and "id" in user else "default_user" - if self.rate_limited(user_id): - raise Exception("Rate limit exceeded. Please try again later.") - - self.log_request(user_id) - return body