# Page banner captured with the source (not code): Spaces - Running on CPU Upgrade
from fastapi import APIRouter, UploadFile, File, HTTPException | |
import os | |
import shutil | |
import uuid | |
from bs4 import BeautifulSoup | |
from PyPDF2 import PdfReader | |
# Root folder for uploads; created at import time if it does not exist yet.
UPLOAD_ROOT = "uploaded_files"
os.makedirs(UPLOAD_ROOT, exist_ok=True)

# Router object for this module, tagged "files".
router = APIRouter(tags=["files"])

# Per-session file storage (imported in main.py): maps an id to a file path.
session_files = {}
def validate_pdf(file_path: str) -> bool:
    """Check that a file can be opened as a PDF with at least one page.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True when ``PdfReader`` opens the file and reports one or more
        pages; False when reading fails for any reason or the document
        has no pages.
    """
    try:
        reader = PdfReader(file_path)
        # A PDF that parses but contains no pages is treated as invalid.
        return len(reader.pages) > 0
    except Exception:
        # Any read/parse failure means "not a usable PDF". Catching
        # Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return False
def validate_markdown(file_path: str) -> bool:
    """Check that a file is UTF-8 text containing a Markdown-style character.

    This is a heuristic, not a parser: the file passes when it contains at
    least one of ``# - * ` [ >``.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True when the file can be read as UTF-8 and contains one of the
        marker characters; False when it is empty, has none of them, or
        cannot be read or decoded.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: includes
        # UnicodeDecodeError for content that is not valid UTF-8.
        return False
    # Empty content contains no marker, so no separate length check is needed.
    return any(marker in content for marker in ('#', '-', '*', '`', '[', '>'))
def validate_html(file_path: str) -> bool:
    """Check that a file is UTF-8 text that BeautifulSoup can parse.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True when the file is read as UTF-8 and parsed with 'html.parser'
        without raising; False on any failure.
    """
    # NOTE(review): html.parser is generally tolerant of malformed markup,
    # so this likely accepts most UTF-8 text - confirm whether a stricter
    # check is wanted.
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            markup = f.read()
        BeautifulSoup(markup, 'html.parser')
        return True
    except Exception:
        # Read, decode and parse failures all mean "not valid HTML".
        # Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return False
def validate_txt(file_path: str) -> bool:
    """Check that a file is UTF-8 text with some non-whitespace content.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True when the file can be read as UTF-8 and is not empty after
        stripping whitespace; False otherwise, including when the file
        cannot be read or decoded.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: includes
        # UnicodeDecodeError for content that is not valid UTF-8.
        return False
    return len(content.strip()) > 0
# Initialize session_files with the pre-calculated documents: for each known
# id, register the first supported file found on disk under that id.
precalculated_docs = ["the-bitter-lesson", "hurricane-faq", "pokemon-guide"]
_PRECALCULATED_DOC_EXTENSIONS = (".pdf", ".txt", ".html", ".md")

for doc_id in precalculated_docs:
    doc_dir = os.path.join(UPLOAD_ROOT, doc_id)
    # isdir (not exists): a regular file with this name would make
    # os.listdir raise while the module is being imported.
    if not os.path.isdir(doc_dir):
        continue

    # Prefer the "uploaded_files" sub-directory; when it is absent, search
    # directly in the document directory.
    doc_files_dir = os.path.join(doc_dir, "uploaded_files")
    search_dir = doc_files_dir if os.path.isdir(doc_files_dir) else doc_dir

    # os.listdir order is arbitrary; sorting makes the chosen file
    # deterministic when several supported files are present.
    for filename in sorted(os.listdir(search_dir)):
        if filename.endswith(_PRECALCULATED_DOC_EXTENSIONS):
            file_path = os.path.join(search_dir, filename)
            session_files[doc_id] = file_path
            print(f"Added pre-calculated document to session_files: {doc_id} -> {file_path}")
            break
async def upload_file(file: UploadFile = File(...)):
    """
    Save an uploaded document in a new session directory and validate it.

    The file is stored as ``document<ext>`` under
    ``UPLOAD_ROOT/<session_id>/uploaded_files`` and its path is recorded in
    ``session_files`` under the generated session id.

    Args:
        file: The file to upload. Its name must end in .pdf, .txt, .html
            or .md (matched case-insensitively).

    Returns:
        Dictionary with filename, status and session_id

    Raises:
        HTTPException: 400 when the extension is not accepted, or when the
            saved file fails the validator for its type.
    """
    # One validator per accepted extension; also serves as the allow-list.
    validators = {
        '.pdf': validate_pdf,
        '.md': validate_markdown,
        '.html': validate_html,
        '.txt': validate_txt,
    }

    # Lower-case the extension BEFORE the type check so "REPORT.PDF" is
    # handled like "report.pdf". A missing filename yields an empty
    # extension and is rejected with a 400 instead of an AttributeError.
    file_extension = os.path.splitext(file.filename or "")[1].lower()
    if file_extension not in validators:
        raise HTTPException(status_code=400, detail="Only PDF, TXT, HTML and MD files are accepted")

    # Generate a session ID for this file
    session_id = str(uuid.uuid4())

    # Create the session directory structure
    session_dir = os.path.join(UPLOAD_ROOT, session_id)
    uploaded_files_dir = os.path.join(session_dir, "uploaded_files")
    os.makedirs(uploaded_files_dir, exist_ok=True)

    # The original name is not used on disk; only its extension is kept.
    standardized_filename = f"document{file_extension}"
    file_path = os.path.join(uploaded_files_dir, standardized_filename)

    # Save the file; on failure remove the session directory so a partial
    # upload does not stay on disk.
    try:
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except Exception:
        shutil.rmtree(session_dir, ignore_errors=True)
        raise

    # Validate the file according to its type
    if not validators[file_extension](file_path):
        # Remove the whole session directory, not just the file, so
        # rejected uploads do not leave empty directories behind.
        shutil.rmtree(session_dir, ignore_errors=True)
        raise HTTPException(status_code=400, detail=f"Invalid {file_extension[1:].upper()} file")

    # Store file path for later use
    session_files[session_id] = file_path

    return {"filename": standardized_filename, "status": "uploaded", "session_id": session_id}