Merge pull request #378 from Fyve-Labs/fix/wikipedia-pipeline

fix(wikipedia-pipeline): update wikipedia pipeline example
Timothy Jaeryang Baek 2025-02-14 13:45:25 -08:00 committed by GitHub
commit ab9012c228


@@ -1,13 +1,36 @@
"""
title: Wikipedia Article Retrieval
author: Unknown
author_url: Unknown
git_url: https://github.com/open-webui/pipelines/blob/main/examples/pipelines/integrations/wikipedia_pipeline.py
description: Wikipedia Search and Return
required_open_webui_version: 0.4.3
requirements: wikipedia
version: 0.4.3
licence: MIT
"""
from typing import List, Union, Generator, Iterator
from pydantic import BaseModel
from schemas import OpenAIChatMessage
from pydantic import BaseModel, Field
import wikipedia
import requests
import os
from datetime import datetime
import time
import re
from logging import getLogger
logger = getLogger(__name__)
logger.setLevel("DEBUG")
class Pipeline:
class Valves(BaseModel):
pass
# OPENAI_API_KEY: str = Field(default="", description="OpenAI API key")
RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline")
WORD_LIMIT: int = Field(default=300, description="Word limit when getting page summary")
WIKIPEDIA_ROOT: str = Field(default="https://en.wikipedia.org/wiki", description="Wikipedia root URL")
def __init__(self):
# Optionally, you can set the id and name of the pipeline.
@@ -17,53 +40,179 @@ class Pipeline:
# self.id = "wiki_pipeline"
self.name = "Wikipedia Pipeline"
# Initialize rate limits
self.valves = self.Valves(**{"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", "")})
# Initialize valve parameters
self.valves = self.Valves(
**{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
)
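# e.g. exporting RATE_LIMIT=10 overrides the default of 5; os.getenv
# returns a string, which pydantic coerces back to the declared int type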
async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup:{__name__}")
logger.debug(f"on_startup:{self.name}")
pass
async def on_shutdown(self):
# This function is called when the server is stopped.
print(f"on_shutdown:{__name__}")
logger.debug(f"on_shutdown:{self.name}")
pass
def rate_check(self, dt_start: datetime):
"""
Check elapsed time and sleep if the minimum interval implied by RATE_LIMIT has not yet passed
Args:
dt_start (datetime): Start time of the operation
Returns:
bool: True if a sleep was performed
"""
dt_end = datetime.now()
time_diff = (dt_end - dt_start).total_seconds()
time_buffer = (1 / self.valves.RATE_LIMIT)
if time_diff >= time_buffer: # no need to sleep
return False
time.sleep(time_buffer - time_diff)
return True
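# example: with RATE_LIMIT=5 the minimum spacing is 1/5 = 0.2s, so a call
# that finishes after 0.05s sleeps the remaining 0.15s and returns True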
def pipe(
self, user_message: str, model_id: str, messages: List[dict], body: dict
self,
user_message: str,
model_id: str,
messages: List[dict],
body: dict
) -> Union[str, Generator, Iterator]:
# This is where you can add your custom pipelines like RAG.
print(f"pipe:{__name__}")
"""
Main pipeline function. Performs wikipedia article lookup by query
and returns the summary of the first article.
"""
logger.debug(f"pipe:{self.name}")
if body.get("title", False):
print("Title Generation")
return "Wikipedia Pipeline"
else:
titles = []
for query in [user_message]:
query = query.replace(" ", "_")
# Check if title generation is requested
# as of 12/28/24, these were the standard title/tag-generation prompts
if ("broad tags categorizing" in user_message.lower()) \
or ("create a concise" in user_message.lower()):
# ## Create a concise, 3-5 word title with
# ## Task:\nGenerate 1-3 broad tags categorizing the main themes
logger.debug(f"Title Generation (aborted): {user_message}")
return "(title generation disabled)"
r = requests.get(
f"https://en.wikipedia.org/w/api.php?action=opensearch&search={query}&limit=1&namespace=0&format=json"
)
logger.info(f"User Message: {user_message}")
# logger.info(f"Messages: {messages}")
# [{'role': 'user', 'content': 'history of ibm'}]
response = r.json()
titles = titles + response[1]
print(titles)
# logger.info(f"Body: {body}")
# {'stream': True, 'model': 'wikipedia_pipeline',
# 'messages': [{'role': 'user', 'content': 'history of ibm'}],
# 'user': {'name': 'User', 'id': '235a828f-84a3-44a0-b7af-721ee8be6571',
# 'email': 'admin@localhost', 'role': 'admin'}}
context = None
if len(titles) > 0:
r = requests.get(
f"https://en.wikipedia.org/w/api.php?format=json&action=query&prop=extracts&exintro&explaintext&redirects=1&titles={'|'.join(titles)}"
)
response = r.json()
# get extracts
pages = response["query"]["pages"]
for page in pages:
if context == None:
context = pages[page]["extract"] + "\n"
else:
context = context + pages[page]["extract"] + "\n"
dt_start = datetime.now()
multi_part = False
streaming = body.get("stream", False)
logger.warning(f"Stream: {streaming}")
context = ""
# examples from https://pypi.org/project/wikipedia/
# new addition - ability to include multiple topics with a semicolon
for query in user_message.split(';'):
self.rate_check(dt_start)
query = query.strip()
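# after the first topic, emit a markdown horizontal rule so consecutive
# topic sections stay visually separated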
if multi_part:
if streaming:
yield "---\n"
else:
context += "---\n"
if streaming:  # reuse the flag read above; re-reading body here used a conflicting default
yield from self.stream_retrieve(query, dt_start)
else:
for chunk in self.stream_retrieve(query, dt_start):
context += chunk
multi_part = True
if not streaming:
return context if context else "No information found"
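# note: because pipe() contains yield, Python treats it as a generator
# function even on the non-streaming path; the return value above is
# carried in StopIteration rather than returned directly to the caller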
def stream_retrieve(
self, query: str, dt_start: datetime,
) -> Generator:
"""
Retrieve the wikipedia page for the query and return the summary. Return a generator
for streaming responses but can also be iterated for a single response.
"""
re_query = re.compile(r"[^0-9A-Z]", re.IGNORECASE)
re_rough_word = re.compile(r"[\w]+", re.IGNORECASE)
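# re_query builds URL slugs for related-topic links (non-alphanumerics -> "_");
# re_rough_word locates word starts for the summary truncation below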
titles_found = None
try:
titles_found = wikipedia.search(query)
# r = requests.get(
# f"https://en.wikipedia.org/w/api.php?action=opensearch&search={query}&limit=1&namespace=0&format=json"
# )
logger.info(f"Query: {query}, Found: {titles_found}")
except Exception as e:
logger.error(f"Search Error: {query} -> {e}")
yield f"Page Search Error: {query}"
if not titles_found: # no results; covers both None and an empty list
yield f"No information found for '{query}'"
return
self.rate_check(dt_start)
# if context: # add separator if multiple topics
# context += "---\n"
try:
title_check = titles_found[0]
wiki_page = wikipedia.page(title_check, auto_suggest=False) # trick! auto_suggest can silently swap in a "did you mean" guess, so pass the exact title from search
except wikipedia.exceptions.DisambiguationError as e:
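# e.g. wikipedia.page("Mercury", auto_suggest=False) raises
# DisambiguationError listing options such as "Mercury (planet)"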
str_error = str(e).replace("\n", ", ")
str_error = f"## Disambiguation Error ({query})\n* Status: {str_error}"
logger.error(str_error)
yield str_error + "\n"
return
except wikipedia.exceptions.RedirectError as e:
str_error = str(e).replace("\n", ", ")
str_error = f"## Redirect Error ({query})\n* Status: {str_error}"
logger.error(str_error)
yield str_error + "\n"
return
except Exception as e:
if titles_found:
str_error = f"## Page Retrieve Error ({query})\n* Found Topics (matched '{title_check}') {titles_found}"
logger.error(f"{str_error} -> {e}")
else:
str_error = f"## Page Not Found ({query})\n* Unknown error"
logger.error(f"{str_error} -> {e}")
yield str_error + "\n"
return
# found a page / section
logger.info(f"Page Sections[{query}]: {wiki_page.sections}")
yield f"## {title_check}\n"
# flatten internal links
# link_md = [f"[{x}]({self.valves.WIKIPEDIA_ROOT}/{re_query.sub('_', x)})" for x in wiki_page.links[:10]]
# yield "* Links (first 30): " + ",".join(link_md) + "\n"
# add the textual summary
summary_full = wiki_page.summary
word_positions = [x.start() for x in re_rough_word.finditer(summary_full)]
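# the match at index WORD_LIMIT marks where word WORD_LIMIT + 1 begins,
# so slicing up to its start keeps exactly WORD_LIMIT words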
if len(word_positions) > self.valves.WORD_LIMIT:
yield summary_full[:word_positions[self.valves.WORD_LIMIT]] + "...\n"
else:
yield summary_full + "\n"
# the more you know! link to further reading
yield "### Learn More" + "\n"
yield f"* [Read more on Wikipedia...]({wiki_page.url})\n"
# also spit out the related topics from search
link_md = [f"[{x}]({self.valves.WIKIPEDIA_ROOT}/{re_query.sub('_', x)})" for x in titles_found]
yield f"* Related topics: {', '.join(link_md)}\n"
# throw in the first image for good measure
if wiki_page.images:
yield f"\n![Image: {title_check}]({wiki_page.images[0]})\n"
return
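
For local testing outside Open WebUI, a minimal driver along these lines exercises the streaming path (a sketch, not part of the commit; it assumes the wikipedia package is installed and the module's imports resolve on their own):

# hypothetical local smoke test, not part of this commit
pipeline = Pipeline()
for chunk in pipeline.pipe(
    user_message="history of IBM; Thomas J. Watson",
    model_id="wikipedia_pipeline",
    messages=[],
    body={"stream": True},
):
    print(chunk, end="")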