Spaces:
Running
Running
import asyncio | |
import logging | |
import socket | |
import ssl | |
import urllib.parse | |
import urllib.request | |
from collections import defaultdict | |
from datetime import datetime, time, timedelta | |
from typing import ( | |
Any, | |
AsyncIterator, | |
Dict, | |
Iterator, | |
List, | |
Optional, | |
Sequence, | |
Union, | |
Literal, | |
) | |
import aiohttp | |
import certifi | |
import validators | |
from langchain_community.document_loaders import PlaywrightURLLoader, WebBaseLoader | |
from langchain_community.document_loaders.firecrawl import FireCrawlLoader | |
from langchain_community.document_loaders.base import BaseLoader | |
from langchain_core.documents import Document | |
from open_webui.constants import ERROR_MESSAGES | |
from open_webui.config import ( | |
ENABLE_RAG_LOCAL_WEB_FETCH, | |
PLAYWRIGHT_WS_URI, | |
RAG_WEB_LOADER_ENGINE, | |
FIRECRAWL_API_BASE_URL, | |
FIRECRAWL_API_KEY, | |
) | |
from open_webui.env import SRC_LOG_LEVELS | |
# Module-level logger; its threshold comes from the "RAG" entry of SRC_LOG_LEVELS.
log = logging.getLogger(__name__)
log.setLevel(level=SRC_LOG_LEVELS["RAG"])
def validate_url(url: Union[str, Sequence[str]]):
    """Validate a URL, or every URL in a sequence.

    A string must pass ``validators.url``. When ENABLE_RAG_LOCAL_WEB_FETCH is
    falsy, the host is additionally resolved and the URL is rejected if any
    resolved IPv4/IPv6 address is reported as private by ``validators``.

    Args:
        url: A single URL string or a sequence of URL strings.

    Returns:
        True when the URL (or every URL of the sequence) is acceptable;
        False when ``url`` is neither a string nor a sequence.

    Raises:
        ValueError: If a URL is malformed or resolves to a private address.
    """
    # Non-string inputs: recurse over sequences, reject everything else.
    if not isinstance(url, str):
        if isinstance(url, Sequence):
            return all(validate_url(item) for item in url)
        return False

    syntax_check = validators.url(url)
    if isinstance(syntax_check, validators.ValidationError):
        raise ValueError(ERROR_MESSAGES.INVALID_URL)

    if ENABLE_RAG_LOCAL_WEB_FETCH:
        # Local fetches are allowed, so no address filtering is needed.
        return True

    # Local web fetch is disabled: refuse hosts that resolve to private IPs.
    # This is technically still vulnerable to DNS rebinding attacks, as the
    # loader performs its own resolution later.
    host = urllib.parse.urlparse(url).hostname
    v4_addresses, v6_addresses = resolve_hostname(host)
    if any(validators.ipv4(addr, private=True) for addr in v4_addresses):
        raise ValueError(ERROR_MESSAGES.INVALID_URL)
    if any(validators.ipv6(addr, private=True) for addr in v6_addresses):
        raise ValueError(ERROR_MESSAGES.INVALID_URL)
    return True
def safe_validate_urls(url: Sequence[str]) -> Sequence[str]:
    """Return the subset of URLs that pass ``validate_url``.

    URLs that fail validation are dropped instead of aborting the batch.

    Args:
        url: Candidate URLs to check.

    Returns:
        A list with the accepted URLs, in their original order.
    """
    valid_urls = []
    for u in url:
        try:
            if validate_url(u):
                valid_urls.append(u)
        except (ValueError, OSError) as e:
            # ValueError: malformed URL or private address.
            # OSError: covers socket.gaierror raised when the hostname cannot
            # be resolved; one unresolvable URL must not fail the whole batch.
            log.debug("Skipping invalid or unresolvable URL %s: %s", u, e)
    return valid_urls
def resolve_hostname(hostname):
    """Resolve *hostname* and split the resulting addresses by family.

    Args:
        hostname: Host name or literal IP address to look up.

    Returns:
        A ``(ipv4_addresses, ipv6_addresses)`` tuple of lists of address
        strings, in the order ``socket.getaddrinfo`` reported them.
    """
    buckets = {socket.AF_INET: [], socket.AF_INET6: []}
    for entry in socket.getaddrinfo(hostname, None):
        family, sockaddr = entry[0], entry[4]
        bucket = buckets.get(family)
        if bucket is not None:
            # The first element of the socket address is the IP string.
            bucket.append(sockaddr[0])
    return buckets[socket.AF_INET], buckets[socket.AF_INET6]
def extract_metadata(soup, url):
    """Build a metadata dict for a parsed page.

    Args:
        soup: Parsed document exposing ``find`` (e.g. a BeautifulSoup object).
        url: Source URL, stored under the ``"source"`` key.

    Returns:
        A dict with ``"source"`` and, when the matching tags are found,
        ``"title"``, ``"description"`` and ``"language"``.
    """
    metadata = {"source": url}

    title_tag = soup.find("title")
    if title_tag:
        metadata["title"] = title_tag.get_text()

    description_tag = soup.find("meta", attrs={"name": "description"})
    if description_tag:
        metadata["description"] = description_tag.get(
            "content", "No description found."
        )

    html_tag = soup.find("html")
    if html_tag:
        metadata["language"] = html_tag.get("lang", "No language found.")

    return metadata
def verify_ssl_cert(url: str, timeout: float = 10.0) -> bool:
    """Verify the TLS certificate presented by the host of an HTTPS URL.

    Args:
        url: URL to check. Anything that is not ``https`` is accepted as-is.
        timeout: Seconds to wait for the TCP connection and TLS handshake.

    Returns:
        True when the URL is not HTTPS or the handshake succeeds against the
        certifi CA bundle; False when verification or the connection fails.
    """
    parsed = urllib.parse.urlparse(url)
    if parsed.scheme != "https":
        return True

    try:
        # Use the parsed components so userinfo and an explicit port are
        # handled correctly instead of ending up inside the host name.
        hostname = parsed.hostname
        if not hostname:
            raise ValueError("URL has no hostname")
        port = parsed.port or 443

        context = ssl.create_default_context(cafile=certifi.where())
        # create_connection applies the timeout and guarantees the raw socket
        # is closed even if the TLS handshake fails.
        with socket.create_connection((hostname, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=hostname):
                return True
    except ssl.SSLError:
        return False
    except Exception as e:
        log.warning(f"SSL verification failed for {url}: {str(e)}")
        return False
class SafeFireCrawlLoader(BaseLoader):
    """FireCrawl-backed loader with SSL verification and rate limiting.

    URLs are processed one after another; each one gets its own
    ``FireCrawlLoader`` instance after the safety checks pass.
    """

    def __init__(
        self,
        web_paths,
        verify_ssl: bool = True,
        trust_env: bool = False,
        requests_per_second: Optional[float] = None,
        continue_on_failure: bool = True,
        api_key: Optional[str] = None,
        api_url: Optional[str] = None,
        mode: Literal["crawl", "scrape", "map"] = "crawl",
        proxy: Optional[Dict[str, str]] = None,
        params: Optional[Dict] = None,
    ):
        """Initialize the loader.

        Args:
            web_paths: List of URLs/paths to process.
            verify_ssl: If True, verify SSL certificates.
            trust_env: If True, use proxy settings from environment variables.
            requests_per_second: Number of requests per second to limit to.
            continue_on_failure (bool): If True, continue loading other URLs on failure.
            api_key: API key for FireCrawl service. Defaults to None.
            api_url: Base URL for FireCrawl API. Defaults to None.
            mode: Operation mode selection:
                - 'crawl': Website crawling mode (default)
                - 'scrape': Direct page scraping
                - 'map': Site map generation
            proxy: Proxy override settings. The resolved value is stored on
                ``self.proxy`` but is not forwarded to FireCrawlLoader here.
            params: The parameters to pass to the Firecrawl API.
                Examples include crawlerOptions.
                For more details, visit: https://github.com/mendableai/firecrawl-py
        """
        # Work on a copy so the caller's dict is never mutated.
        proxy = dict(proxy) if proxy else None
        proxy_server = proxy.get("server") if proxy else None
        if trust_env and not proxy_server:
            env_proxies = urllib.request.getproxies()
            env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
            if env_proxy_server:
                proxy = {**(proxy or {}), "server": env_proxy_server}

        self.web_paths = web_paths
        self.verify_ssl = verify_ssl
        self.requests_per_second = requests_per_second
        self.last_request_time = None
        self.trust_env = trust_env
        self.continue_on_failure = continue_on_failure
        self.api_key = api_key
        self.api_url = api_url
        self.mode = mode
        self.params = params
        self.proxy = proxy

    def lazy_load(self) -> Iterator[Document]:
        """Load documents from each URL in turn using FireCrawl."""
        for url in self.web_paths:
            try:
                self._safe_process_url_sync(url)
                loader = FireCrawlLoader(
                    url=url,
                    api_key=self.api_key,
                    api_url=self.api_url,
                    mode=self.mode,
                    params=self.params,
                )
                yield from loader.lazy_load()
            except Exception:
                if self.continue_on_failure:
                    # log.exception appends the active traceback itself.
                    log.exception("Error loading %s", url)
                    continue
                raise

    async def alazy_load(self):
        """Async version of lazy_load."""
        for url in self.web_paths:
            try:
                await self._safe_process_url(url)
                loader = FireCrawlLoader(
                    url=url,
                    api_key=self.api_key,
                    api_url=self.api_url,
                    mode=self.mode,
                    params=self.params,
                )
                async for document in loader.alazy_load():
                    yield document
            except Exception:
                if self.continue_on_failure:
                    log.exception("Error loading %s", url)
                    continue
                raise

    def _verify_ssl_cert(self, url: str) -> bool:
        """Delegate to the module-level ``verify_ssl_cert``."""
        return verify_ssl_cert(url)

    async def _wait_for_rate_limit(self):
        """Wait to respect the rate limit if specified."""
        if self.requests_per_second and self.last_request_time:
            min_interval = timedelta(seconds=1.0 / self.requests_per_second)
            time_since_last = datetime.now() - self.last_request_time
            if time_since_last < min_interval:
                await asyncio.sleep((min_interval - time_since_last).total_seconds())
        self.last_request_time = datetime.now()

    def _sync_wait_for_rate_limit(self):
        """Synchronous version of rate limit wait."""
        # The module-level name ``time`` is datetime.time, which has no
        # sleep(); import the real sleep function locally.
        from time import sleep

        if self.requests_per_second and self.last_request_time:
            min_interval = timedelta(seconds=1.0 / self.requests_per_second)
            time_since_last = datetime.now() - self.last_request_time
            if time_since_last < min_interval:
                sleep((min_interval - time_since_last).total_seconds())
        self.last_request_time = datetime.now()

    async def _safe_process_url(self, url: str) -> bool:
        """Perform safety checks before processing a URL.

        Raises:
            ValueError: If SSL verification is enabled and fails.
        """
        # NOTE(review): the certificate check is a synchronous call made from
        # a coroutine; consider offloading it if event-loop latency matters.
        if self.verify_ssl and not self._verify_ssl_cert(url):
            raise ValueError(f"SSL certificate verification failed for {url}")
        await self._wait_for_rate_limit()
        return True

    def _safe_process_url_sync(self, url: str) -> bool:
        """Synchronous version of safety checks.

        Raises:
            ValueError: If SSL verification is enabled and fails.
        """
        if self.verify_ssl and not self._verify_ssl_cert(url):
            raise ValueError(f"SSL certificate verification failed for {url}")
        self._sync_wait_for_rate_limit()
        return True
class SafePlaywrightURLLoader(PlaywrightURLLoader):
    """Load HTML pages safely with Playwright, supporting SSL verification, rate limiting, and remote browser connection.

    Attributes:
        urls (List[str]): URLs to load (passed in as ``web_paths``).
        verify_ssl (bool): If True, verify SSL certificates.
        trust_env (bool): If True, use proxy settings from environment variables.
        requests_per_second (Optional[float]): Number of requests per second to limit to.
        continue_on_failure (bool): If True, continue loading other URLs on failure.
        headless (bool): If True, the browser will run in headless mode.
        proxy (dict): Proxy override settings for a locally launched browser.
        playwright_ws_url (Optional[str]): WebSocket endpoint URI for remote browser connection.
    """

    def __init__(
        self,
        web_paths: List[str],
        verify_ssl: bool = True,
        trust_env: bool = False,
        requests_per_second: Optional[float] = None,
        continue_on_failure: bool = True,
        headless: bool = True,
        remove_selectors: Optional[List[str]] = None,
        proxy: Optional[Dict[str, str]] = None,
        playwright_ws_url: Optional[str] = None,
    ):
        """Initialize with additional safety parameters and remote browser support."""
        # Work on a copy so the caller's dict is never mutated.
        proxy = dict(proxy) if proxy else None
        proxy_server = proxy.get("server") if proxy else None
        if trust_env and not proxy_server:
            env_proxies = urllib.request.getproxies()
            env_proxy_server = env_proxies.get("https") or env_proxies.get("http")
            if env_proxy_server:
                proxy = {**(proxy or {}), "server": env_proxy_server}

        # headless is forced to False when a remote endpoint is configured,
        # since the remote browser handles that setting itself.
        super().__init__(
            urls=web_paths,
            continue_on_failure=continue_on_failure,
            headless=headless if playwright_ws_url is None else False,
            remove_selectors=remove_selectors,
            proxy=proxy,
        )
        self.verify_ssl = verify_ssl
        self.requests_per_second = requests_per_second
        self.last_request_time = None
        self.playwright_ws_url = playwright_ws_url
        self.trust_env = trust_env

    def lazy_load(self) -> Iterator[Document]:
        """Safely load URLs synchronously with support for remote browser."""
        from playwright.sync_api import sync_playwright

        with sync_playwright() as p:
            # Use remote browser if ws_endpoint is provided, otherwise use local browser
            if self.playwright_ws_url:
                browser = p.chromium.connect(self.playwright_ws_url)
            else:
                browser = p.chromium.launch(headless=self.headless, proxy=self.proxy)

            try:
                for url in self.urls:
                    try:
                        self._safe_process_url_sync(url)
                        page = browser.new_page()
                        try:
                            response = page.goto(url)
                            if response is None:
                                raise ValueError(
                                    f"page.goto() returned None for url {url}"
                                )
                            text = self.evaluator.evaluate(page, browser, response)
                        finally:
                            # Release the tab as soon as its content is read.
                            page.close()
                        metadata = {"source": url}
                        yield Document(page_content=text, metadata=metadata)
                    except Exception:
                        if self.continue_on_failure:
                            # log.exception appends the active traceback itself.
                            log.exception("Error loading %s", url)
                            continue
                        raise
            finally:
                # Also runs on errors and when the generator is closed early.
                browser.close()

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Safely load URLs asynchronously with support for remote browser."""
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            # Use remote browser if ws_endpoint is provided, otherwise use local browser
            if self.playwright_ws_url:
                browser = await p.chromium.connect(self.playwright_ws_url)
            else:
                browser = await p.chromium.launch(
                    headless=self.headless, proxy=self.proxy
                )

            try:
                for url in self.urls:
                    try:
                        await self._safe_process_url(url)
                        page = await browser.new_page()
                        try:
                            response = await page.goto(url)
                            if response is None:
                                raise ValueError(
                                    f"page.goto() returned None for url {url}"
                                )
                            text = await self.evaluator.evaluate_async(
                                page, browser, response
                            )
                        finally:
                            await page.close()
                        metadata = {"source": url}
                        yield Document(page_content=text, metadata=metadata)
                    except Exception:
                        if self.continue_on_failure:
                            log.exception("Error loading %s", url)
                            continue
                        raise
            finally:
                await browser.close()

    def _verify_ssl_cert(self, url: str) -> bool:
        """Delegate to the module-level ``verify_ssl_cert``."""
        return verify_ssl_cert(url)

    async def _wait_for_rate_limit(self):
        """Wait to respect the rate limit if specified."""
        if self.requests_per_second and self.last_request_time:
            min_interval = timedelta(seconds=1.0 / self.requests_per_second)
            time_since_last = datetime.now() - self.last_request_time
            if time_since_last < min_interval:
                await asyncio.sleep((min_interval - time_since_last).total_seconds())
        self.last_request_time = datetime.now()

    def _sync_wait_for_rate_limit(self):
        """Synchronous version of rate limit wait."""
        # The module-level name ``time`` is datetime.time, which has no
        # sleep(); import the real sleep function locally.
        from time import sleep

        if self.requests_per_second and self.last_request_time:
            min_interval = timedelta(seconds=1.0 / self.requests_per_second)
            time_since_last = datetime.now() - self.last_request_time
            if time_since_last < min_interval:
                sleep((min_interval - time_since_last).total_seconds())
        self.last_request_time = datetime.now()

    async def _safe_process_url(self, url: str) -> bool:
        """Perform safety checks before processing a URL.

        Raises:
            ValueError: If SSL verification is enabled and fails.
        """
        # NOTE(review): the certificate check is a synchronous call made from
        # a coroutine; consider offloading it if event-loop latency matters.
        if self.verify_ssl and not self._verify_ssl_cert(url):
            raise ValueError(f"SSL certificate verification failed for {url}")
        await self._wait_for_rate_limit()
        return True

    def _safe_process_url_sync(self, url: str) -> bool:
        """Synchronous version of safety checks.

        Raises:
            ValueError: If SSL verification is enabled and fails.
        """
        if self.verify_ssl and not self._verify_ssl_cert(url):
            raise ValueError(f"SSL certificate verification failed for {url}")
        self._sync_wait_for_rate_limit()
        return True
class SafeWebBaseLoader(WebBaseLoader):
    """WebBaseLoader with enhanced error handling for URLs."""

    def __init__(self, trust_env: bool = False, *args, **kwargs):
        """Initialize SafeWebBaseLoader

        Args:
            trust_env (bool, optional): set to True if using proxy to make web requests, for example
                using http(s)_proxy environment variables. Defaults to False.
        """
        super().__init__(*args, **kwargs)
        self.trust_env = trust_env

    async def _fetch(
        self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5
    ) -> str:
        """Fetch ``url`` with aiohttp, retrying on connection errors.

        Args:
            url: Address to download.
            retries: Maximum number of attempts.
            cooldown: Base delay in seconds between attempts.
            backoff: Multiplier applied to the delay after each failed attempt.

        Returns:
            The response body as text.

        Raises:
            aiohttp.ClientConnectionError: If the last attempt fails to connect.
            ValueError: If the retry loop ends without returning a response.
        """
        async with aiohttp.ClientSession(trust_env=self.trust_env) as session:
            for i in range(retries):
                try:
                    kwargs: Dict = dict(
                        headers=self.session.headers,
                        cookies=self.session.cookies.get_dict(),
                    )
                    if not self.session.verify:
                        # Mirror the requests session's "verify" switch.
                        kwargs["ssl"] = False

                    async with session.get(
                        url, **(self.requests_kwargs | kwargs)
                    ) as response:
                        if self.raise_for_status:
                            response.raise_for_status()
                        return await response.text()
                except aiohttp.ClientConnectionError as e:
                    if i == retries - 1:
                        raise
                    else:
                        log.warning(
                            f"Error fetching {url} with attempt "
                            f"{i + 1}/{retries}: {e}. Retrying..."
                        )
                        await asyncio.sleep(cooldown * backoff**i)
        raise ValueError("retry count exceeded")

    def _unpack_fetch_results(
        self, results: Any, urls: List[str], parser: Union[str, None] = None
    ) -> List[Any]:
        """Unpack fetch results into BeautifulSoup objects.

        When ``parser`` is None the parser is chosen per URL: ``"xml"`` for
        URLs ending in ``.xml``, otherwise ``self.default_parser``.
        """
        from bs4 import BeautifulSoup

        final_results = []
        for url, result in zip(urls, results):
            # Decide per URL; reassigning ``parser`` itself would make the
            # first URL's choice stick for every following URL.
            url_parser = parser
            if url_parser is None:
                url_parser = "xml" if url.endswith(".xml") else self.default_parser
            self._check_parser(url_parser)
            final_results.append(BeautifulSoup(result, url_parser, **self.bs_kwargs))
        return final_results

    async def ascrape_all(
        self, urls: List[str], parser: Union[str, None] = None
    ) -> List[Any]:
        """Async fetch all urls, then return soups for all results."""
        results = await self.fetch_all(urls)
        return self._unpack_fetch_results(results, urls, parser=parser)

    def lazy_load(self) -> Iterator[Document]:
        """Lazy load text from the url(s) in web_path with error handling."""
        for path in self.web_paths:
            try:
                soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
                text = soup.get_text(**self.bs_get_text_kwargs)
                # Build metadata
                metadata = extract_metadata(soup, path)
                yield Document(page_content=text, metadata=metadata)
            except Exception:
                # Log the error (with traceback) and continue with the next URL
                log.exception("Error loading %s", path)

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Async lazy load text from the url(s) in web_path."""
        results = await self.ascrape_all(self.web_paths)
        for path, soup in zip(self.web_paths, results):
            text = soup.get_text(**self.bs_get_text_kwargs)
            # Same metadata rules as the synchronous path.
            metadata = extract_metadata(soup, path)
            yield Document(page_content=text, metadata=metadata)

    async def aload(self) -> list[Document]:
        """Load data into Document objects."""
        return [document async for document in self.alazy_load()]
# Engine name -> loader class. Looking up an unknown engine name falls back to
# SafeWebBaseLoader (and, as with any defaultdict, stores that key).
RAG_WEB_LOADER_ENGINES = defaultdict(
    lambda: SafeWebBaseLoader,
    {
        "playwright": SafePlaywrightURLLoader,
        "safe_web": SafeWebBaseLoader,
        "firecrawl": SafeFireCrawlLoader,
    },
)
def get_web_loader(
    urls: Union[str, Sequence[str]],
    verify_ssl: bool = True,
    requests_per_second: int = 2,
    trust_env: bool = False,
):
    """Create the configured web loader for the given URL(s).

    Args:
        urls: A single URL or a sequence of URLs; invalid ones are dropped
            by ``safe_validate_urls``.
        verify_ssl: If True, verify SSL certificates.
        requests_per_second: Rate limit handed to the loader.
        trust_env: If True, use proxy settings from environment variables.

    Returns:
        An instance of the loader class selected by RAG_WEB_LOADER_ENGINE.
    """
    # Check if the URLs are valid
    safe_urls = safe_validate_urls([urls] if isinstance(urls, str) else urls)

    engine = RAG_WEB_LOADER_ENGINE.value
    web_loader_args = {
        "web_paths": safe_urls,
        "verify_ssl": verify_ssl,
        "requests_per_second": requests_per_second,
        "continue_on_failure": True,
        "trust_env": trust_env,
    }

    # Only SafePlaywrightURLLoader declares ``playwright_ws_url``; passing it
    # to e.g. SafeFireCrawlLoader would raise TypeError on construction.
    if engine == "playwright" and PLAYWRIGHT_WS_URI.value:
        web_loader_args["playwright_ws_url"] = PLAYWRIGHT_WS_URI.value

    if engine == "firecrawl":
        web_loader_args["api_key"] = FIRECRAWL_API_KEY.value
        web_loader_args["api_url"] = FIRECRAWL_API_BASE_URL.value

    # Create the appropriate WebLoader based on the configuration
    WebLoaderClass = RAG_WEB_LOADER_ENGINES[engine]
    web_loader = WebLoaderClass(**web_loader_args)

    log.debug(
        "Using RAG_WEB_LOADER_ENGINE %s for %s URLs",
        web_loader.__class__.__name__,
        len(safe_urls),
    )
    return web_loader