feat: valves on_startup & on_shutdown

This commit is contained in:
Timothy J. Baek 2024-05-28 15:39:28 -07:00
parent a8348a3197
commit 88f3d59fcb
20 changed files with 56 additions and 48 deletions

View File

@ -255,6 +255,7 @@ async def update_valves(pipeline_id: str, form_data: dict):
pipeline_module = PIPELINE_MODULES[pipeline_id] pipeline_module = PIPELINE_MODULES[pipeline_id]
await pipeline_module.on_shutdown()
try: try:
ValvesModel = pipeline_module.valves.__class__ ValvesModel = pipeline_module.valves.__class__
valves = ValvesModel(**form_data) valves = ValvesModel(**form_data)
@ -265,6 +266,7 @@ async def update_valves(pipeline_id: str, form_data: dict):
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"{str(e)}", detail=f"{str(e)}",
) )
await pipeline_module.on_startup()
return pipeline_module.valves return pipeline_module.valves

View File

@ -18,13 +18,13 @@ class Pipeline:
pass pass
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -14,12 +14,12 @@ class Pipeline:
pass pass
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -34,12 +34,12 @@ class Pipeline:
pass pass
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -75,7 +75,7 @@ class Pipeline:
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
pass pass
def pipe( def pipe(

View File

@ -43,12 +43,12 @@ class Pipeline:
pass pass
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
self.langfuse.flush() self.langfuse.flush()
pass pass

View File

@ -24,32 +24,38 @@ class Pipeline:
LITELLM_BASE_URL: str LITELLM_BASE_URL: str
# Initialize rate limits # Initialize rate limits
self.valves = Valves(**{"LITELLM_BASE_URL": "http://localhost:4000"}) self.valves = Valves(**{"LITELLM_BASE_URL": "http://localhost:4001"})
self.pipelines = []
self.pipelines = self.get_litellm_models()
pass pass
def get_litellm_models(self): def get_litellm_models(self):
if self.valves.LITELLM_BASE_URL: if self.valves.LITELLM_BASE_URL:
r = requests.get(f"{self.valves.LITELLM_BASE_URL}/v1/models") try:
models = r.json() r = requests.get(f"{self.valves.LITELLM_BASE_URL}/v1/models")
return [ models = r.json()
{ return [
"id": model["id"], {
"name": model["name"] if "name" in model else model["id"], "id": model["id"],
} "name": model["name"] if "name" in model else model["id"],
for model in models["data"] }
] for model in models["data"]
]
except Exception as e:
print(f"Error: {e}")
return [
{"id": "litellm", "name": "LiteLLM: Please configure LiteLLM URL"},
]
else: else:
return [] return []
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
self.pipelines = self.get_litellm_models()
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -15,7 +15,7 @@ class Pipeline:
pass pass
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
from llama_cpp import Llama from llama_cpp import Llama
@ -29,7 +29,7 @@ class Pipeline:
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -66,7 +66,7 @@ class Pipeline:
print(self.index) print(self.index)
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
pass pass
def pipe( def pipe(

View File

@ -18,7 +18,7 @@ class Pipeline:
) )
Settings.llm = Ollama(model="llama3") Settings.llm = Ollama(model="llama3")
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
global documents, index global documents, index
self.documents = SimpleDirectoryReader("./data").load_data() self.documents = SimpleDirectoryReader("./data").load_data()
@ -26,7 +26,7 @@ class Pipeline:
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
pass pass
def pipe( def pipe(

View File

@ -17,11 +17,11 @@ class Pipeline:
self.documents = SimpleDirectoryReader("./data").load_data() self.documents = SimpleDirectoryReader("./data").load_data()
self.index = VectorStoreIndex.from_documents(self.documents) self.index = VectorStoreIndex.from_documents(self.documents)
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
pass pass
def pipe( def pipe(

View File

@ -30,12 +30,12 @@ class Pipeline:
pass pass
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -32,12 +32,12 @@ class Pipeline:
] ]
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -14,12 +14,12 @@ class Pipeline:
pass pass
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -14,12 +14,12 @@ class Pipeline:
pass pass
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -14,12 +14,12 @@ class Pipeline:
pass pass
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -11,12 +11,12 @@ class Pipeline:
pass pass
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -47,12 +47,12 @@ class Pipeline:
self.user_requests = {} self.user_requests = {}
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -31,12 +31,12 @@ class Pipeline:
] ]
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass

View File

@ -47,12 +47,12 @@ class Pipeline:
self.user_requests = {} self.user_requests = {}
async def on_startup(self): async def on_startup(self):
# This function is called when the server is started. # This function is called when the server is started or after valves are updated.
print(f"on_startup:{__name__}") print(f"on_startup:{__name__}")
pass pass
async def on_shutdown(self): async def on_shutdown(self):
# This function is called when the server is stopped. # This function is called when the server is stopped or before valves are updated.
print(f"on_shutdown:{__name__}") print(f"on_shutdown:{__name__}")
pass pass