From a8348a31978578708eaf73643863ba9a64828493 Mon Sep 17 00:00:00 2001 From: "Timothy J. Baek" Date: Tue, 28 May 2024 15:29:02 -0700 Subject: [PATCH] feat: litellm manifold example --- .../examples/litellm_manifold_pipeline.py | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 pipelines/examples/litellm_manifold_pipeline.py diff --git a/pipelines/examples/litellm_manifold_pipeline.py b/pipelines/examples/litellm_manifold_pipeline.py new file mode 100644 index 0000000..9b6d4d5 --- /dev/null +++ b/pipelines/examples/litellm_manifold_pipeline.py @@ -0,0 +1,79 @@ +from typing import List, Union, Generator, Iterator +from schemas import OpenAIChatMessage +from pydantic import BaseModel +import requests + + +class Pipeline: + def __init__(self): + # You can also set the pipelines that are available in this pipeline. + # Set manifold to True if you want to use this pipeline as a manifold. + # Manifold pipelines can have multiple pipelines. + self.type = "manifold" + + # Optionally, you can set the id and name of the pipeline. + # Assign a unique identifier to the pipeline. + # The identifier must be unique across all pipelines. + # The identifier must be an alphanumeric string that can include underscores or hyphens. It cannot contain spaces, special characters, slashes, or backslashes. + self.id = "litellm_manifold" + + # Optionally, you can set the name of the manifold pipeline. + self.name = "LiteLLM: " + + class Valves(BaseModel): + LITELLM_BASE_URL: str + + # Initialize rate limits + self.valves = Valves(**{"LITELLM_BASE_URL": "http://localhost:4000"}) + + self.pipelines = self.get_litellm_models() + pass + + def get_litellm_models(self): + if self.valves.LITELLM_BASE_URL: + r = requests.get(f"{self.valves.LITELLM_BASE_URL}/v1/models") + models = r.json() + return [ + { + "id": model["id"], + "name": model["name"] if "name" in model else model["id"], + } + for model in models["data"] + ] + else: + return [] + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup:{__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is stopped. + print(f"on_shutdown:{__name__}") + pass + + def pipe( + self, user_message: str, model_id: str, messages: List[dict], body: dict + ) -> Union[str, Generator, Iterator]: + if "user" in body: + print("######################################") + print(f'# User: {body["user"]["name"]} ({body["user"]["id"]})') + print(f"# Message: {user_message}") + print("######################################") + + try: + r = requests.post( + url=f"{self.valves.LITELLM_BASE_URL}/v1/chat/completions", + json={**body, "model": model_id, "user_id": body["user"]["id"]}, + stream=True, + ) + + r.raise_for_status() + + if body["stream"]: + return r.iter_lines() + else: + return r.json() + except Exception as e: + return f"Error: {e}"