"""File-upload routes: accept PDF/TXT/HTML/MD uploads or a URL, validate the
content, and store it under a per-session directory keyed by a UUID."""

from fastapi import APIRouter, File, Form, HTTPException, UploadFile
import os
import shutil
import uuid
import html
import re
from typing import List, Optional, Tuple
from urllib.parse import urlparse

import requests
import validators
from bs4 import BeautifulSoup
from PyPDF2 import PdfReader

router = APIRouter(tags=["files"])

# Per-session mapping of session_id -> stored document path (imported in main.py)
session_files = {}

# Root folder for uploads
UPLOAD_ROOT = "uploaded_files"
os.makedirs(UPLOAD_ROOT, exist_ok=True)

# Minimum length for any file (in characters)
MIN_FILE_LENGTH = 500

# Security limits
MAX_CONTENT_SIZE = 5 * 1024 * 1024  # 5 MB max for downloaded content
REQUEST_TIMEOUT = 10  # Timeout (seconds) for outbound HTTP requests

# Allowed domains (empty = all allowed; populate in production)
ALLOWED_DOMAINS: List[str] = []

# File extensions to block in URLs
BLOCKED_EXTENSIONS = ['.exe', '.sh', '.bat', '.dll', '.jar', '.msi']

# Extensions accepted by the /upload endpoint
ACCEPTED_EXTENSIONS = ('.pdf', '.txt', '.html', '.md')

# Markers whose presence qualifies a file as "Markdown-looking"
MARKDOWN_MARKERS = ('#', '-', '*', '`', '[', '>')


def _read_pdf_text(file_path: str) -> str:
    """Return the concatenated text of every page of the PDF at *file_path*.

    Raises ValueError for an empty PDF; lets PyPDF2 parsing errors propagate.
    """
    reader = PdfReader(file_path)
    if len(reader.pages) == 0:
        raise ValueError("PDF has no pages")
    # extract_text() may return None for pages with no extractable text,
    # so coalesce to "" before joining.
    return "".join(page.extract_text() or "" for page in reader.pages)


def validate_pdf(file_path: str) -> bool:
    """Validate if file is a valid PDF with enough extractable text."""
    try:
        return len(_read_pdf_text(file_path)) >= MIN_FILE_LENGTH
    except Exception:  # PyPDF2 raises a variety of errors on malformed input
        return False


def validate_markdown(file_path: str) -> bool:
    """Validate if file is a valid Markdown file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except Exception:
        return False
    # Check minimum length and presence of markdown elements
    return (len(content) >= MIN_FILE_LENGTH
            and any(marker in content for marker in MARKDOWN_MARKERS))


def validate_html(file_path: str) -> bool:
    """Validate if file is a valid HTML file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Check minimum length, then make sure the content parses
        if len(content) < MIN_FILE_LENGTH:
            return False
        BeautifulSoup(content, 'html.parser')
        return True
    except Exception:
        return False


def validate_txt(file_path: str) -> bool:
    """Validate if file is a valid text file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return len(content.strip()) >= MIN_FILE_LENGTH
    except Exception:
        return False


def _register_precalculated_doc(doc_id: str) -> None:
    """Register the first document file found for *doc_id* in session_files.

    Looks in <UPLOAD_ROOT>/<doc_id>/uploaded_files/ when it exists,
    otherwise directly in <UPLOAD_ROOT>/<doc_id>/.
    """
    doc_dir = os.path.join(UPLOAD_ROOT, doc_id)
    if not os.path.exists(doc_dir):
        return
    doc_files_dir = os.path.join(doc_dir, "uploaded_files")
    search_dir = doc_files_dir if os.path.exists(doc_files_dir) else doc_dir
    for filename in os.listdir(search_dir):
        if filename.endswith(ACCEPTED_EXTENSIONS):
            file_path = os.path.join(search_dir, filename)
            session_files[doc_id] = file_path
            print(f"Added pre-calculated document to session_files: {doc_id} -> {file_path}")
            break


# Initialize session files dictionary with pre-calculated documents
precalculated_docs = ["the-bitter-lesson", "hurricane-faq", "pokemon-guide"]
for _doc_id in precalculated_docs:
    _register_precalculated_doc(_doc_id)


def _validate_upload(file_path: str, file_extension: str) -> Tuple[bool, str]:
    """Validate an already-saved upload according to its extension.

    Returns (is_valid, error_detail); error_detail is "" when valid.
    """
    if file_extension == '.pdf':
        try:
            reader = PdfReader(file_path)
            if len(reader.pages) == 0:
                return False, "PDF must contain at least one page"
            # extract_text() may return None for image-only pages
            text = "".join(page.extract_text() or "" for page in reader.pages)
            if len(text) < MIN_FILE_LENGTH:
                return False, f"PDF contains {len(text)} characters but must contain at least {MIN_FILE_LENGTH}"
            return True, ""
        except Exception:
            return False, "Invalid PDF format"

    if file_extension == '.md':
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except Exception:
            return False, "Invalid Markdown format"
        if len(content) < MIN_FILE_LENGTH:
            return False, f"Markdown file contains {len(content)} characters but must contain at least {MIN_FILE_LENGTH}"
        if not any(marker in content for marker in MARKDOWN_MARKERS):
            return False, "Markdown file does not contain any valid Markdown elements"
        return True, ""

    if file_extension == '.html':
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            if len(content) < MIN_FILE_LENGTH:
                return False, f"HTML file contains {len(content)} characters but must contain at least {MIN_FILE_LENGTH}"
            BeautifulSoup(content, 'html.parser')
            return True, ""
        except Exception:
            return False, "Invalid HTML format"

    if file_extension == '.txt':
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except Exception:
            return False, "Invalid text format"
        content_length = len(content.strip())
        if content_length < MIN_FILE_LENGTH:
            return False, f"Text file contains {content_length} characters but must contain at least {MIN_FILE_LENGTH}"
        return True, ""

    # Unreachable in practice: the endpoint filters extensions up front
    return False, ""


@router.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """
    Upload a file to the server and generate a session ID

    Args:
        file: The file to upload

    Returns:
        Dictionary with filename, status and session_id

    Raises:
        HTTPException(400): unsupported extension or content fails validation
    """
    # Check if the file is a PDF, TXT, HTML or MD (filename may be None)
    if not file.filename or not file.filename.endswith(ACCEPTED_EXTENSIONS):
        raise HTTPException(status_code=400, detail="Only PDF, TXT, HTML and MD files are accepted")

    # Get the file extension
    file_extension = os.path.splitext(file.filename)[1].lower()

    # Generate a session ID for this file
    session_id = str(uuid.uuid4())

    # Create the session directory structure
    session_dir = os.path.join(UPLOAD_ROOT, session_id)
    uploaded_files_dir = os.path.join(session_dir, "uploaded_files")
    os.makedirs(uploaded_files_dir, exist_ok=True)

    # Save under a standardized filename so downstream code can find it
    standardized_filename = f"document{file_extension}"
    file_path = os.path.join(uploaded_files_dir, standardized_filename)
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    # Validate the file according to its type
    is_valid, error_detail = _validate_upload(file_path, file_extension)
    if not is_valid:
        # Remove the whole session directory so invalid uploads leave no orphans
        shutil.rmtree(session_dir, ignore_errors=True)
        raise HTTPException(
            status_code=400,
            detail=error_detail or f"Invalid {file_extension[1:].upper()} file",
        )

    # Store file path for later use
    session_files[session_id] = file_path

    return {"filename": standardized_filename, "status": "uploaded", "session_id": session_id}


@router.post("/upload-url")
async def upload_url(url: str = Form(...)):
    """
    Upload content from a URL, extract text and store it as a document

    Args:
        url: The URL to download content from

    Returns:
        Dictionary with status and session_id

    Raises:
        HTTPException(400/403): invalid URL, blocked type/domain, bad content
        HTTPException(500): unexpected processing error
    """
    try:
        # Validate that the URL is well-formed
        if not validators.url(url):
            raise HTTPException(status_code=400, detail="Invalid URL format")

        # Reject URLs whose path ends in a blocked extension
        parsed_url = urlparse(url)
        path = parsed_url.path.lower()
        if any(path.endswith(ext) for ext in BLOCKED_EXTENSIONS):
            raise HTTPException(status_code=400, detail="This file type is not allowed")

        # Enforce the domain allow-list (only when the list is non-empty)
        domain = parsed_url.netloc
        if ALLOWED_DOMAINS and domain not in ALLOWED_DOMAINS:
            raise HTTPException(status_code=403, detail="This domain is not in the allowed list")

        # Retrieve the content from the URL with proper headers to mimic a browser
        headers = {
            'User-Agent': 'Mozilla/5.0 (compatible; YourBenchBot/1.0; +https://yourbench.example.com)',
            'Accept': 'text/html,application/xhtml+xml',
            'Accept-Language': 'en-US,en;q=0.5',
        }
        response = requests.get(
            url,
            timeout=REQUEST_TIMEOUT,
            headers=headers,
            stream=True,  # check size limits before consuming the whole body
        )
        response.raise_for_status()

        # Only HTML and plain-text responses are supported
        content_type = response.headers.get('Content-Type', '')
        if not content_type.startswith(('text/html', 'text/plain', 'application/xhtml+xml')):
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported content type: {content_type}. Only HTML and text formats are supported."
            )

        # Check the declared size; a malformed header counts as unknown (0)
        try:
            content_length = int(response.headers.get('Content-Length', 0))
        except ValueError:
            content_length = 0
        if content_length > MAX_CONTENT_SIZE:
            raise HTTPException(
                status_code=400,
                detail=f"Content too large ({content_length} bytes). Maximum size: {MAX_CONTENT_SIZE} bytes."
            )

        # Read the body with a hard size limit (Content-Length can lie)
        content = ""
        bytes_read = 0
        for chunk in response.iter_content(chunk_size=8192, decode_unicode=True):
            bytes_read += len(chunk.encode('utf-8') if isinstance(chunk, str) else chunk)
            if bytes_read > MAX_CONTENT_SIZE:
                raise HTTPException(
                    status_code=400,
                    detail=f"Content exceeded maximum allowed size of {MAX_CONTENT_SIZE} bytes"
                )
            content += chunk if isinstance(chunk, str) else chunk.decode('utf-8', errors='replace')

        # Extract text from HTML with BeautifulSoup
        soup = BeautifulSoup(content, 'html.parser')

        # Remove potentially dangerous elements
        for element in soup(['script', 'style', 'iframe', 'object', 'embed', 'noscript']):
            element.extract()

        # Remove on* attributes (event handlers) from all tags
        for tag in soup.find_all(True):
            for attr in list(tag.attrs):
                if attr.startswith('on'):
                    del tag[attr]

        # Extract the text
        text = soup.get_text()

        # Clean the text: strip each line, split on single spaces, and keep
        # every non-empty token on its own line (drops blank lines/runs of spaces)
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        text = '\n'.join(chunk for chunk in chunks if chunk)

        # Sanitize the text to prevent any potential stored XSS
        text = html.escape(text)

        # Limit to 25000 characters if necessary
        if len(text) > 25000:
            text = text[:25000]

        # Check if the text is long enough
        if len(text.strip()) < MIN_FILE_LENGTH:
            raise HTTPException(
                status_code=400,
                detail=f"The content is too short ({len(text.strip())} characters). Minimum required: {MIN_FILE_LENGTH} characters."
            )

        # Generate a session ID and create its directory structure
        session_id = str(uuid.uuid4())
        session_dir = os.path.join(UPLOAD_ROOT, session_id)
        uploaded_files_dir = os.path.join(session_dir, "uploaded_files")
        os.makedirs(uploaded_files_dir, exist_ok=True)

        # Save the extracted text
        file_path = os.path.join(uploaded_files_dir, "document.txt")
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(text)

        # Store the file path for later use
        session_files[session_id] = file_path

        return {
            "status": "uploaded",
            "session_id": session_id,
            "filename": "document.txt",
            "text_length": len(text),
            "source_url": url
        }
    except HTTPException:
        # Bug fix: re-raise as-is — previously the generic Exception handler
        # below swallowed every HTTPException and re-reported it as a 500
        raise
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=400, detail=f"Error retrieving the URL: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing the URL: {str(e)}")