import asyncio
import logging
import socket
import ssl
import time
import urllib.parse
import urllib.request
from collections import defaultdict
from datetime import datetime, timedelta
from typing import (
    Any,
    AsyncIterator,
    Dict,
    Iterator,
    List,
    Optional,
    Sequence,
    Union,
    Literal,
)

import aiohttp
import certifi
import validators
from langchain_community.document_loaders import PlaywrightURLLoader, WebBaseLoader
from langchain_community.document_loaders.firecrawl import FireCrawlLoader
from langchain_community.document_loaders.base import BaseLoader
from langchain_core.documents import Document

from open_webui.constants import ERROR_MESSAGES
from open_webui.config import (
    ENABLE_RAG_LOCAL_WEB_FETCH,
    PLAYWRIGHT_WS_URI,
    RAG_WEB_LOADER_ENGINE,
    FIRECRAWL_API_BASE_URL,
    FIRECRAWL_API_KEY,
)
from open_webui.env import SRC_LOG_LEVELS

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])


def validate_url(url: Union[str, Sequence[str]]):
    if isinstance(url, str):
        if isinstance(validators.url(url), validators.ValidationError):
            raise ValueError(ERROR_MESSAGES.INVALID_URL)
        if not ENABLE_RAG_LOCAL_WEB_FETCH:
            # Local web fetch is disabled, filter out any URLs that resolve to private IP addresses
            parsed_url = urllib.parse.urlparse(url)
            # Get IPv4 and IPv6 addresses
            ipv4_addresses, ipv6_addresses = resolve_hostname(parsed_url.hostname)
            # Check if any of the resolved addresses are private
            # This is technically still vulnerable to DNS rebinding attacks, as we don't control WebBaseLoader
            for ip in ipv4_addresses:
                if validators.ipv4(ip, private=True):
                    raise ValueError(ERROR_MESSAGES.INVALID_URL)
            for ip in ipv6_addresses:
                if validators.ipv6(ip, private=True):
                    raise ValueError(ERROR_MESSAGES.INVALID_URL)
        return True
    elif isinstance(url, Sequence):
        return all(validate_url(u) for u in url)
    else:
        return False


def safe_validate_urls(url: Sequence[str]) -> Sequence[str]:
    valid_urls = []
    for u in url:
        try:
            if validate_url(u):
                valid_urls.append(u)
        except ValueError:
            continue
    return valid_urls


def resolve_hostname(hostname):
    # Get address information
    addr_info = socket.getaddrinfo(hostname, None)

    # Extract IP addresses from address information
    ipv4_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET]
    ipv6_addresses = [info[4][0] for info in addr_info if info[0] == socket.AF_INET6]

    return ipv4_addresses, ipv6_addresses


def extract_metadata(soup, url):
    metadata = {"source": url}
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", "No description found.")
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", "No language found.")
    return metadata


def verify_ssl_cert(url: str) -> bool:
    """Verify SSL certificate for the given URL."""
    if not url.startswith("https://"):
        return True

    try:
        hostname = url.split("://")[-1].split("/")[0]
        context = ssl.create_default_context(cafile=certifi.where())
        with context.wrap_socket(socket.socket(), server_hostname=hostname) as s:
            s.connect((hostname, 443))
        return True
    except ssl.SSLError:
        return False
    except Exception as e:
        log.warning(f"SSL verification failed for {url}: {str(e)}")
        return False
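
# Usage sketch for the URL-validation helpers above (illustrative only; the URLs
# are hypothetical placeholders, and which candidates survive depends on the
# ENABLE_RAG_LOCAL_WEB_FETCH setting):
#
#   candidates = ["https://example.com/docs", "http://192.168.1.10/admin", "not a url"]
#   safe = safe_validate_urls(candidates)
#   # -> malformed URLs are dropped; when local web fetch is disabled, URLs that
#   #    resolve to private addresses are dropped as well
#   ipv4s, ipv6s = resolve_hostname("example.com")
#   # -> (list of IPv4 addresses, list of IPv6 addresses)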
class SafeFireCrawlLoader(BaseLoader):
    def __init__(
        self,
        web_paths,
        verify_ssl: bool = True,
        trust_env: bool = False,
        requests_per_second: Optional[float] = None,
        continue_on_failure: bool = True,
        api_key: Optional[str] = None,
        api_url: Optional[str] = None,
        mode: Literal["crawl", "scrape", "map"] = "crawl",
        proxy: Optional[Dict[str, str]] = None,
        params: Optional[Dict] = None,
    ):
        """Document loader that runs FireCrawlLoader over multiple URLs with safety checks.

        URLs are processed one at a time; each is checked for a valid SSL
        certificate and throttled by the optional rate limit before being
        handed to a FireCrawlLoader instance.

        Args:
            web_paths: List of URLs/paths to process.
            verify_ssl: If True, verify SSL certificates.
            trust_env: If True, use proxy settings from environment variables.
            requests_per_second: Number of requests per second to limit to.
            continue_on_failure: If True, continue loading other URLs on failure.
            api_key: API key for FireCrawl service. Defaults to None
                (uses FIRE_CRAWL_API_KEY environment variable if not provided).
            api_url: Base URL for FireCrawl API. Defaults to official API endpoint.
            mode: Operation mode selection:
                - 'crawl': Website crawling mode (default)
                - 'scrape': Direct page scraping
                - 'map': Site map generation
            proxy: Proxy override settings for the FireCrawl API.
            params: The parameters to pass to the Firecrawl API.
                Examples include crawlerOptions.
                For more details, visit: https://github.com/mendableai/firecrawl-py
        """
        proxy_server = proxy.get("server") if proxy else None
        if trust_env and not proxy_server:
            env_proxies = urllib.request.getproxies()
            env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
            if env_proxy_server:
                if proxy:
                    proxy["server"] = env_proxy_server
                else:
                    proxy = {"server": env_proxy_server}
        self.web_paths = web_paths
        self.verify_ssl = verify_ssl
        self.requests_per_second = requests_per_second
        self.last_request_time = None
        self.trust_env = trust_env
        self.continue_on_failure = continue_on_failure
        self.api_key = api_key
        self.api_url = api_url
        self.mode = mode
        self.params = params

    def lazy_load(self) -> Iterator[Document]:
        """Load documents from each URL in turn using FireCrawl."""
        for url in self.web_paths:
            try:
                self._safe_process_url_sync(url)
                loader = FireCrawlLoader(
                    url=url,
                    api_key=self.api_key,
                    api_url=self.api_url,
                    mode=self.mode,
                    params=self.params,
                )
                yield from loader.lazy_load()
            except Exception as e:
                if self.continue_on_failure:
                    log.exception("Error loading %s", url)
                    continue
                raise e

    async def alazy_load(self):
        """Async version of lazy_load."""
        for url in self.web_paths:
            try:
                await self._safe_process_url(url)
                loader = FireCrawlLoader(
                    url=url,
                    api_key=self.api_key,
                    api_url=self.api_url,
                    mode=self.mode,
                    params=self.params,
                )
                async for document in loader.alazy_load():
                    yield document
            except Exception as e:
                if self.continue_on_failure:
                    log.exception("Error loading %s", url)
                    continue
                raise e

    def _verify_ssl_cert(self, url: str) -> bool:
        return verify_ssl_cert(url)

    async def _wait_for_rate_limit(self):
        """Wait to respect the rate limit if specified."""
        if self.requests_per_second and self.last_request_time:
            min_interval = timedelta(seconds=1.0 / self.requests_per_second)
            time_since_last = datetime.now() - self.last_request_time
            if time_since_last < min_interval:
                await asyncio.sleep((min_interval - time_since_last).total_seconds())
        self.last_request_time = datetime.now()

    def _sync_wait_for_rate_limit(self):
        """Synchronous version of rate limit wait."""
        if self.requests_per_second and self.last_request_time:
            min_interval = timedelta(seconds=1.0 / self.requests_per_second)
            time_since_last = datetime.now() - self.last_request_time
            if time_since_last < min_interval:
                time.sleep((min_interval - time_since_last).total_seconds())
        self.last_request_time = datetime.now()

    async def _safe_process_url(self, url: str) -> bool:
        """Perform safety checks before processing a URL."""
        if self.verify_ssl and not self._verify_ssl_cert(url):
            raise ValueError(f"SSL certificate verification failed for {url}")
        await self._wait_for_rate_limit()
        return True

    def _safe_process_url_sync(self, url: str) -> bool:
        """Synchronous version of safety checks."""
        if self.verify_ssl and not self._verify_ssl_cert(url):
            raise ValueError(f"SSL certificate verification failed for {url}")
        self._sync_wait_for_rate_limit()
        return True
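
# Usage sketch for SafeFireCrawlLoader (illustrative; assumes a valid FireCrawl
# API key and the default API endpoint, and uses a placeholder URL):
#
#   loader = SafeFireCrawlLoader(
#       web_paths=["https://example.com"],
#       api_key="fc-...",
#       mode="scrape",
#       requests_per_second=1.0,
#   )
#   docs = list(loader.lazy_load())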
class SafePlaywrightURLLoader(PlaywrightURLLoader):
    """Load HTML pages safely with Playwright, supporting SSL verification,
    rate limiting, and remote browser connection.

    Attributes:
        web_paths (List[str]): List of URLs to load.
        verify_ssl (bool): If True, verify SSL certificates.
        trust_env (bool): If True, use proxy settings from environment variables.
        requests_per_second (Optional[float]): Number of requests per second to limit to.
        continue_on_failure (bool): If True, continue loading other URLs on failure.
        headless (bool): If True, the browser will run in headless mode.
        proxy (dict): Proxy override settings for the Playwright session.
        playwright_ws_url (Optional[str]): WebSocket endpoint URI for remote browser connection.
    """

    def __init__(
        self,
        web_paths: List[str],
        verify_ssl: bool = True,
        trust_env: bool = False,
        requests_per_second: Optional[float] = None,
        continue_on_failure: bool = True,
        headless: bool = True,
        remove_selectors: Optional[List[str]] = None,
        proxy: Optional[Dict[str, str]] = None,
        playwright_ws_url: Optional[str] = None,
    ):
        """Initialize with additional safety parameters and remote browser support."""

        proxy_server = proxy.get("server") if proxy else None
        if trust_env and not proxy_server:
            env_proxies = urllib.request.getproxies()
            env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
            if env_proxy_server:
                if proxy:
                    proxy["server"] = env_proxy_server
                else:
                    proxy = {"server": env_proxy_server}

        # We'll set headless to False if using playwright_ws_url since it's handled by the remote browser
        super().__init__(
            urls=web_paths,
            continue_on_failure=continue_on_failure,
            headless=headless if playwright_ws_url is None else False,
            remove_selectors=remove_selectors,
            proxy=proxy,
        )
        self.verify_ssl = verify_ssl
        self.requests_per_second = requests_per_second
        self.last_request_time = None
        self.playwright_ws_url = playwright_ws_url
        self.trust_env = trust_env

    def lazy_load(self) -> Iterator[Document]:
        """Safely load URLs synchronously with support for remote browser."""
        from playwright.sync_api import sync_playwright

        with sync_playwright() as p:
            # Use remote browser if ws_endpoint is provided, otherwise use local browser
            if self.playwright_ws_url:
                browser = p.chromium.connect(self.playwright_ws_url)
            else:
                browser = p.chromium.launch(headless=self.headless, proxy=self.proxy)

            for url in self.urls:
                try:
                    self._safe_process_url_sync(url)
                    page = browser.new_page()
                    response = page.goto(url)
                    if response is None:
                        raise ValueError(f"page.goto() returned None for url {url}")

                    text = self.evaluator.evaluate(page, browser, response)
                    metadata = {"source": url}
                    yield Document(page_content=text, metadata=metadata)
                except Exception as e:
                    if self.continue_on_failure:
                        log.exception("Error loading %s", url)
                        continue
                    raise e
            browser.close()

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Safely load URLs asynchronously with support for remote browser."""
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            # Use remote browser if ws_endpoint is provided, otherwise use local browser
            if self.playwright_ws_url:
                browser = await p.chromium.connect(self.playwright_ws_url)
            else:
                browser = await p.chromium.launch(
                    headless=self.headless, proxy=self.proxy
                )

            for url in self.urls:
                try:
                    await self._safe_process_url(url)
                    page = await browser.new_page()
                    response = await page.goto(url)
                    if response is None:
                        raise ValueError(f"page.goto() returned None for url {url}")

                    text = await self.evaluator.evaluate_async(page, browser, response)
                    metadata = {"source": url}
                    yield Document(page_content=text, metadata=metadata)
                except Exception as e:
                    if self.continue_on_failure:
                        log.exception("Error loading %s", url)
                        continue
                    raise e
            await browser.close()

    def _verify_ssl_cert(self, url: str) -> bool:
        return verify_ssl_cert(url)

    async def _wait_for_rate_limit(self):
        """Wait to respect the rate limit if specified."""
        if self.requests_per_second and self.last_request_time:
            min_interval = timedelta(seconds=1.0 / self.requests_per_second)
            time_since_last = datetime.now() - self.last_request_time
            if time_since_last < min_interval:
                await asyncio.sleep((min_interval - time_since_last).total_seconds())
        self.last_request_time = datetime.now()

    def _sync_wait_for_rate_limit(self):
        """Synchronous version of rate limit wait."""
        if self.requests_per_second and self.last_request_time:
            min_interval = timedelta(seconds=1.0 / self.requests_per_second)
            time_since_last = datetime.now() - self.last_request_time
            if time_since_last < min_interval:
                time.sleep((min_interval - time_since_last).total_seconds())
        self.last_request_time = datetime.now()

    async def _safe_process_url(self, url: str) -> bool:
        """Perform safety checks before processing a URL."""
        if self.verify_ssl and not self._verify_ssl_cert(url):
            raise ValueError(f"SSL certificate verification failed for {url}")
        await self._wait_for_rate_limit()
        return True

    def _safe_process_url_sync(self, url: str) -> bool:
        """Synchronous version of safety checks."""
        if self.verify_ssl and not self._verify_ssl_cert(url):
            raise ValueError(f"SSL certificate verification failed for {url}")
        self._sync_wait_for_rate_limit()
        return True
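
# Usage sketch for SafePlaywrightURLLoader (illustrative; requires the playwright
# package and an installed Chromium browser, or a remote browser reachable at a
# ws:// endpoint -- the endpoint and URL below are placeholders):
#
#   loader = SafePlaywrightURLLoader(
#       web_paths=["https://example.com"],
#       requests_per_second=2.0,
#       playwright_ws_url=None,  # e.g. "ws://playwright:3000" for a remote browser
#   )
#   docs = list(loader.lazy_load())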
class SafeWebBaseLoader(WebBaseLoader):
    """WebBaseLoader with enhanced error handling for URLs."""

    def __init__(self, trust_env: bool = False, *args, **kwargs):
        """Initialize SafeWebBaseLoader

        Args:
            trust_env (bool, optional): set to True if using proxy to make web requests, for example
                using http(s)_proxy environment variables. Defaults to False.
        """
        super().__init__(*args, **kwargs)
        self.trust_env = trust_env

    async def _fetch(
        self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5
    ) -> str:
        async with aiohttp.ClientSession(trust_env=self.trust_env) as session:
            for i in range(retries):
                try:
                    kwargs: Dict = dict(
                        headers=self.session.headers,
                        cookies=self.session.cookies.get_dict(),
                    )
                    if not self.session.verify:
                        kwargs["ssl"] = False

                    async with session.get(
                        url, **(self.requests_kwargs | kwargs)
                    ) as response:
                        if self.raise_for_status:
                            response.raise_for_status()
                        return await response.text()
                except aiohttp.ClientConnectionError as e:
                    if i == retries - 1:
                        raise
                    else:
                        log.warning(
                            f"Error fetching {url} with attempt "
                            f"{i + 1}/{retries}: {e}. Retrying..."
                        )
                        await asyncio.sleep(cooldown * backoff**i)
        raise ValueError("retry count exceeded")

    def _unpack_fetch_results(
        self, results: Any, urls: List[str], parser: Union[str, None] = None
    ) -> List[Any]:
        """Unpack fetch results into BeautifulSoup objects."""
        from bs4 import BeautifulSoup

        final_results = []
        for i, result in enumerate(results):
            url = urls[i]
            if parser is None:
                if url.endswith(".xml"):
                    parser = "xml"
                else:
                    parser = self.default_parser
                self._check_parser(parser)
            final_results.append(BeautifulSoup(result, parser, **self.bs_kwargs))
        return final_results

    async def ascrape_all(
        self, urls: List[str], parser: Union[str, None] = None
    ) -> List[Any]:
        """Async fetch all urls, then return soups for all results."""
        results = await self.fetch_all(urls)
        return self._unpack_fetch_results(results, urls, parser=parser)

    def lazy_load(self) -> Iterator[Document]:
        """Lazy load text from the url(s) in web_path with error handling."""
        for path in self.web_paths:
            try:
                soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
                text = soup.get_text(**self.bs_get_text_kwargs)

                # Build metadata
                metadata = extract_metadata(soup, path)

                yield Document(page_content=text, metadata=metadata)
            except Exception:
                # Log the error and continue with the next URL
                log.exception("Error loading %s", path)

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Async lazy load text from the url(s) in web_path."""
        results = await self.ascrape_all(self.web_paths)
        for path, soup in zip(self.web_paths, results):
            text = soup.get_text(**self.bs_get_text_kwargs)
            metadata = {"source": path}
            if title := soup.find("title"):
                metadata["title"] = title.get_text()
            if description := soup.find("meta", attrs={"name": "description"}):
                metadata["description"] = description.get(
                    "content", "No description found."
                )
            if html := soup.find("html"):
                metadata["language"] = html.get("lang", "No language found.")
            yield Document(page_content=text, metadata=metadata)

    async def aload(self) -> list[Document]:
        """Load data into Document objects."""
        return [document async for document in self.alazy_load()]
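
# Usage sketch for SafeWebBaseLoader (illustrative; the URL is a placeholder and
# the extra keyword arguments are forwarded to the underlying WebBaseLoader):
#
#   loader = SafeWebBaseLoader(
#       web_paths=["https://example.com"],
#       trust_env=True,  # honour http(s)_proxy environment variables
#       requests_per_second=2,
#   )
#   docs = asyncio.run(loader.aload())  # or list(loader.lazy_load()) for sync use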
RAG_WEB_LOADER_ENGINES = defaultdict(lambda: SafeWebBaseLoader)
RAG_WEB_LOADER_ENGINES["playwright"] = SafePlaywrightURLLoader
RAG_WEB_LOADER_ENGINES["safe_web"] = SafeWebBaseLoader
RAG_WEB_LOADER_ENGINES["firecrawl"] = SafeFireCrawlLoader


def get_web_loader(
    urls: Union[str, Sequence[str]],
    verify_ssl: bool = True,
    requests_per_second: int = 2,
    trust_env: bool = False,
):
    # Check if the URLs are valid
    safe_urls = safe_validate_urls([urls] if isinstance(urls, str) else urls)

    web_loader_args = {
        "web_paths": safe_urls,
        "verify_ssl": verify_ssl,
        "requests_per_second": requests_per_second,
        "continue_on_failure": True,
        "trust_env": trust_env,
    }

    if PLAYWRIGHT_WS_URI.value:
        web_loader_args["playwright_ws_url"] = PLAYWRIGHT_WS_URI.value

    if RAG_WEB_LOADER_ENGINE.value == "firecrawl":
        web_loader_args["api_key"] = FIRECRAWL_API_KEY.value
        web_loader_args["api_url"] = FIRECRAWL_API_BASE_URL.value

    # Create the appropriate WebLoader based on the configuration
    WebLoaderClass = RAG_WEB_LOADER_ENGINES[RAG_WEB_LOADER_ENGINE.value]
    web_loader = WebLoaderClass(**web_loader_args)

    log.debug(
        "Using RAG_WEB_LOADER_ENGINE %s for %s URLs",
        web_loader.__class__.__name__,
        len(safe_urls),
    )

    return web_loader
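
# Usage sketch for get_web_loader (illustrative; which loader class is returned
# depends on the RAG_WEB_LOADER_ENGINE configuration value, and the URL is a
# placeholder):
#
#   loader = get_web_loader("https://example.com", requests_per_second=2)
#   docs = loader.load()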