import os
import io
import re
import logging
import subprocess
from datetime import datetime
import urllib.parse
import tempfile
from flask import Flask, request, render_template, redirect, url_for
from werkzeug.utils import secure_filename  # For secure file handling
import requests
import pdfplumber
from pdf2image import convert_from_path, convert_from_bytes
import pytesseract
from PIL import Image
from huggingface_hub import HfApi, create_repo, HfHubHTTPError

# --- Flask App Initialization ---
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = tempfile.gettempdir()  # Use system temp dir
app.config['MAX_CONTENT_LENGTH'] = 30 * 1024 * 1024  # 30 MB limit for uploads

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# --- Hugging Face Configuration ---
HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET_REPO_NAME = os.getenv("HF_DATASET_REPO_NAME", "pdf-images-extracted")  # Allow override via env var
hf_api = HfApi()
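
# Environment summary for the values read above: HF_TOKEN should be a write-capable
# token (e.g. a Space secret) or image uploads will fail, and HF_DATASET_REPO_NAME
# names the dataset repo that receives extracted images (default "pdf-images-extracted").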

# --- PDF Processing Helper Functions (Adapted from Gradio version) ---
def check_poppler():
    try:
        result = subprocess.run(["pdftoppm", "-v"], capture_output=True, text=True, check=False)
        version_info_log = result.stderr.strip() if result.stderr else result.stdout.strip()
        if version_info_log:
            logger.info(f"Poppler version check: {version_info_log.splitlines()[0]}")
        else:
            logger.info("Poppler 'pdftoppm -v' ran. Assuming Poppler is present.")
        return True
    except FileNotFoundError:
        logger.error("Poppler (pdftoppm command) not found. Ensure poppler-utils is installed and in PATH.")
        return False
    except Exception as e:
        logger.error(f"An unexpected error occurred during Poppler check: {str(e)}")
        return False
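
# Note: pdf2image's convert_from_path/convert_from_bytes rely on the same Poppler
# binaries checked above, so a container-based deployment is assumed to install the
# poppler-utils package in the image (the Dockerfile itself is not shown here).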

def ensure_hf_dataset():
    if not HF_TOKEN:
        logger.warning("HF_TOKEN is not set. Cannot ensure Hugging Face dataset. Image uploads will fail.")
        return "Error: HF_TOKEN is not set. Please configure it in Space secrets for image uploads."
    try:
        repo_id_obj = create_repo(repo_id=HF_DATASET_REPO_NAME, token=HF_TOKEN, repo_type="dataset", exist_ok=True)
        logger.info(f"Dataset repo ensured: {repo_id_obj.repo_id}")
        return repo_id_obj.repo_id
    except HfHubHTTPError as e:
        if e.response.status_code == 409:  # Conflict, repo already exists
            logger.info(f"Dataset repo '{HF_DATASET_REPO_NAME}' already exists.")
            return f"{hf_api.whoami(token=HF_TOKEN)['name']}/{HF_DATASET_REPO_NAME}"  # Construct repo_id
        logger.error(f"Hugging Face dataset error (HTTP {e.response.status_code}): {str(e)}")
        return f"Error: Failed to access or create dataset '{HF_DATASET_REPO_NAME}': {str(e)}"
    except Exception as e:
        logger.error(f"Hugging Face dataset error: {str(e)}", exc_info=True)
        return f"Error: Failed to access or create dataset '{HF_DATASET_REPO_NAME}': {str(e)}"

def upload_image_to_hf(image_pil, filename_base):
    repo_id_or_error = ensure_hf_dataset()
    if isinstance(repo_id_or_error, str) and repo_id_or_error.startswith("Error"):
        return repo_id_or_error
    repo_id = repo_id_or_error
    temp_image_path = None
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        repo_filename = f"images/{filename_base}_{timestamp}.png"  # Path in repo
        # Save PIL image to a temporary file to upload
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png", dir=app.config['UPLOAD_FOLDER']) as tmp_file:
            temp_image_path = tmp_file.name
            image_pil.save(temp_image_path, format="PNG")
        logger.info(f"Attempting to upload {temp_image_path} to {repo_id}/{repo_filename}")
        file_url = hf_api.upload_file(
            path_or_fileobj=temp_image_path,
            path_in_repo=repo_filename,
            repo_id=repo_id,
            repo_type="dataset",
            token=HF_TOKEN
        )
        logger.info(f"Successfully uploaded image: {file_url}")
        return file_url
    except Exception as e:
        logger.error(f"Image upload error for {filename_base}: {str(e)}", exc_info=True)
        return f"Error uploading image {filename_base}: {str(e)}"
    finally:
        if temp_image_path and os.path.exists(temp_image_path):
            try:
                os.remove(temp_image_path)
            except OSError as ose:
                logger.error(f"Error removing temp image file {temp_image_path}: {ose}")

def extract_text_from_pdf(pdf_input_source):  # pdf_input_source is URL string or local file path
    try:
        pdf_file_like_object = None
        if isinstance(pdf_input_source, str) and pdf_input_source.startswith(('http://', 'https://')):
            logger.info(f"Fetching PDF from URL for text extraction: {pdf_input_source}")
            response = requests.get(pdf_input_source, stream=True, timeout=30)
            response.raise_for_status()
            pdf_file_like_object = io.BytesIO(response.content)
            logger.info("PDF downloaded successfully from URL.")
        elif isinstance(pdf_input_source, str) and os.path.exists(pdf_input_source):  # Local file path
            logger.info(f"Processing local PDF file for text extraction: {pdf_input_source}")
            # pdfplumber.open can take a path directly
            pdf_file_like_object = pdf_input_source
        else:
            logger.error(f"Invalid pdf_input_source for text extraction: {pdf_input_source}")
            return "Error: Invalid input for PDF text extraction (must be URL or valid file path)."
        with pdfplumber.open(pdf_file_like_object) as pdf:
            full_text = ""
            for i, page in enumerate(pdf.pages):
                page_text = page.extract_text(layout=True, x_density=1, y_density=1) or ""
                full_text += page_text + "\n\n"
                tables = page.extract_tables()
                if tables:
                    for table_data in tables:
                        if table_data:
                            header = [" | ".join(str(cell) if cell is not None else "" for cell in table_data[0])]
                            separator = [" | ".join(["---"] * len(table_data[0]))]
                            body = [" | ".join(str(cell) if cell is not None else "" for cell in row) for row in table_data[1:]]
                            table_md_lines = header + separator + body
                            full_text += "**Table:**\n" + "\n".join(table_md_lines) + "\n\n"
        logger.info("Text and table extraction successful.")
        return full_text.strip()
    except requests.RequestException as e:
        logger.error(f"URL fetch error for text extraction: {str(e)}", exc_info=True)
        return f"Error fetching PDF from URL: {str(e)}"
    except Exception as e:
        logger.error(f"Text extraction error: {str(e)}", exc_info=True)
        return f"Error extracting text: {str(e)}"

def extract_images_from_pdf(pdf_input_source):  # pdf_input_source is URL string or local file path
    if not check_poppler():
        return "Error: poppler-utils not found or not working correctly. Image extraction depends on it."
    images_pil = []
    try:
        if isinstance(pdf_input_source, str) and pdf_input_source.startswith(('http://', 'https://')):
            logger.info(f"Fetching PDF from URL for image extraction: {pdf_input_source}")
            response = requests.get(pdf_input_source, stream=True, timeout=30)
            response.raise_for_status()
            logger.info("PDF downloaded successfully from URL, converting to images.")
            images_pil = convert_from_bytes(response.content, dpi=200)
        elif isinstance(pdf_input_source, str) and os.path.exists(pdf_input_source):  # Local file path
            logger.info(f"Processing local PDF file for image extraction: {pdf_input_source}")
            images_pil = convert_from_path(pdf_input_source, dpi=200)
        else:
            logger.error(f"Invalid pdf_input_source for image extraction: {pdf_input_source}")
            return "Error: Invalid input for PDF image extraction (must be URL or valid file path)."
        logger.info(f"Successfully extracted {len(images_pil)} image(s) from PDF.")
        return images_pil
    except requests.RequestException as e:
        logger.error(f"URL fetch error for image extraction: {str(e)}", exc_info=True)
        return f"Error fetching PDF from URL for image extraction: {str(e)}"
    except Exception as e:
        logger.error(f"Image extraction error: {str(e)}", exc_info=True)
        return f"Error extracting images: {str(e)}"

def format_to_markdown(text_content, images_input):
    markdown_output = "# Extracted PDF Content\n\n"
    if text_content.startswith("Error"):  # If text extraction itself failed
        markdown_output += f"**Text Extraction Note:**\n{text_content}\n\n"
    else:
        text_content = re.sub(r'\n\s*\n+', '\n\n', text_content.strip())
        lines = text_content.split('\n')
        is_in_list = False
        for line_text in lines:
            line_stripped = line_text.strip()
            if not line_stripped:
                markdown_output += "\n"
                is_in_list = False
                continue
            list_match = re.match(r'^\s*(?:(?:\d+\.)|[*+-])\s+(.*)', line_stripped)
            is_heading_candidate = line_stripped.isupper() and 5 < len(line_stripped) < 100
            if is_heading_candidate and not list_match:
                markdown_output += f"## {line_stripped}\n\n"
                is_in_list = False
            elif list_match:
                list_item_text = list_match.group(1)
                markdown_output += f"- {list_item_text}\n"
                is_in_list = True
            else:
                if is_in_list:
                    markdown_output += "\n"
                markdown_output += f"{line_text}\n\n"
                is_in_list = False
        markdown_output = re.sub(r'\n\s*\n+', '\n\n', markdown_output.strip()) + "\n\n"
    if isinstance(images_input, list) and images_input:
        markdown_output += "## Extracted Images\n\n"
        if not HF_TOKEN:
            markdown_output += "**Note:** `HF_TOKEN` not set. Images were extracted but not uploaded to Hugging Face Hub.\n\n"
        for i, img_pil in enumerate(images_input):
            ocr_text = ""
            try:
                ocr_text = pytesseract.image_to_string(img_pil).strip()
                logger.info(f"OCR for image {i+1} successful.")
            except Exception as ocr_e:
                logger.error(f"OCR error for image {i+1}: {str(ocr_e)}")
                ocr_text = f"OCR failed: {str(ocr_e)}"
            if HF_TOKEN:  # Only attempt upload if token is present
                image_filename_base = f"extracted_image_{i+1}"
                image_url_or_error = upload_image_to_hf(img_pil, image_filename_base)
                if isinstance(image_url_or_error, str) and not image_url_or_error.startswith("Error"):
markdown_output += f"\n" | |
                else:
                    markdown_output += f"**Image {i+1} (Upload Error):** {str(image_url_or_error)}\n\n"
            else:  # No token, show placeholder or local info if we were saving them locally
                markdown_output += f"**Image {i+1} (not uploaded due to missing HF_TOKEN)**\n"
            if ocr_text:
                markdown_output += f"**Image {i+1} OCR Text:**\n```\n{ocr_text}\n```\n\n"
    elif isinstance(images_input, str) and images_input.startswith("Error"):
        markdown_output += f"## Image Extraction Note\n\n{images_input}\n\n"
    return markdown_output.strip()
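
# Rough shape of the Markdown produced above (illustrative sketch; actual content
# depends on the PDF):
#   # Extracted PDF Content
#   ...extracted text, with ALL-CAPS lines promoted to "##" headings, list items
#   normalized to "- " bullets, and any "**Table:**" blocks appended per page...
#   ## Extracted Images
#   ![Image 1](<uploaded image URL>)
#   **Image 1 OCR Text:** followed by the OCR output in a fenced block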

# --- Flask Routes ---
@app.route('/')  # Route path assumed: serves the upload form (index.html)
def index():
    return render_template('index.html')

@app.route('/process', methods=['POST'])  # Route path assumed; must match the form action in index.html (not shown)
def process_pdf_route():
    pdf_file = request.files.get('pdf_file')
    pdf_url = request.form.get('pdf_url', '').strip()
    status_message = "Starting PDF processing..."
    error_message = None
    markdown_output = None
    temp_pdf_path = None
    pdf_input_source = None  # This will be a URL string or a local file path
    try:
        if pdf_file and pdf_file.filename:
            if not pdf_file.filename.lower().endswith('.pdf'):
                raise ValueError("Uploaded file is not a PDF.")
            filename = secure_filename(pdf_file.filename)
            # Save to a temporary file
            fd, temp_pdf_path = tempfile.mkstemp(suffix=".pdf", prefix="upload_", dir=app.config['UPLOAD_FOLDER'])
            os.close(fd)  # close file descriptor from mkstemp
            pdf_file.save(temp_pdf_path)
            logger.info(f"Uploaded PDF saved to temporary path: {temp_pdf_path}")
            pdf_input_source = temp_pdf_path
            status_message = f"Processing uploaded PDF: {filename}"
        elif pdf_url:
            pdf_url = urllib.parse.unquote(pdf_url)
            # Basic URL validation
            if not (pdf_url.startswith('http://') or pdf_url.startswith('https://')):
                raise ValueError("Invalid URL scheme. Must be http or https.")
            if not pdf_url.lower().endswith('.pdf'):
                logger.warning(f"URL {pdf_url} does not end with .pdf. Proceeding with caution.")
                # Allow proceeding but log warning; actual check is content-type or processing error
            # Quick check with HEAD request (optional, but good practice)
            try:
                head_resp = requests.head(pdf_url, allow_redirects=True, timeout=10)
                head_resp.raise_for_status()
                content_type = head_resp.headers.get('content-type', '').lower()
                if 'application/pdf' not in content_type:
                    logger.warning(f"URL {pdf_url} content-type is '{content_type}', not 'application/pdf'.")
                    # Depending on strictness, could raise ValueError here
            except requests.RequestException as head_err:
                logger.error(f"Failed HEAD request for URL {pdf_url}: {head_err}")
                # Proceed; main request in extract functions will handle final failure
            pdf_input_source = pdf_url
            status_message = f"Processing PDF from URL: {pdf_url}"
        else:
            raise ValueError("No PDF file uploaded and no PDF URL provided.")
        # --- Core Processing ---
        status_message += "\nExtracting text..."
        logger.info(status_message)
        extracted_text = extract_text_from_pdf(pdf_input_source)
        if isinstance(extracted_text, str) and extracted_text.startswith("Error"):
            # Let format_to_markdown handle displaying this error within its structure
            logger.error(f"Text extraction resulted in error: {extracted_text}")
        status_message += "\nExtracting images..."
        logger.info(status_message)
        extracted_images = extract_images_from_pdf(pdf_input_source)  # List of PIL images or error string
        if isinstance(extracted_images, str) and extracted_images.startswith("Error"):
            logger.error(f"Image extraction resulted in error: {extracted_images}")
        status_message += "\nFormatting to Markdown..."
        logger.info(status_message)
        markdown_output = format_to_markdown(extracted_text, extracted_images)
        status_message = "Processing complete."
        if isinstance(extracted_text, str) and extracted_text.startswith("Error"):
            status_message += f" (Text extraction issues: {extracted_text.split(':', 1)[1].strip()})"
        if isinstance(extracted_images, str) and extracted_images.startswith("Error"):
            status_message += f" (Image extraction issues: {extracted_images.split(':', 1)[1].strip()})"
        if not HF_TOKEN and isinstance(extracted_images, list) and extracted_images:
            status_message += " (Note: HF_TOKEN not set, images not uploaded to Hub)"
    except ValueError as ve:
        logger.error(f"Input validation error: {str(ve)}")
        error_message = str(ve)
        status_message = "Processing failed."
    except Exception as e:
        logger.error(f"An unexpected error occurred during processing: {str(e)}", exc_info=True)
        error_message = f"An unexpected error occurred: {str(e)}"
        status_message = "Processing failed due to an unexpected error."
    finally:
        if temp_pdf_path and os.path.exists(temp_pdf_path):
            try:
                os.remove(temp_pdf_path)
                logger.info(f"Removed temporary PDF: {temp_pdf_path}")
            except OSError as ose:
                logger.error(f"Error removing temporary PDF {temp_pdf_path}: {ose}")
    return render_template('index.html',
                           markdown_output=markdown_output,
                           status_message=status_message,
                           error_message=error_message)

# --- Main Execution ---
if __name__ == '__main__':
    # This is for local development. For Hugging Face Spaces, Gunicorn is used via the Dockerfile CMD.
    # Poppler check at startup for local dev convenience
    if not check_poppler():
        logger.warning("Poppler utilities might not be installed correctly. PDF processing might fail.")
    # Ensure UPLOAD_FOLDER exists
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    app.run(host='0.0.0.0', port=int(os.getenv("PORT", 7860)), debug=True)
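
# Deployment/usage sketch (assumptions, not part of this file): on a Docker-based
# Space the container would typically start the app with something like
#   gunicorn --bind 0.0.0.0:7860 app:app
# where "app:app" assumes this module is saved as app.py. Locally, `python app.py`
# starts the Flask dev server on port 7860 (or $PORT), and the form rendered by
# index.html posts the PDF file or URL to the route handled by process_pdf_route().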