from fastapi import APIRouter, UploadFile, File, Form, HTTPException
import os
import shutil
import uuid
from typing import Optional

import requests
from bs4 import BeautifulSoup
from PyPDF2 import PdfReader

router = APIRouter(tags=["files"])

# Define file storage by session (imported in main.py)
session_files = {}

# Root folder for uploads
UPLOAD_ROOT = "uploaded_files"
os.makedirs(UPLOAD_ROOT, exist_ok=True)

# Minimum length for any file (in characters)
MIN_FILE_LENGTH = 500


def validate_pdf(file_path: str) -> bool:
    """Validate if file is a valid PDF."""
    try:
        reader = PdfReader(file_path)
        # Check that the PDF has at least one page
        if len(reader.pages) == 0:
            return False
        # Extract text to check length
        text = ""
        for page in reader.pages:
            text += page.extract_text() or ""
        return len(text) >= MIN_FILE_LENGTH
    except Exception:
        return False


def validate_markdown(file_path: str) -> bool:
    """Validate if file is a valid Markdown file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Check minimum length and presence of markdown elements
        return len(content) >= MIN_FILE_LENGTH and any(
            marker in content for marker in ['#', '-', '*', '`', '[', '>']
        )
    except Exception:
        return False


def validate_html(file_path: str) -> bool:
    """Validate if file is a valid HTML file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        # Check minimum length and HTML structure
        if len(content) < MIN_FILE_LENGTH:
            return False
        BeautifulSoup(content, 'html.parser')
        return True
    except Exception:
        return False


def validate_txt(file_path: str) -> bool:
    """Validate if file is a valid text file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        return len(content.strip()) >= MIN_FILE_LENGTH
    except Exception:
        return False


# Initialize session files dictionary with pre-calculated documents
precalculated_docs = ["the-bitter-lesson", "hurricane-faq", "pokemon-guide"]
for doc_id in precalculated_docs:
    doc_dir = os.path.join(UPLOAD_ROOT, doc_id)
    if os.path.exists(doc_dir):
        doc_files_dir = os.path.join(doc_dir, "uploaded_files")
        if os.path.exists(doc_files_dir):
            for filename in os.listdir(doc_files_dir):
                if filename.endswith((".pdf", ".txt", ".html", ".md")):
                    file_path = os.path.join(doc_files_dir, filename)
                    session_files[doc_id] = file_path
                    print(f"Added pre-calculated document to session_files: {doc_id} -> {file_path}")
                    break
        else:
            # Search directly in the doc_dir
            for filename in os.listdir(doc_dir):
                if filename.endswith((".pdf", ".txt", ".html", ".md")):
                    file_path = os.path.join(doc_dir, filename)
                    session_files[doc_id] = file_path
                    print(f"Added pre-calculated document to session_files: {doc_id} -> {file_path}")
                    break


@router.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """
    Upload a file to the server and generate a session ID.

    Args:
        file: The file to upload

    Returns:
        Dictionary with filename, status and session_id
    """
    # Check if the file is a PDF, TXT, HTML or MD
    if not file.filename.endswith(('.pdf', '.txt', '.html', '.md')):
        raise HTTPException(status_code=400, detail="Only PDF, TXT, HTML and MD files are accepted")

    # Get the file extension
    file_extension = os.path.splitext(file.filename)[1].lower()

    # Generate a session ID for this file
    session_id = str(uuid.uuid4())

    # Create the session directory structure
    session_dir = os.path.join(UPLOAD_ROOT, session_id)
    uploaded_files_dir = os.path.join(session_dir, "uploaded_files")
    os.makedirs(uploaded_files_dir, exist_ok=True)

    # Create standardized filename
    standardized_filename = f"document{file_extension}"

    # Create the full path to save the file
    file_path = os.path.join(uploaded_files_dir, standardized_filename)

    # Save the file
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    # Validate the file according to its type
    is_valid = False
    error_detail = ""

    if file_extension == '.pdf':
        try:
            reader = PdfReader(file_path)
            if len(reader.pages) == 0:
                error_detail = "PDF must contain at least one page"
                is_valid = False
            else:
                text = ""
                for page in reader.pages:
                    text += page.extract_text() or ""
                if len(text) < MIN_FILE_LENGTH:
                    error_detail = f"PDF contains {len(text)} characters but must contain at least {MIN_FILE_LENGTH}"
                    is_valid = False
                else:
                    is_valid = True
        except Exception:
            error_detail = "Invalid PDF format"
            is_valid = False

    elif file_extension == '.md':
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            if len(content) < MIN_FILE_LENGTH:
                error_detail = f"Markdown file contains {len(content)} characters but must contain at least {MIN_FILE_LENGTH}"
                is_valid = False
            elif not any(marker in content for marker in ['#', '-', '*', '`', '[', '>']):
                error_detail = "Markdown file does not contain any valid Markdown elements"
                is_valid = False
            else:
                is_valid = True
        except Exception:
            error_detail = "Invalid Markdown format"
            is_valid = False

    elif file_extension == '.html':
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            if len(content) < MIN_FILE_LENGTH:
                error_detail = f"HTML file contains {len(content)} characters but must contain at least {MIN_FILE_LENGTH}"
                is_valid = False
            else:
                BeautifulSoup(content, 'html.parser')
                is_valid = True
        except Exception:
            error_detail = "Invalid HTML format"
            is_valid = False

    elif file_extension == '.txt':
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            content_length = len(content.strip())
            if content_length < MIN_FILE_LENGTH:
                error_detail = f"Text file contains {content_length} characters but must contain at least {MIN_FILE_LENGTH}"
                is_valid = False
            else:
                is_valid = True
        except Exception:
            error_detail = "Invalid text format"
            is_valid = False

    if not is_valid:
        # Delete the invalid file
        os.remove(file_path)
        raise HTTPException(status_code=400, detail=error_detail or f"Invalid {file_extension[1:].upper()} file")

    # Store file path for later use
    session_files[session_id] = file_path

    return {"filename": standardized_filename, "status": "uploaded", "session_id": session_id}


@router.post("/upload-url")
async def upload_url(url: str = Form(...)):
    """
    Upload content from a URL, extract text and store it as a document.

    Args:
        url: The URL to download content from

    Returns:
        Dictionary with status and session_id
    """
    try:
        # Retrieve the content from the URL
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # Raise an exception if the HTTP status indicates an error (4xx/5xx)

        # Extract text from HTML with BeautifulSoup
        soup = BeautifulSoup(response.text, 'html.parser')

        # Remove script and style tags
        for script in soup(["script", "style"]):
            script.extract()

        # Extract the text
        text = soup.get_text()

        # Clean the text (remove multiple spaces and empty lines)
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = '\n'.join(chunk for chunk in chunks if chunk)

        # Limit to 25000 characters if necessary
        if len(text) > 25000:
            text = text[:25000]

        # Check if the text is long enough
        if len(text.strip()) < MIN_FILE_LENGTH:
            raise HTTPException(
                status_code=400,
                detail=f"The content is too short ({len(text.strip())} characters). Minimum required: {MIN_FILE_LENGTH} characters."
            )

        # Generate a session ID
        session_id = str(uuid.uuid4())

        # Create the directory structure for the session
        session_dir = os.path.join(UPLOAD_ROOT, session_id)
        uploaded_files_dir = os.path.join(session_dir, "uploaded_files")
        os.makedirs(uploaded_files_dir, exist_ok=True)

        # Path of the file to save
        file_path = os.path.join(uploaded_files_dir, "document.txt")

        # Save the text
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(text)

        # Store the file path for later use
        session_files[session_id] = file_path

        return {
            "status": "uploaded",
            "session_id": session_id,
            "filename": "document.txt",
            "text_length": len(text),
            "source_url": url
        }
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=400, detail=f"Error retrieving the URL: {str(e)}")
    except HTTPException:
        # Re-raise the "content too short" 400 instead of converting it into a 500 below
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing the URL: {str(e)}")
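
# Usage sketch (not part of this module): a minimal example of how main.py might
# import `router` and mount it, assuming the module is importable as "files" and
# the router is included with no prefix (both assumptions, not confirmed here):
#
#   from fastapi import FastAPI
#   from files import router as files_router
#
#   app = FastAPI()
#   app.include_router(files_router)
#
# A client could then upload a document and read back its session_id, e.g.:
#
#   import requests
#   with open("notes.md", "rb") as fh:
#       resp = requests.post("http://localhost:8000/upload", files={"file": fh})
#   session_id = resp.json()["session_id"]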