mirror of
https://github.com/open-webui/open-webui
synced 2024-11-26 22:09:51 +00:00
99 lines
2.8 KiB
Python
99 lines
2.8 KiB
Python
from typing import Any, Dict, Generator, List, Optional, Sequence, Union
|
|
from urllib.parse import parse_qs, urlparse
|
|
from langchain_core.documents import Document
|
|
|
|
|
|
ALLOWED_SCHEMES = {"http", "https"}
|
|
ALLOWED_NETLOCS = {
|
|
"youtu.be",
|
|
"m.youtube.com",
|
|
"youtube.com",
|
|
"www.youtube.com",
|
|
"www.youtube-nocookie.com",
|
|
"vid.plus",
|
|
}
|
|
|
|
|
|
def _parse_video_id(url: str) -> Optional[str]:
|
|
"""Parse a YouTube URL and return the video ID if valid, otherwise None."""
|
|
parsed_url = urlparse(url)
|
|
|
|
if parsed_url.scheme not in ALLOWED_SCHEMES:
|
|
return None
|
|
|
|
if parsed_url.netloc not in ALLOWED_NETLOCS:
|
|
return None
|
|
|
|
path = parsed_url.path
|
|
|
|
if path.endswith("/watch"):
|
|
query = parsed_url.query
|
|
parsed_query = parse_qs(query)
|
|
if "v" in parsed_query:
|
|
ids = parsed_query["v"]
|
|
video_id = ids if isinstance(ids, str) else ids[0]
|
|
else:
|
|
return None
|
|
else:
|
|
path = parsed_url.path.lstrip("/")
|
|
video_id = path.split("/")[-1]
|
|
|
|
if len(video_id) != 11: # Video IDs are 11 characters long
|
|
return None
|
|
|
|
return video_id
|
|
|
|
|
|
class YoutubeLoader:
|
|
"""Load `YouTube` video transcripts."""
|
|
|
|
def __init__(
|
|
self,
|
|
video_id: str,
|
|
language: Union[str, Sequence[str]] = "en",
|
|
):
|
|
"""Initialize with YouTube video ID."""
|
|
_video_id = _parse_video_id(video_id)
|
|
self.video_id = _video_id if _video_id is not None else video_id
|
|
self._metadata = {"source": video_id}
|
|
self.language = language
|
|
if isinstance(language, str):
|
|
self.language = [language]
|
|
else:
|
|
self.language = language
|
|
|
|
def load(self) -> List[Document]:
|
|
"""Load YouTube transcripts into `Document` objects."""
|
|
try:
|
|
from youtube_transcript_api import (
|
|
NoTranscriptFound,
|
|
TranscriptsDisabled,
|
|
YouTubeTranscriptApi,
|
|
)
|
|
except ImportError:
|
|
raise ImportError(
|
|
'Could not import "youtube_transcript_api" Python package. '
|
|
"Please install it with `pip install youtube-transcript-api`."
|
|
)
|
|
|
|
try:
|
|
transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id)
|
|
except Exception as e:
|
|
print(e)
|
|
return []
|
|
|
|
try:
|
|
transcript = transcript_list.find_transcript(self.language)
|
|
except NoTranscriptFound:
|
|
transcript = transcript_list.find_transcript(["en"])
|
|
|
|
transcript_pieces: List[Dict[str, Any]] = transcript.fetch()
|
|
|
|
transcript = " ".join(
|
|
map(
|
|
lambda transcript_piece: transcript_piece["text"].strip(" "),
|
|
transcript_pieces,
|
|
)
|
|
)
|
|
return [Document(page_content=transcript, metadata=self._metadata)]
|