This commit is contained in:
Timothy J. Baek 2024-05-21 18:33:16 -07:00
parent 24e02a9017
commit 13f054714b
6 changed files with 211 additions and 209 deletions

View File

@ -37,8 +37,9 @@ def load_modules_from_directory(directory):
for loaded_module in load_modules_from_directory("./pipelines"): for loaded_module in load_modules_from_directory("./pipelines"):
# Do something with the loaded module # Do something with the loaded module
print("Loaded:", loaded_module.__name__) print("Loaded:", loaded_module.__name__)
PIPELINES[loaded_module.__name__] = { PIPELINES[loaded_module.__name__] = {
"module": loaded_module, "module": loaded_module.Pipeline(),
"id": loaded_module.__name__, "id": loaded_module.__name__,
"name": loaded_module.__name__, "name": loaded_module.__name__,
} }

View File

@ -1,30 +1,14 @@
from typing import List, Union, Generator from typing import List, Union, Generator
from schemas import OpenAIChatMessage from schemas import OpenAIChatMessage
import os import os
import asyncio
basic_rag_pipeline = None
def get_response( class Pipeline:
user_message: str, messages: List[OpenAIChatMessage] def __init__(self):
) -> Union[str, Generator]: self.basic_rag_pipeline = None
# This is where you can add your custom RAG pipeline.
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
print(messages)
print(user_message)
question = user_message
response = basic_rag_pipeline.run(
{"text_embedder": {"text": question}, "prompt_builder": {"question": question}}
)
return response["llm"]["replies"][0]
async def on_startup():
global basic_rag_pipeline
async def on_startup(self):
os.environ["OPENAI_API_KEY"] = "your_openai_api_key_here" os.environ["OPENAI_API_KEY"] = "your_openai_api_key_here"
from haystack.components.embedders import SentenceTransformersDocumentEmbedder from haystack.components.embedders import SentenceTransformersDocumentEmbedder
@ -74,22 +58,41 @@ async def on_startup():
generator = OpenAIGenerator(model="gpt-3.5-turbo") generator = OpenAIGenerator(model="gpt-3.5-turbo")
basic_rag_pipeline = Pipeline() self.basic_rag_pipeline = Pipeline()
# Add components to your pipeline # Add components to your pipeline
basic_rag_pipeline.add_component("text_embedder", text_embedder) self.basic_rag_pipeline.add_component("text_embedder", text_embedder)
basic_rag_pipeline.add_component("retriever", retriever) self.basic_rag_pipeline.add_component("retriever", retriever)
basic_rag_pipeline.add_component("prompt_builder", prompt_builder) self.basic_rag_pipeline.add_component("prompt_builder", prompt_builder)
basic_rag_pipeline.add_component("llm", generator) self.basic_rag_pipeline.add_component("llm", generator)
# Now, connect the components to each other # Now, connect the components to each other
basic_rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding") self.basic_rag_pipeline.connect(
basic_rag_pipeline.connect("retriever", "prompt_builder.documents") "text_embedder.embedding", "retriever.query_embedding"
basic_rag_pipeline.connect("prompt_builder", "llm") )
self.basic_rag_pipeline.connect("retriever", "prompt_builder.documents")
self.basic_rag_pipeline.connect("prompt_builder", "llm")
# This function is called when the server is started.
pass pass
async def on_shutdown(self):
async def on_shutdown():
# This function is called when the server is stopped. # This function is called when the server is stopped.
pass pass
def get_response(
self, user_message: str, messages: List[OpenAIChatMessage]
) -> Union[str, Generator]:
# This is where you can add your custom RAG pipeline.
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
print(messages)
print(user_message)
question = user_message
response = self.basic_rag_pipeline.run(
{
"text_embedder": {"text": question},
"prompt_builder": {"question": question},
}
)
return response["llm"]["replies"][0]

View File

@ -3,27 +3,13 @@ from schemas import OpenAIChatMessage
import os import os
import asyncio import asyncio
index = None
documents = None
class Pipeline:
def __init__(self):
self.documents = None
self.index = None
def get_response( async def on_startup(self):
user_message: str, messages: List[OpenAIChatMessage]
) -> Union[str, Generator]:
# This is where you can add your custom RAG pipeline.
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
print(messages)
print(user_message)
query_engine = index.as_query_engine(streaming=True)
response = query_engine.query(user_message)
return response.response_gen
async def on_startup():
from llama_index.embeddings.ollama import OllamaEmbedding from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.ollama import Ollama from llama_index.llms.ollama import Ollama
from llama_index.core import VectorStoreIndex, Settings from llama_index.core import VectorStoreIndex, Settings
@ -71,15 +57,28 @@ async def on_startup():
try: try:
# Load data from the branch # Load data from the branch
documents = await asyncio.to_thread(reader.load_data, branch=branch) self.documents = await asyncio.to_thread(reader.load_data, branch=branch)
index = VectorStoreIndex.from_documents(documents) self.index = VectorStoreIndex.from_documents(documents)
finally: finally:
loop.close() loop.close()
print(documents) print(self.documents)
print(index) print(self.index)
async def on_shutdown(self):
async def on_shutdown(): # This function is called when the server is stopped.
# This function is called when the pipeline is stopped.
pass pass
def get_response(
self, user_message: str, messages: List[OpenAIChatMessage]
) -> Union[str, Generator]:
# This is where you can add your custom RAG pipeline.
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
print(messages)
print(user_message)
query_engine = self.index.as_query_engine(streaming=True)
response = query_engine.query(user_message)
return response.response_gen

View File

@ -2,28 +2,12 @@ from typing import List, Union, Generator
from schemas import OpenAIChatMessage from schemas import OpenAIChatMessage
documents = None class Pipeline:
index = None def __init__(self):
self.documents = None
self.index = None
async def on_startup(self):
def get_response(
user_message: str, messages: List[OpenAIChatMessage]
) -> Union[str, Generator]:
# This is where you can add your custom RAG pipeline.
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
print(messages)
print(user_message)
query_engine = index.as_query_engine(streaming=True)
response = query_engine.query(user_message)
print(response)
return response.response_gen
async def on_startup():
from llama_index.embeddings.ollama import OllamaEmbedding from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.ollama import Ollama from llama_index.llms.ollama import Ollama
from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader
@ -37,12 +21,26 @@ async def on_startup():
# This function is called when the server is started. # This function is called when the server is started.
global documents, index global documents, index
documents = SimpleDirectoryReader("./data").load_data() self.documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents) self.index = VectorStoreIndex.from_documents(documents)
pass pass
async def on_shutdown(self):
async def on_shutdown():
# This function is called when the server is stopped. # This function is called when the server is stopped.
pass pass
def get_response(
self, user_message: str, messages: List[OpenAIChatMessage]
) -> Union[str, Generator]:
# This is where you can add your custom RAG pipeline.
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
print(messages)
print(user_message)
query_engine = self.index.as_query_engine(streaming=True)
response = query_engine.query(user_message)
print(response)
return response.response_gen

View File

@ -1,40 +1,39 @@
from typing import List, Union, Generator from typing import List, Union, Generator
from schemas import OpenAIChatMessage from schemas import OpenAIChatMessage
documents = None
index = None
class Pipeline:
def __init__(self):
self.documents = None
self.index = None
def get_response( async def on_startup(self):
user_message: str, messages: List[OpenAIChatMessage] import os
) -> Union[str, Generator]:
# Set the OpenAI API key
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
self.documents = SimpleDirectoryReader("./data").load_data()
self.index = VectorStoreIndex.from_documents(self.documents)
# This function is called when the server is started.
pass
async def on_shutdown(self):
# This function is called when the server is stopped.
pass
def get_response(
self, user_message: str, messages: List[OpenAIChatMessage]
) -> Union[str, Generator]:
# This is where you can add your custom RAG pipeline. # This is where you can add your custom RAG pipeline.
# Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response. # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
print(messages) print(messages)
print(user_message) print(user_message)
query_engine = index.as_query_engine(streaming=True) query_engine = self.index.as_query_engine(streaming=True)
response = query_engine.query(user_message) response = query_engine.query(user_message)
return response.response_gen return response.response_gen
async def on_startup():
global documents, index
import os
# Set the OpenAI API key
os.environ["OPENAI_API_KEY"] = "your_openai_api_key_here"
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents)
# This function is called when the server is started.
pass
async def on_shutdown():
# This function is called when the server is stopped.
pass

View File

@ -2,25 +2,27 @@ from typing import List, Union, Generator
from schemas import OpenAIChatMessage from schemas import OpenAIChatMessage
def get_response( class Pipeline:
user_message: str, messages: List[OpenAIChatMessage] def __init__(self):
) -> Union[str, Generator]: pass
# This is where you can add your custom pipelines like RAG.
print(messages) async def on_startup(self):
print(user_message)
return f"pipeline response to: {user_message}"
async def on_startup():
# This function is called when the server is started. # This function is called when the server is started.
print("onstartup") print("onstartup")
print(__name__) print(__name__)
pass pass
async def on_shutdown():
async def on_shutdown():
# This function is called when the server is stopped. # This function is called when the server is stopped.
pass pass
def get_response(
self, user_message: str, messages: List[OpenAIChatMessage]
) -> Union[str, Generator]:
# This is where you can add your custom pipelines like RAG.
print(messages)
print(user_message)
return f"pipeline response to: {user_message}"