fix(wikipedia-pipeline): update wikipedia pipeline example (api + responses)

Eric Z 2024-12-28 14:35:14 -06:00
parent 1367d95750
commit 5a6e476fd9


examples/pipelines/integrations/wikipedia_pipeline.py
"""
title: Wikipedia Article Retrieval
author: Unknown
author_url: Unknown
git_url: https://github.com/open-webui/pipelines/blob/main/examples/pipelines/integrations/wikipedia_pipeline.py
description: Wikipedia Search and Return
required_open_webui_version: 0.4.3
requirements: wikipedia
version: 0.4.3
licence: MIT
"""
from typing import List, Union, Generator, Iterator
from schemas import OpenAIChatMessage
from pydantic import BaseModel, Field
import wikipedia
import requests
import os
from datetime import datetime
import time
import re
from logging import getLogger

logger = getLogger(__name__)
logger.setLevel("DEBUG")


class Pipeline:
    class Valves(BaseModel):
        # OPENAI_API_KEY: str = Field(default="", description="OpenAI API key")
        RATE_LIMIT: int = Field(default=5, description="Rate limit for the pipeline (requests per second)")
        WORD_LIMIT: int = Field(default=300, description="Word limit when getting page summary")
        WIKIPEDIA_ROOT: str = Field(default="https://en.wikipedia.org/wiki", description="Wikipedia root URL")

    def __init__(self):
        # Optionally, you can set the id and name of the pipeline.
        # self.id = "wiki_pipeline"
        self.name = "Wikipedia Pipeline"

        # Initialize valve parameters; environment variables override the defaults
        self.valves = self.Valves(
            **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
        )
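        # Note: with the mapping above, any valve can be overridden from the
        # environment, e.g. RATE_LIMIT=10 raises the ceiling from 5 to 10 requests/s.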

    async def on_startup(self):
        # This function is called when the server is started.
        logger.debug(f"on_startup:{self.name}")

    async def on_shutdown(self):
        # This function is called when the server is stopped.
        logger.debug(f"on_shutdown:{self.name}")

    def rate_check(self, dt_start: datetime):
        """
        Check the elapsed time and sleep if too little has passed for the rate limit.

        Args:
            dt_start (datetime): Start time of the current rate window

        Returns:
            bool: True if a sleep was performed
        """
        dt_end = datetime.now()
        time_diff = (dt_end - dt_start).total_seconds()
        time_buffer = 1 / self.valves.RATE_LIMIT
        if time_diff >= time_buffer:  # no need to sleep
            return False
        time.sleep(time_buffer - time_diff)
        return True
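
    # Example: with RATE_LIMIT=5, time_buffer is 0.2 s, so a rate_check call made
    # 0.05 s into the window sleeps ~0.15 s, capping requests at ~5 per second.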

    def pipe(
        self,
        user_message: str,
        model_id: str,
        messages: List[dict],
        body: dict,
    ) -> Union[str, Generator, Iterator]:
        # This is where you can add your custom pipelines like RAG.
        logger.debug(f"pipe:{self.name}")

        # Check if title generation is requested
        # as of 12/28/24, these were the standard title/tag prompts
        if ("broad tags categorizing" in user_message.lower()) or (
            "create a concise" in user_message.lower()
        ):
            # ## Create a concise, 3-5 word title with ...
            # ## Task:\nGenerate 1-3 broad tags categorizing the main themes ...
            logger.debug(f"Title Generation (aborted): {user_message}")
            return "(title generation disabled)"

        logger.info(f"User Message: {user_message}")
        # logger.info(f"Messages: {messages}")
        # [{'role': 'user', 'content': 'history of ibm'}]
        # logger.info(f"Body: {body}")
        # {'stream': True, 'model': 'wikipedia_pipeline',
        #  'messages': [{'role': 'user', 'content': 'history of ibm'}],
        #  'user': {'name': 'User', 'id': '235a828f-84a3-44a0-b7af-721ee8be6571',
        #           'email': 'admin@localhost', 'role': 'admin'}}

        re_query = re.compile(r"[^0-9A-Z]", re.IGNORECASE)  # for URL slugs in links
        re_rough_word = re.compile(r"[\w]+", re.IGNORECASE)  # rough word tokenizer

        topics = []
        dt_start = datetime.now()

        # examples from https://pypi.org/project/wikipedia/
        # new addition - ability to include multiple topics separated by semicolons
        for query in user_message.split(";"):
            self.rate_check(dt_start)
            dt_start = datetime.now()  # restart the rate window for the next call
            query = query.strip()
            try:
                titles_found = wikipedia.search(query)
                # r = requests.get(
                #     f"https://en.wikipedia.org/w/api.php?action=opensearch&search={query}&limit=1&namespace=0&format=json"
                # )
                logger.info(f"Query: {query}, Found: {titles_found}")
                topics.append((query, titles_found))
            except Exception as e:
                logger.error(f"Search Error: {query} -> {e}")
                return f"Page Search Error: {query}"
context = ""
for query, titles_found in topics:
self.rate_check(dt_start)
if context: # add separator if multiple topics
context += "---\n"
try:
title_check = titles_found[0]
wiki_page = wikipedia.page(title_check, auto_suggest=False) # trick! don't auto-suggest
except wikipedia.exceptions.DisambiguationError as e:
str_error = str(e).replace("\n", ", ")
str_error = f"## Disambiguation Error ({query})\n* Status: {str_error}"
logger.error(str_error)
context += str_error + "\n"
continue
except wikipedia.exceptions.RedirectError as e:
str_error = str(e).replace("\n", ", ")
str_error = f"## Redirect Error ({query})\n* Status: {str_error}"
logger.error(str_error)
context += str_error + "\n"
continue
except Exception as e:
if titles_found:
str_error = f"## Page Retrieve Error ({query})\n* Found Topics (matched '{title_check}') {titles_found}"
logger.error(f"{str_error} -> {e}")
else:
str_error = f"## Page Not Found ({query})\n* Unknown error"
logger.error(f"{str_error} -> {e}")
context += str_error + "\n"
continue

            # found a page / section
            logger.info(f"Page Sections[{query}]: {wiki_page.sections}")
            context += f"## {title_check}\n"

            # flatten internal links
            # link_md = [f"[{x}]({self.valves.WIKIPEDIA_ROOT}/{re_query.sub('_', x)})" for x in wiki_page.links[:10]]
            # context += "* Links (first 10): " + ",".join(link_md) + "\n"

            # add the textual summary, truncated to WORD_LIMIT words
            summary_full = wiki_page.summary
            word_positions = [x.start() for x in re_rough_word.finditer(summary_full)]
            if len(word_positions) > self.valves.WORD_LIMIT:
                context += summary_full[:word_positions[self.valves.WORD_LIMIT]] + "...\n"
            else:
                context += summary_full + "\n"

            # the more you know! link to further reading
            context += "### Learn More\n"
            context += f"* [Read more on Wikipedia...]({wiki_page.url})\n"

            # also list the related topics from the search results
            link_md = [
                f"[{x}]({self.valves.WIKIPEDIA_ROOT}/{re_query.sub('_', x)})"
                for x in titles_found
            ]
            context += f"* Related topics: {', '.join(link_md)}\n"

            # throw in the first image for good measure
            if wiki_page.images:
                context += f"\n![Image: {title_check}]({wiki_page.images[0]})\n"

        # done querying the different pages
        return context if context else "No information found"
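
A quick way to exercise the updated pipe locally (a sketch, not part of the commit: it assumes the `wikipedia` package is installed and the `schemas` import resolves, as it does inside the pipelines server; the body dict is a minimal stand-in for the real Open WebUI payload):

    pipeline = Pipeline()
    result = pipeline.pipe(
        user_message="IBM; Thomas J. Watson",
        model_id="wikipedia_pipeline",
        messages=[{"role": "user", "content": "IBM; Thomas J. Watson"}],
        body={"stream": False},
    )
    print(result)  # markdown with a summary, links, and first image per topic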