From 5a6e476fd90d9f6ecaee648f39722c5bb1b26020 Mon Sep 17 00:00:00 2001
From: Eric Z
Date: Sat, 28 Dec 2024 14:35:14 -0600
Subject: [PATCH 1/2] fix(wikipedia-pipeline): update wikipedia pipeline example (api + responses)

---
 .../integrations/wikipedia_pipeline.py | 189 ++++++++++++++----
 1 file changed, 153 insertions(+), 36 deletions(-)

diff --git a/examples/pipelines/integrations/wikipedia_pipeline.py b/examples/pipelines/integrations/wikipedia_pipeline.py
index 433a407..a066645 100644
--- a/examples/pipelines/integrations/wikipedia_pipeline.py
+++ b/examples/pipelines/integrations/wikipedia_pipeline.py
@@ -1,13 +1,36 @@
+"""
+title: Wikipedia Article Retrieval
+author: Unknown
+author_url: Unknown
+git_url: https://github.com/open-webui/pipelines/blob/main/examples/pipelines/integrations/wikipedia_pipeline.py
+description: Wikipedia Search and Return
+required_open_webui_version: 0.4.3
+requirements: wikipedia
+version: 0.4.3
+licence: MIT
+"""
+
+
 from typing import List, Union, Generator, Iterator
-from pydantic import BaseModel
-from schemas import OpenAIChatMessage
+from pydantic import BaseModel, Field
+import wikipedia
 import requests
 import os
+from datetime import datetime
+import time
+import re
+
+from logging import getLogger
+logger = getLogger(__name__)
+logger.setLevel("DEBUG")
 
 
 class Pipeline:
     class Valves(BaseModel):
-        pass
+        # OPENAI_API_KEY: str = Field(default="", description="OpenAI API key")
+        RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline")
+        WORD_LIMIT: int = Field(default=300, description="Word limit when getting page summary")
+        WIKIPEDIA_ROOT: str = Field(default="https://en.wikipedia.org/wiki", description="Wikipedia root URL")
 
     def __init__(self):
         # Optionally, you can set the id and name of the pipeline.
@@ -17,53 +40,147 @@ class Pipeline:
         # self.id = "wiki_pipeline"
         self.name = "Wikipedia Pipeline"
 
-        # Initialize rate limits
-        self.valves = self.Valves(**{"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", "")})
+        # Initialize valve parameters
+        self.valves = self.Valves(
+            **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
+        )
 
     async def on_startup(self):
         # This function is called when the server is started.
-        print(f"on_startup:{__name__}")
+        logger.debug(f"on_startup:{self.name}")
         pass
 
     async def on_shutdown(self):
         # This function is called when the server is stopped.
-        print(f"on_shutdown:{__name__}")
+        logger.debug(f"on_shutdown:{self.name}")
        pass
 
+    def rate_check(self, dt_start: datetime):
+        """
+        Check elapsed time and sleep if not enough time has passed for the rate limit.
+
+        Args:
+            dt_start (datetime): Start time of the operation
+        Returns:
+            bool: True if a sleep was performed
+        """
+        dt_end = datetime.now()
+        time_diff = (dt_end - dt_start).total_seconds()
+        time_buffer = (1 / self.valves.RATE_LIMIT)
+        if time_diff >= time_buffer:  # no need to sleep
+            return False
+        time.sleep(time_buffer - time_diff)
+        return True
+
     def pipe(
-        self, user_message: str, model_id: str, messages: List[dict], body: dict
+        self,
+        user_message: str,
+        model_id: str,
+        messages: List[dict],
+        body: dict
     ) -> Union[str, Generator, Iterator]:
         # This is where you can add your custom pipelines like RAG.
- print(f"pipe:{__name__}") + logger.debug(f"pipe:{self.name}") - if body.get("title", False): - print("Title Generation") - return "Wikipedia Pipeline" - else: - titles = [] - for query in [user_message]: - query = query.replace(" ", "_") + # Check if title generation is requested + # as of 12/28/24, these were standard greetings + if ("broad tags categorizing" in user_message.lower()) \ + or ("Create a concise" in user_message.lower()): + # ## Create a concise, 3-5 word title with + # ## Task:\nGenerate 1-3 broad tags categorizing the main themes + logger.debug(f"Title Generation (aborted): {user_message}") + return "(title generation disabled)" - r = requests.get( - f"https://en.wikipedia.org/w/api.php?action=opensearch&search={query}&limit=1&namespace=0&format=json" - ) + logger.info(f"User Message: {user_message}") + # logger.info(f"Messages: {messages}") + # [{'role': 'user', 'content': 'history of ibm'}] + + # logger.info(f"Body: {body}") + # {'stream': True, 'model': 'wikipedia_pipeline', + # 'messages': [{'role': 'user', 'content': 'history of ibm'}], + # 'user': {'name': 'User', 'id': '235a828f-84a3-44a0-b7af-721ee8be6571', + # 'email': 'admin@localhost', 'role': 'admin'}} - response = r.json() - titles = titles + response[1] - print(titles) + re_query = re.compile(r"[^0-9A-Z]", re.IGNORECASE) + re_rough_word = re.compile(r"[\w]+", re.IGNORECASE) - context = None - if len(titles) > 0: - r = requests.get( - f"https://en.wikipedia.org/w/api.php?format=json&action=query&prop=extracts&exintro&explaintext&redirects=1&titles={'|'.join(titles)}" - ) - response = r.json() - # get extracts - pages = response["query"]["pages"] - for page in pages: - if context == None: - context = pages[page]["extract"] + "\n" - else: - context = context + pages[page]["extract"] + "\n" + topics = [] + dt_start = datetime.now() - return context if context else "No information found" + # examples from https://pypi.org/project/wikipedia/ + # new addition - ability to include multiple topics with a semicolon + for query in user_message.split(';'): + self.rate_check(dt_start) + query = query.strip() + try: + titles_found = wikipedia.search(query) + # r = requests.get( + # f"https://en.wikipedia.org/w/api.php?action=opensearch&search={query}&limit=1&namespace=0&format=json" + # ) + logger.info(f"Query: {query}, Found: {titles_found}") + topics.append((query, titles_found)) + except Exception as e: + logger.error(f"Search Error: {query} -> {e}") + return f"Page Search Error: {query}" + + context = "" + for query, titles_found in topics: + self.rate_check(dt_start) + + if context: # add separator if multiple topics + context += "---\n" + try: + title_check = titles_found[0] + wiki_page = wikipedia.page(title_check, auto_suggest=False) # trick! 
+            except wikipedia.exceptions.DisambiguationError as e:
+                str_error = str(e).replace("\n", ", ")
+                str_error = f"## Disambiguation Error ({query})\n* Status: {str_error}"
+                logger.error(str_error)
+                context += str_error + "\n"
+                continue
+            except wikipedia.exceptions.RedirectError as e:
+                str_error = str(e).replace("\n", ", ")
+                str_error = f"## Redirect Error ({query})\n* Status: {str_error}"
+                logger.error(str_error)
+                context += str_error + "\n"
+                continue
+            except Exception as e:
+                if titles_found:
+                    str_error = f"## Page Retrieve Error ({query})\n* Found Topics (matched '{title_check}') {titles_found}"
+                    logger.error(f"{str_error} -> {e}")
+                else:
+                    str_error = f"## Page Not Found ({query})\n* Unknown error"
+                    logger.error(f"{str_error} -> {e}")
+                context += str_error + "\n"
+                continue
+
+            # found a page / section
+            logger.info(f"Page Sections[{query}]: {wiki_page.sections}")
+            context += f"## {title_check}\n"
+
+            # flatten internal links
+            # link_md = [f"[{x}]({self.valves.WIKIPEDIA_ROOT}/{re_query.sub('_', x)})" for x in wiki_page.links[:10]]
+            # context += "* Links (first 10): " + ",".join(link_md) + "\n"
+
+            # add the textual summary
+            summary_full = wiki_page.summary
+            word_positions = [x.start() for x in re_rough_word.finditer(summary_full)]
+            if len(word_positions) > self.valves.WORD_LIMIT:
+                context += summary_full[:word_positions[self.valves.WORD_LIMIT]] + "...\n"
+            else:
+                context += summary_full + "\n"
+
+            # the more you know! link to further reading
+            context += "### Learn More" + "\n"
+            context += f"* [Read more on Wikipedia...]({wiki_page.url})\n"
+
+            # also spit out the related topics from search
+            link_md = [f"[{x}]({self.valves.WIKIPEDIA_ROOT}/{re_query.sub('_', x)})" for x in titles_found]
+            context += f"* Related topics: {', '.join(link_md)}\n"
+
+            # throw in the first image for good measure
+            if wiki_page.images:
+                context += f"\n![Image: {title_check}]({wiki_page.images[0]})\n"
+
+        # done with querying for different pages
+        return context if context else "No information found"

From 542d143a883aaea74cb8c5d81fdfaaf17ee209ff Mon Sep 17 00:00:00 2001
From: Eric Z
Date: Sun, 29 Dec 2024 05:31:36 -0600
Subject: [PATCH 2/2] fix(wikipedia-streaming): convert to streaming-compatible responses

---
 .../integrations/wikipedia_pipeline.py | 172 +++++++++++------
 1 file changed, 102 insertions(+), 70 deletions(-)

diff --git a/examples/pipelines/integrations/wikipedia_pipeline.py b/examples/pipelines/integrations/wikipedia_pipeline.py
index a066645..15eb797 100644
--- a/examples/pipelines/integrations/wikipedia_pipeline.py
+++ b/examples/pipelines/integrations/wikipedia_pipeline.py
@@ -79,7 +79,10 @@ class Pipeline:
         messages: List[dict],
         body: dict
     ) -> Union[str, Generator, Iterator]:
-        # This is where you can add your custom pipelines like RAG.
+        """
+        Main pipeline function. Performs a Wikipedia article lookup by query
+        and returns the summary of the first article found.
+ """ logger.debug(f"pipe:{self.name}") # Check if title generation is requested @@ -101,86 +104,115 @@ class Pipeline: # 'user': {'name': 'User', 'id': '235a828f-84a3-44a0-b7af-721ee8be6571', # 'email': 'admin@localhost', 'role': 'admin'}} - re_query = re.compile(r"[^0-9A-Z]", re.IGNORECASE) - re_rough_word = re.compile(r"[\w]+", re.IGNORECASE) - - topics = [] dt_start = datetime.now() + multi_part = False + streaming = body.get("stream", False) + logger.warning(f"Stream: {streaming}") + context = "" # examples from https://pypi.org/project/wikipedia/ # new addition - ability to include multiple topics with a semicolon for query in user_message.split(';'): self.rate_check(dt_start) query = query.strip() - try: - titles_found = wikipedia.search(query) - # r = requests.get( - # f"https://en.wikipedia.org/w/api.php?action=opensearch&search={query}&limit=1&namespace=0&format=json" - # ) - logger.info(f"Query: {query}, Found: {titles_found}") - topics.append((query, titles_found)) - except Exception as e: - logger.error(f"Search Error: {query} -> {e}") - return f"Page Search Error: {query}" - context = "" - for query, titles_found in topics: - self.rate_check(dt_start) - - if context: # add separator if multiple topics - context += "---\n" - try: - title_check = titles_found[0] - wiki_page = wikipedia.page(title_check, auto_suggest=False) # trick! don't auto-suggest - except wikipedia.exceptions.DisambiguationError as e: - str_error = str(e).replace("\n", ", ") - str_error = f"## Disambiguation Error ({query})\n* Status: {str_error}" - logger.error(str_error) - context += str_error + "\n" - continue - except wikipedia.exceptions.RedirectError as e: - str_error = str(e).replace("\n", ", ") - str_error = f"## Redirect Error ({query})\n* Status: {str_error}" - logger.error(str_error) - context += str_error + "\n" - continue - except Exception as e: - if titles_found: - str_error = f"## Page Retrieve Error ({query})\n* Found Topics (matched '{title_check}') {titles_found}" - logger.error(f"{str_error} -> {e}") + if multi_part: + if streaming: + yield "---\n" else: - str_error = f"## Page Not Found ({query})\n* Unknown error" - logger.error(f"{str_error} -> {e}") - context += str_error + "\n" - continue - - # found a page / section - logger.info(f"Page Sections[{query}]: {wiki_page.sections}") - context += f"## {title_check}\n" - - # flatten internal links - # link_md = [f"[{x}]({self.valves.WIKIPEDIA_ROOT}/{re_query.sub('_', x)})" for x in wiki_page.links[:10]] - # context += "* Links (first 30): " + ",".join(link_md) + "\n" - - # add the textual summary - summary_full = wiki_page.summary - word_positions = [x.start() for x in re_rough_word.finditer(summary_full)] - if len(word_positions) > self.valves.WORD_LIMIT: - context += summary_full[:word_positions[self.valves.WORD_LIMIT]] + "...\n" + context += "---\n" + if body.get("stream", True): + yield from self.stream_retrieve(query, dt_start) else: - context += summary_full + "\n" + for chunk in self.stream_retrieve(query, dt_start): + context += chunk + multi_part = True + + if not streaming: + return context if context else "No information found" - # the more you know! 
-            context += "### Learn More" + "\n"
-            context += f"* [Read more on Wikipedia...]({wiki_page.url})\n"
-
-            # also spit out the related topics from search
-            link_md = [f"[{x}]({self.valves.WIKIPEDIA_ROOT}/{re_query.sub('_', x)})" for x in titles_found]
-            context += f"* Related topics: {', '.join(link_md)}\n"
-
-            # throw in the first image for good measure
-            if wiki_page.images:
-                context += f"\n![Image: {title_check}]({wiki_page.images[0]})\n"
-
-        # done with querying for different pages
-        return context if context else "No information found"
+            if multi_part:
+                if streaming:
+                    yield "---\n"
+                else:
+                    context += "---\n"
+            if streaming:
+                yield from self.stream_retrieve(query, dt_start)
+            else:
+                for chunk in self.stream_retrieve(query, dt_start):
+                    context += chunk
+            multi_part = True
+
+        if not streaming:
+            # yield rather than return: with yields elsewhere, pipe() is a
+            # generator function, so a plain return value would be discarded
+            yield context if context else "No information found"
+
+    def stream_retrieve(
+        self, query: str, dt_start: datetime,
+    ) -> Generator:
+        """
+        Retrieve the Wikipedia page for the query and yield the summary.
+        Returns a generator for streaming responses, which can also be
+        iterated to assemble a single response.
+        """
+
+        re_query = re.compile(r"[^0-9A-Z]", re.IGNORECASE)
+        re_rough_word = re.compile(r"[\w]+", re.IGNORECASE)
+
+        titles_found = None
+        try:
+            titles_found = wikipedia.search(query)
+            # r = requests.get(
+            #     f"https://en.wikipedia.org/w/api.php?action=opensearch&search={query}&limit=1&namespace=0&format=json"
+            # )
+            logger.info(f"Query: {query}, Found: {titles_found}")
+        except Exception as e:
+            logger.error(f"Search Error: {query} -> {e}")
+            yield f"Page Search Error: {query}"
+            return
+
+        if not titles_found:  # no results
+            yield f"No information found for '{query}'"
+            return
+
+        self.rate_check(dt_start)
+
+        try:
+            title_check = titles_found[0]
+            wiki_page = wikipedia.page(title_check, auto_suggest=False)  # trick! don't auto-suggest
+        except wikipedia.exceptions.DisambiguationError as e:
+            str_error = str(e).replace("\n", ", ")
+            str_error = f"## Disambiguation Error ({query})\n* Status: {str_error}"
+            logger.error(str_error)
+            yield str_error + "\n"
+            return
+        except wikipedia.exceptions.RedirectError as e:
+            str_error = str(e).replace("\n", ", ")
+            str_error = f"## Redirect Error ({query})\n* Status: {str_error}"
+            logger.error(str_error)
+            yield str_error + "\n"
+            return
+        except Exception as e:
+            if titles_found:
+                str_error = f"## Page Retrieve Error ({query})\n* Found Topics (matched '{title_check}') {titles_found}"
+                logger.error(f"{str_error} -> {e}")
+            else:
+                str_error = f"## Page Not Found ({query})\n* Unknown error"
+                logger.error(f"{str_error} -> {e}")
+            yield str_error + "\n"
+            return
+
+        # found a page / section
+        logger.info(f"Page Sections[{query}]: {wiki_page.sections}")
+        yield f"## {title_check}\n"
+
+        # flatten internal links
+        # link_md = [f"[{x}]({self.valves.WIKIPEDIA_ROOT}/{re_query.sub('_', x)})" for x in wiki_page.links[:10]]
+        # yield "* Links (first 10): " + ",".join(link_md) + "\n"
+
+        # add the textual summary
+        summary_full = wiki_page.summary
+        word_positions = [x.start() for x in re_rough_word.finditer(summary_full)]
+        if len(word_positions) > self.valves.WORD_LIMIT:
+            yield summary_full[:word_positions[self.valves.WORD_LIMIT]] + "...\n"
+        else:
+            yield summary_full + "\n"
+
+        # the more you know! link to further reading
+        yield "### Learn More" + "\n"
+        yield f"* [Read more on Wikipedia...]({wiki_page.url})\n"
+
+        # also spit out the related topics from search
+        link_md = [f"[{x}]({self.valves.WIKIPEDIA_ROOT}/{re_query.sub('_', x)})" for x in titles_found]
+        yield f"* Related topics: {', '.join(link_md)}\n"
+
+        # throw in the first image for good measure
+        if wiki_page.images:
+            yield f"\n![Image: {title_check}]({wiki_page.images[0]})\n"
+
+        return
\ No newline at end of file
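
---
For reviewers: a quick way to exercise the streaming path outside Open WebUI.
This is a minimal sketch, assuming the module is importable as
`wikipedia_pipeline` (the module name and driver script are illustrative and
not part of the patch); the body dict mirrors the logged example in the
comments above.

    # smoke_test.py - hypothetical local driver, not part of this patch
    from wikipedia_pipeline import Pipeline

    pipeline = Pipeline()
    messages = [{"role": "user", "content": "history of ibm; IBM PC"}]
    body = {"stream": True, "model": "wikipedia_pipeline", "messages": messages}

    # after PATCH 2/2, pipe() is a generator: iterate to stream markdown chunks,
    # and the semicolon in the message exercises the multi-topic split
    for chunk in pipeline.pipe(messages[0]["content"], "wikipedia_pipeline",
                               messages, body):
        print(chunk, end="")

With body["stream"] = False the same loop produces a single accumulated
string instead of incremental chunks, matching the non-streaming branch in
pipe().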