diff --git a/main.py b/main.py
index fbe30fb..61e50af 100644
--- a/main.py
+++ b/main.py
@@ -37,8 +37,9 @@ def load_modules_from_directory(directory):
 for loaded_module in load_modules_from_directory("./pipelines"):
     # Do something with the loaded module
     print("Loaded:", loaded_module.__name__)
+
     PIPELINES[loaded_module.__name__] = {
-        "module": loaded_module,
+        "module": loaded_module.Pipeline(),
         "id": loaded_module.__name__,
         "name": loaded_module.__name__,
     }
diff --git a/pipelines/examples/haystack_pipeline.py b/pipelines/examples/haystack_pipeline.py
index ff76e4e..b7df99d 100644
--- a/pipelines/examples/haystack_pipeline.py
+++ b/pipelines/examples/haystack_pipeline.py
@@ -1,95 +1,98 @@
 from typing import List, Union, Generator
 from schemas import OpenAIChatMessage
 import os
-
-basic_rag_pipeline = None
+import asyncio

-def get_response(
-    user_message: str, messages: List[OpenAIChatMessage]
-) -> Union[str, Generator]:
-    # This is where you can add your custom RAG pipeline.
-    # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
+class Pipeline:
+    def __init__(self):
+        self.basic_rag_pipeline = None

-    print(messages)
-    print(user_message)
+    async def on_startup(self):
+        os.environ["OPENAI_API_KEY"] = "your_openai_api_key_here"

-    question = user_message
-    response = basic_rag_pipeline.run(
-        {"text_embedder": {"text": question}, "prompt_builder": {"question": question}}
-    )
+        from haystack.components.embedders import SentenceTransformersDocumentEmbedder
+        from haystack.components.embedders import SentenceTransformersTextEmbedder
+        from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
+        from haystack.components.builders import PromptBuilder
+        from haystack.components.generators import OpenAIGenerator

-    return response["llm"]["replies"][0]
+        from haystack.document_stores.in_memory import InMemoryDocumentStore
+        from datasets import load_dataset
+        from haystack import Document
+        from haystack import Pipeline

-async def on_startup():
-    global basic_rag_pipeline
+        document_store = InMemoryDocumentStore()

-    os.environ["OPENAI_API_KEY"] = "your_openai_api_key_here"
+        dataset = load_dataset("bilgeyucel/seven-wonders", split="train")
+        docs = [Document(content=doc["content"], meta=doc["meta"]) for doc in dataset]

-    from haystack.components.embedders import SentenceTransformersDocumentEmbedder
-    from haystack.components.embedders import SentenceTransformersTextEmbedder
-    from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
-    from haystack.components.builders import PromptBuilder
-    from haystack.components.generators import OpenAIGenerator
+        doc_embedder = SentenceTransformersDocumentEmbedder(
+            model="sentence-transformers/all-MiniLM-L6-v2"
+        )
+        doc_embedder.warm_up()

-    from haystack.document_stores.in_memory import InMemoryDocumentStore
+        docs_with_embeddings = doc_embedder.run(docs)
+        document_store.write_documents(docs_with_embeddings["documents"])

-    from datasets import load_dataset
-    from haystack import Document
-    from haystack import Pipeline
+        text_embedder = SentenceTransformersTextEmbedder(
+            model="sentence-transformers/all-MiniLM-L6-v2"
+        )

-    document_store = InMemoryDocumentStore()
+        retriever = InMemoryEmbeddingRetriever(document_store)

-    dataset = load_dataset("bilgeyucel/seven-wonders", split="train")
-    docs = [Document(content=doc["content"], meta=doc["meta"]) for doc in dataset]
+        template = """
+        Given the following information, answer the question.
-    doc_embedder = SentenceTransformersDocumentEmbedder(
-        model="sentence-transformers/all-MiniLM-L6-v2"
-    )
-    doc_embedder.warm_up()
+        Context:
+        {% for document in documents %}
+        {{ document.content }}
+        {% endfor %}

-    docs_with_embeddings = doc_embedder.run(docs)
-    document_store.write_documents(docs_with_embeddings["documents"])
+        Question: {{question}}
+        Answer:
+        """

-    text_embedder = SentenceTransformersTextEmbedder(
-        model="sentence-transformers/all-MiniLM-L6-v2"
-    )
+        prompt_builder = PromptBuilder(template=template)

-    retriever = InMemoryEmbeddingRetriever(document_store)
+        generator = OpenAIGenerator(model="gpt-3.5-turbo")

-    template = """
-    Given the following information, answer the question.
+        self.basic_rag_pipeline = Pipeline()
+        # Add components to your pipeline
+        self.basic_rag_pipeline.add_component("text_embedder", text_embedder)
+        self.basic_rag_pipeline.add_component("retriever", retriever)
+        self.basic_rag_pipeline.add_component("prompt_builder", prompt_builder)
+        self.basic_rag_pipeline.add_component("llm", generator)

-    Context:
-    {% for document in documents %}
-    {{ document.content }}
-    {% endfor %}
+        # Now, connect the components to each other
+        self.basic_rag_pipeline.connect(
+            "text_embedder.embedding", "retriever.query_embedding"
+        )
+        self.basic_rag_pipeline.connect("retriever", "prompt_builder.documents")
+        self.basic_rag_pipeline.connect("prompt_builder", "llm")

-    Question: {{question}}
-    Answer:
-    """
+        pass

-    prompt_builder = PromptBuilder(template=template)
+    async def on_shutdown(self):
+        # This function is called when the server is stopped.
+        pass

-    generator = OpenAIGenerator(model="gpt-3.5-turbo")
+    def get_response(
+        self, user_message: str, messages: List[OpenAIChatMessage]
+    ) -> Union[str, Generator]:
+        # This is where you can add your custom RAG pipeline.
+        # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.

-    basic_rag_pipeline = Pipeline()
-    # Add components to your pipeline
-    basic_rag_pipeline.add_component("text_embedder", text_embedder)
-    basic_rag_pipeline.add_component("retriever", retriever)
-    basic_rag_pipeline.add_component("prompt_builder", prompt_builder)
-    basic_rag_pipeline.add_component("llm", generator)
+        print(messages)
+        print(user_message)

-    # Now, connect the components to each other
-    basic_rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
-    basic_rag_pipeline.connect("retriever", "prompt_builder.documents")
-    basic_rag_pipeline.connect("prompt_builder", "llm")
+        question = user_message
+        response = self.basic_rag_pipeline.run(
+            {
+                "text_embedder": {"text": question},
+                "prompt_builder": {"question": question},
+            }
+        )

-    # This function is called when the server is started.
-    pass
-
-
-async def on_shutdown():
-    # This function is called when the server is stopped.
-    pass
+        return response["llm"]["replies"][0]
diff --git a/pipelines/examples/llamaindex_ollama_github_pipeline.py b/pipelines/examples/llamaindex_ollama_github_pipeline.py
index a951283..85cba62 100644
--- a/pipelines/examples/llamaindex_ollama_github_pipeline.py
+++ b/pipelines/examples/llamaindex_ollama_github_pipeline.py
@@ -3,83 +3,82 @@ from schemas import OpenAIChatMessage
 import os
 import asyncio

-index = None
-documents = None
+class Pipeline:
+    def __init__(self):
+        self.documents = None
+        self.index = None

-def get_response(
-    user_message: str, messages: List[OpenAIChatMessage]
-) -> Union[str, Generator]:
-    # This is where you can add your custom RAG pipeline.
-    # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
+    async def on_startup(self):
+        from llama_index.embeddings.ollama import OllamaEmbedding
+        from llama_index.llms.ollama import Ollama
+        from llama_index.core import VectorStoreIndex, Settings
+        from llama_index.readers.github import GithubRepositoryReader, GithubClient

-    print(messages)
-    print(user_message)
+        Settings.embed_model = OllamaEmbedding(
+            model_name="nomic-embed-text",
+            base_url="http://localhost:11434",
+        )
+        Settings.llm = Ollama(model="llama3")

-    query_engine = index.as_query_engine(streaming=True)
-    response = query_engine.query(user_message)
+        global index, documents

-    return response.response_gen
+        github_token = os.environ.get("GITHUB_TOKEN")
+        owner = "open-webui"
+        repo = "plugin-server"
+        branch = "main"

+        github_client = GithubClient(github_token=github_token, verbose=True)

-async def on_startup():
+        reader = GithubRepositoryReader(
+            github_client=github_client,
+            owner=owner,
+            repo=repo,
+            use_parser=False,
+            verbose=False,
+            filter_file_extensions=(
+                [
+                    ".png",
+                    ".jpg",
+                    ".jpeg",
+                    ".gif",
+                    ".svg",
+                    ".ico",
+                    "json",
+                    ".ipynb",
+                ],
+                GithubRepositoryReader.FilterType.EXCLUDE,
+            ),
+        )

-    from llama_index.embeddings.ollama import OllamaEmbedding
-    from llama_index.llms.ollama import Ollama
-    from llama_index.core import VectorStoreIndex, Settings
-    from llama_index.readers.github import GithubRepositoryReader, GithubClient
+        loop = asyncio.new_event_loop()

-    Settings.embed_model = OllamaEmbedding(
-        model_name="nomic-embed-text",
-        base_url="http://localhost:11434",
-    )
-    Settings.llm = Ollama(model="llama3")
+        reader._loop = loop

-    global index, documents
+        try:
+            # Load data from the branch
+            self.documents = await asyncio.to_thread(reader.load_data, branch=branch)
+            self.index = VectorStoreIndex.from_documents(self.documents)
+        finally:
+            loop.close()

-    github_token = os.environ.get("GITHUB_TOKEN")
-    owner = "open-webui"
-    repo = "plugin-server"
-    branch = "main"
+        print(self.documents)
+        print(self.index)

-    github_client = GithubClient(github_token=github_token, verbose=True)
+    async def on_shutdown(self):
+        # This function is called when the server is stopped.
+        pass

-    reader = GithubRepositoryReader(
-        github_client=github_client,
-        owner=owner,
-        repo=repo,
-        use_parser=False,
-        verbose=False,
-        filter_file_extensions=(
-            [
-                ".png",
-                ".jpg",
-                ".jpeg",
-                ".gif",
-                ".svg",
-                ".ico",
-                "json",
-                ".ipynb",
-            ],
-            GithubRepositoryReader.FilterType.EXCLUDE,
-        ),
-    )
+    def get_response(
+        self, user_message: str, messages: List[OpenAIChatMessage]
+    ) -> Union[str, Generator]:
+        # This is where you can add your custom RAG pipeline.
+        # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
-    loop = asyncio.new_event_loop()
+        print(messages)
+        print(user_message)

-    reader._loop = loop
+        query_engine = self.index.as_query_engine(streaming=True)
+        response = query_engine.query(user_message)

-    try:
-        # Load data from the branch
-        documents = await asyncio.to_thread(reader.load_data, branch=branch)
-        index = VectorStoreIndex.from_documents(documents)
-    finally:
-        loop.close()
-
-    print(documents)
-    print(index)
-
-
-async def on_shutdown():
-    # This function is called when the pipeline is stopped.
-    pass
+        return response.response_gen
diff --git a/pipelines/examples/llamaindex_ollama_pipeline.py b/pipelines/examples/llamaindex_ollama_pipeline.py
index 3f427ec..d791ad2 100644
--- a/pipelines/examples/llamaindex_ollama_pipeline.py
+++ b/pipelines/examples/llamaindex_ollama_pipeline.py
@@ -2,47 +2,45 @@ from typing import List, Union, Generator
 from schemas import OpenAIChatMessage

-documents = None
-index = None
+class Pipeline:
+    def __init__(self):
+        self.documents = None
+        self.index = None

+    async def on_startup(self):
+        from llama_index.embeddings.ollama import OllamaEmbedding
+        from llama_index.llms.ollama import Ollama
+        from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader

-def get_response(
-    user_message: str, messages: List[OpenAIChatMessage]
-) -> Union[str, Generator]:
-    # This is where you can add your custom RAG pipeline.
-    # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
+        Settings.embed_model = OllamaEmbedding(
+            model_name="nomic-embed-text",
+            base_url="http://localhost:11434",
+        )
+        Settings.llm = Ollama(model="llama3")

-    print(messages)
-    print(user_message)
+        # This function is called when the server is started.
+        global documents, index

-    query_engine = index.as_query_engine(streaming=True)
-    response = query_engine.query(user_message)
+        self.documents = SimpleDirectoryReader("./data").load_data()
+        self.index = VectorStoreIndex.from_documents(self.documents)
+        pass

-    print(response)
+    async def on_shutdown(self):
+        # This function is called when the server is stopped.
+        pass

-    return response.response_gen
+    def get_response(
+        self, user_message: str, messages: List[OpenAIChatMessage]
+    ) -> Union[str, Generator]:
+        # This is where you can add your custom RAG pipeline.
+        # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.

+        print(messages)
+        print(user_message)

-async def on_startup():
-    from llama_index.embeddings.ollama import OllamaEmbedding
-    from llama_index.llms.ollama import Ollama
-    from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader
+        query_engine = self.index.as_query_engine(streaming=True)
+        response = query_engine.query(user_message)

-    Settings.embed_model = OllamaEmbedding(
-        model_name="nomic-embed-text",
-        base_url="http://localhost:11434",
-    )
-    Settings.llm = Ollama(model="llama3")
+        print(response)

-    # This function is called when the server is started.
-    global documents, index
-
-    documents = SimpleDirectoryReader("./data").load_data()
-    index = VectorStoreIndex.from_documents(documents)
-
-    pass
-
-
-async def on_shutdown():
-    # This function is called when the server is stopped.
-    pass
+        return response.response_gen
diff --git a/pipelines/examples/llamaindex_pipeline.py b/pipelines/examples/llamaindex_pipeline.py
index ea2c6bd..975d4e3 100644
--- a/pipelines/examples/llamaindex_pipeline.py
+++ b/pipelines/examples/llamaindex_pipeline.py
@@ -1,40 +1,39 @@
 from typing import List, Union, Generator
 from schemas import OpenAIChatMessage

-documents = None
-index = None
+class Pipeline:
+    def __init__(self):
+        self.documents = None
+        self.index = None

-def get_response(
-    user_message: str, messages: List[OpenAIChatMessage]
-) -> Union[str, Generator]:
-    # This is where you can add your custom RAG pipeline.
-    # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.
+    async def on_startup(self):
+        import os

-    print(messages)
-    print(user_message)
+        # Set the OpenAI API key
+        os.environ["OPENAI_API_KEY"] = "your-api-key-here"

-    query_engine = index.as_query_engine(streaming=True)
-    response = query_engine.query(user_message)
+        from llama_index.core import VectorStoreIndex, SimpleDirectoryReader

-    return response.response_gen
+        self.documents = SimpleDirectoryReader("./data").load_data()
+        self.index = VectorStoreIndex.from_documents(self.documents)
+        # This function is called when the server is started.
+        pass

+    async def on_shutdown(self):
+        # This function is called when the server is stopped.
+        pass

-async def on_startup():
-    global documents, index
-    import os
+    def get_response(
+        self, user_message: str, messages: List[OpenAIChatMessage]
+    ) -> Union[str, Generator]:
+        # This is where you can add your custom RAG pipeline.
+        # Typically, you would retrieve relevant information from your knowledge base and synthesize it to generate a response.

-    # Set the OpenAI API key
-    os.environ["OPENAI_API_KEY"] = "your_openai_api_key_here"
+        print(messages)
+        print(user_message)

-    from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
+        query_engine = self.index.as_query_engine(streaming=True)
+        response = query_engine.query(user_message)

-    documents = SimpleDirectoryReader("./data").load_data()
-    index = VectorStoreIndex.from_documents(documents)
-    # This function is called when the server is started.
-    pass
-
-
-async def on_shutdown():
-    # This function is called when the server is stopped.
-    pass
+        return response.response_gen
diff --git a/pipelines/pipeline.py b/pipelines/pipeline.py
index d23a8eb..a70aabf 100644
--- a/pipelines/pipeline.py
+++ b/pipelines/pipeline.py
@@ -2,25 +2,27 @@ from typing import List, Union, Generator
 from schemas import OpenAIChatMessage

-def get_response(
-    user_message: str, messages: List[OpenAIChatMessage]
-) -> Union[str, Generator]:
-    # This is where you can add your custom pipelines like RAG.
+class Pipeline:
+    def __init__(self):
+        pass

-    print(messages)
-    print(user_message)
+    async def on_startup(self):
+        # This function is called when the server is started.
+        print("onstartup")
+        print(__name__)

-    return f"pipeline response to: {user_message}"
+        pass

+    async def on_shutdown(self):
+        # This function is called when the server is stopped.
+        pass

-async def on_startup():
-    # This function is called when the server is started.
-    print("onstartup")
-    print(__name__)
+    def get_response(
+        self, user_message: str, messages: List[OpenAIChatMessage]
+    ) -> Union[str, Generator]:
+        # This is where you can add your custom pipelines like RAG.

-    pass
+        print(messages)
+        print(user_message)

-
-async def on_shutdown():
-    # This function is called when the server is stopped.
-    pass
+        return f"pipeline response to: {user_message}"
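Note: every example above now follows the same contract, a Pipeline class with async on_startup/on_shutdown hooks and a synchronous get_response(user_message, messages) method, registered by main.py as PIPELINES[name] = {"module": loaded_module.Pipeline(), ...}. The sketch below shows how a host server might drive such instances; the EchoPipeline class and the startup()/shutdown() helpers are hypothetical stand-ins for illustration only, not code from this diff.

import asyncio
from typing import Generator, List, Union


class EchoPipeline:
    """Minimal pipeline following the interface introduced above (hypothetical example)."""

    def __init__(self):
        self.ready = False

    async def on_startup(self):
        # Expensive setup (building a Haystack pipeline, loading a LlamaIndex index, ...)
        # belongs here rather than in __init__, since main.py instantiates eagerly at import time.
        self.ready = True

    async def on_shutdown(self):
        self.ready = False

    def get_response(
        self, user_message: str, messages: List[dict]
    ) -> Union[str, Generator]:
        # Real pipelines receive List[OpenAIChatMessage]; plain dicts keep this sketch self-contained.
        return f"pipeline response to: {user_message}"


# Mirrors how main.py registers instances: PIPELINES[name] = {"module": Pipeline(), ...}
PIPELINES = {
    "echo": {"module": EchoPipeline(), "id": "echo", "name": "echo"},
}


async def startup():
    # A host server would await each pipeline's on_startup once at boot.
    for pipeline in PIPELINES.values():
        await pipeline["module"].on_startup()


async def shutdown():
    for pipeline in PIPELINES.values():
        await pipeline["module"].on_shutdown()


if __name__ == "__main__":
    asyncio.run(startup())
    print(PIPELINES["echo"]["module"].get_response("hello", messages=[]))
    asyncio.run(shutdown())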