"""Flask app that converts a PDF (uploaded file or URL) into Markdown.

Text and tables are extracted with pdfplumber, pages are rasterised with
pdf2image (Poppler), OCR'd with pytesseract and, when ``HF_TOKEN`` is set,
uploaded to a Hugging Face dataset repository.
"""

import io
import logging
import os
import re
import subprocess
import tempfile
import urllib.parse
from datetime import datetime

import pdfplumber
import pytesseract
import requests
from flask import Flask, request, render_template, redirect, url_for
from huggingface_hub import HfApi, create_repo
from pdf2image import convert_from_path, convert_from_bytes
from PIL import Image
from werkzeug.utils import secure_filename  # For secure file handling

try:
    from huggingface_hub import HfHubHTTPError
except ImportError:
    # Some huggingface_hub releases expose this exception only under
    # huggingface_hub.utils rather than at the package top level.
    from huggingface_hub.utils import HfHubHTTPError

# --- Flask App Initialization ---
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = tempfile.gettempdir()  # Use system temp dir
app.config['MAX_CONTENT_LENGTH'] = 30 * 1024 * 1024  # 30 MB limit for uploads

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# --- Hugging Face Configuration ---
HF_TOKEN = os.getenv("HF_TOKEN")
HF_DATASET_REPO_NAME = os.getenv("HF_DATASET_REPO_NAME", "pdf-images-extracted")  # Allow override via env var
hf_api = HfApi()

# Patterns used by format_to_markdown, compiled once at import time.
_BLANK_RUN_RE = re.compile(r'\n\s*\n+')
_LIST_ITEM_RE = re.compile(r'^\s*(?:(?:\d+\.)|[*+-])\s+(.*)')

_URL_SCHEMES = ('http://', 'https://')


# --- PDF Processing Helper Functions (Adapted from Gradio version) ---
def check_poppler():
    """Return True if the ``pdftoppm`` command can be executed, else False.

    The first line of the command's version output (stderr preferred, then
    stdout) is logged when there is any.
    """
    try:
        result = subprocess.run(["pdftoppm", "-v"], capture_output=True, text=True, check=False)
    except FileNotFoundError:
        logger.error("Poppler (pdftoppm command) not found. Ensure poppler-utils is installed and in PATH.")
        return False
    except Exception as e:
        logger.error(f"An unexpected error occurred during Poppler check: {str(e)}")
        return False

    version_info_log = result.stderr.strip() if result.stderr else result.stdout.strip()
    if version_info_log:
        logger.info(f"Poppler version check: {version_info_log.splitlines()[0]}")
    else:
        logger.info("Poppler 'pdftoppm -v' ran. Assuming Poppler is present.")
    return True


def ensure_hf_dataset():
    """Create the configured dataset repo if needed and return its repo id.

    Returns:
        The repo id string on success, or a string starting with ``"Error"``
        when ``HF_TOKEN`` is unset or the Hub call fails.
    """
    if not HF_TOKEN:
        logger.warning("HF_TOKEN is not set. Cannot ensure Hugging Face dataset. Image uploads will fail.")
        return "Error: HF_TOKEN is not set. Please configure it in Space secrets for image uploads."
    try:
        repo_id_obj = create_repo(repo_id=HF_DATASET_REPO_NAME, token=HF_TOKEN, repo_type="dataset", exist_ok=True)
        logger.info(f"Dataset repo ensured: {repo_id_obj.repo_id}")
        return repo_id_obj.repo_id
    except HfHubHTTPError as e:
        if e.response.status_code == 409:  # Conflict, repo already exists
            logger.info(f"Dataset repo '{HF_DATASET_REPO_NAME}' already exists.")
            return f"{hf_api.whoami(token=HF_TOKEN)['name']}/{HF_DATASET_REPO_NAME}"  # Construct repo_id
        logger.error(f"Hugging Face dataset error (HTTP {e.response.status_code}): {str(e)}")
        return f"Error: Failed to access or create dataset '{HF_DATASET_REPO_NAME}': {str(e)}"
    except Exception as e:
        logger.error(f"Hugging Face dataset error: {str(e)}", exc_info=True)
        return f"Error: Failed to access or create dataset '{HF_DATASET_REPO_NAME}': {str(e)}"


def upload_image_to_hf(image_pil, filename_base):
    """Upload a PIL image to the dataset repo as ``images/<base>_<timestamp>.png``.

    Args:
        image_pil: Image object providing ``save(path, format=...)``.
        filename_base: Prefix for the file name inside the repo.

    Returns:
        The value returned by ``hf_api.upload_file`` on success, or a string
        starting with ``"Error"`` on failure.
    """
    repo_id_or_error = ensure_hf_dataset()
    if isinstance(repo_id_or_error, str) and repo_id_or_error.startswith("Error"):
        return repo_id_or_error

    repo_id = repo_id_or_error
    temp_image_path = None
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        repo_filename = f"images/{filename_base}_{timestamp}.png"  # Path in repo

        # Reserve a unique temp path, then write the image once the handle is
        # closed so we never write to a file that is still open elsewhere.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png", dir=app.config['UPLOAD_FOLDER']) as tmp_file:
            temp_image_path = tmp_file.name
        image_pil.save(temp_image_path, format="PNG")

        logger.info(f"Attempting to upload {temp_image_path} to {repo_id}/{repo_filename}")
        # NOTE(review): format_to_markdown embeds this return value as a
        # Markdown image URL -- confirm it points at the raw file rather than
        # an HTML page for the installed huggingface_hub version.
        file_url = hf_api.upload_file(
            path_or_fileobj=temp_image_path,
            path_in_repo=repo_filename,
            repo_id=repo_id,
            repo_type="dataset",
            token=HF_TOKEN
        )
        logger.info(f"Successfully uploaded image: {file_url}")
        return file_url
    except Exception as e:
        logger.error(f"Image upload error for {filename_base}: {str(e)}", exc_info=True)
        return f"Error uploading image {filename_base}: {str(e)}"
    finally:
        if temp_image_path and os.path.exists(temp_image_path):
            try:
                os.remove(temp_image_path)
            except OSError as ose:
                logger.error(f"Error removing temp image file {temp_image_path}: {ose}")


def _download_pdf_bytes(url):
    """Download *url* and return its body as bytes, bounded by MAX_CONTENT_LENGTH.

    The body is streamed in chunks so an oversized response is rejected
    without being buffered in full, and the response is always closed.

    Raises:
        requests.RequestException: on network or HTTP status errors.
        ValueError: if the body exceeds ``app.config['MAX_CONTENT_LENGTH']``.
    """
    max_bytes = app.config.get('MAX_CONTENT_LENGTH')
    chunks = []
    received = 0
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=64 * 1024):
            received += len(chunk)
            if max_bytes and received > max_bytes:
                raise ValueError(f"Remote PDF exceeds the {max_bytes} byte download limit.")
            chunks.append(chunk)
    return b"".join(chunks)


def _table_to_markdown(table_data):
    """Render one extracted table (list of rows) as pipe-separated Markdown lines."""
    def render_row(row):
        return " | ".join(str(cell) if cell is not None else "" for cell in row)

    lines = [render_row(table_data[0]), " | ".join(["---"] * len(table_data[0]))]
    lines.extend(render_row(row) for row in table_data[1:])
    return "**Table:**\n" + "\n".join(lines) + "\n\n"


def extract_text_from_pdf(pdf_input_source):
    """Extract page text and tables from a PDF.

    Args:
        pdf_input_source: An http(s) URL string or a path to an existing file.

    Returns:
        The extracted text (tables appended after each page's text as
        Markdown), or a string starting with ``"Error"`` on failure.
    """
    try:
        if isinstance(pdf_input_source, str) and pdf_input_source.startswith(_URL_SCHEMES):
            logger.info(f"Fetching PDF from URL for text extraction: {pdf_input_source}")
            pdf_file_like_object = io.BytesIO(_download_pdf_bytes(pdf_input_source))
            logger.info("PDF downloaded successfully from URL.")
        elif isinstance(pdf_input_source, str) and os.path.exists(pdf_input_source):  # Local file path
            logger.info(f"Processing local PDF file for text extraction: {pdf_input_source}")
            # pdfplumber.open can take a path directly
            pdf_file_like_object = pdf_input_source
        else:
            logger.error(f"Invalid pdf_input_source for text extraction: {pdf_input_source}")
            return "Error: Invalid input for PDF text extraction (must be URL or valid file path)."

        parts = []
        with pdfplumber.open(pdf_file_like_object) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text(layout=True, x_density=1, y_density=1) or ""
                parts.append(page_text + "\n\n")
                for table_data in page.extract_tables() or []:
                    if table_data:
                        parts.append(_table_to_markdown(table_data))
        logger.info("Text and table extraction successful.")
        return "".join(parts).strip()
    except requests.RequestException as e:
        logger.error(f"URL fetch error for text extraction: {str(e)}", exc_info=True)
        return f"Error fetching PDF from URL: {str(e)}"
    except Exception as e:
        logger.error(f"Text extraction error: {str(e)}", exc_info=True)
        return f"Error extracting text: {str(e)}"


def extract_images_from_pdf(pdf_input_source):
    """Rasterise a PDF with pdf2image at 200 dpi.

    Args:
        pdf_input_source: An http(s) URL string or a path to an existing file.

    Returns:
        The list produced by pdf2image, or a string starting with ``"Error"``
        when Poppler is unavailable or the conversion fails.
    """
    if not check_poppler():
        return "Error: poppler-utils not found or not working correctly. Image extraction depends on it."

    try:
        if isinstance(pdf_input_source, str) and pdf_input_source.startswith(_URL_SCHEMES):
            logger.info(f"Fetching PDF from URL for image extraction: {pdf_input_source}")
            pdf_bytes = _download_pdf_bytes(pdf_input_source)
            logger.info("PDF downloaded successfully from URL, converting to images.")
            images_pil = convert_from_bytes(pdf_bytes, dpi=200)
        elif isinstance(pdf_input_source, str) and os.path.exists(pdf_input_source):  # Local file path
            logger.info(f"Processing local PDF file for image extraction: {pdf_input_source}")
            images_pil = convert_from_path(pdf_input_source, dpi=200)
        else:
            logger.error(f"Invalid pdf_input_source for image extraction: {pdf_input_source}")
            return "Error: Invalid input for PDF image extraction (must be URL or valid file path)."

        logger.info(f"Successfully extracted {len(images_pil)} image(s) from PDF.")
        return images_pil
    except requests.RequestException as e:
        logger.error(f"URL fetch error for image extraction: {str(e)}", exc_info=True)
        return f"Error fetching PDF from URL for image extraction: {str(e)}"
    except Exception as e:
        logger.error(f"Image extraction error: {str(e)}", exc_info=True)
        return f"Error extracting images: {str(e)}"


def _text_to_markdown_body(text_content):
    """Convert extracted text to Markdown: upper-case lines become headings, list items become bullets."""
    pieces = []
    is_in_list = False
    normalized = _BLANK_RUN_RE.sub('\n\n', text_content.strip())
    for line_text in normalized.split('\n'):
        line_stripped = line_text.strip()
        if not line_stripped:
            pieces.append("\n")
            is_in_list = False
            continue

        list_match = _LIST_ITEM_RE.match(line_stripped)
        is_heading_candidate = line_stripped.isupper() and 5 < len(line_stripped) < 100

        if is_heading_candidate and not list_match:
            pieces.append(f"## {line_stripped}\n\n")
            is_in_list = False
        elif list_match:
            pieces.append(f"- {list_match.group(1)}\n")
            is_in_list = True
        else:
            if is_in_list:
                # Blank line so the paragraph is not absorbed into the list.
                pieces.append("\n")
            pieces.append(f"{line_text}\n\n")
            is_in_list = False
    return "".join(pieces)


def _images_to_markdown(images_input):
    """Build the "Extracted Images" section: OCR each image and upload it when HF_TOKEN is set."""
    pieces = ["## Extracted Images\n\n"]
    if not HF_TOKEN:
        pieces.append("**Note:** `HF_TOKEN` not set. Images were extracted but not uploaded to Hugging Face Hub.\n\n")

    for index, img_pil in enumerate(images_input, start=1):
        try:
            ocr_text = pytesseract.image_to_string(img_pil).strip()
            logger.info(f"OCR for image {index} successful.")
        except Exception as ocr_e:
            logger.error(f"OCR error for image {index}: {str(ocr_e)}")
            ocr_text = f"OCR failed: {str(ocr_e)}"

        if HF_TOKEN:  # Only attempt upload if token is present
            image_url_or_error = upload_image_to_hf(img_pil, f"extracted_image_{index}")
            if isinstance(image_url_or_error, str) and not image_url_or_error.startswith("Error"):
                pieces.append(f"![Image {index}]({image_url_or_error})\n")
            else:
                pieces.append(f"**Image {index} (Upload Error):** {str(image_url_or_error)}\n\n")
        else:
            pieces.append(f"**Image {index} (not uploaded due to missing HF_TOKEN)**\n")

        if ocr_text:
            pieces.append(f"**Image {index} OCR Text:**\n```\n{ocr_text}\n```\n\n")
    return "".join(pieces)


def format_to_markdown(text_content, images_input):
    """Assemble the final Markdown document.

    Args:
        text_content: Extracted text, or a string starting with ``"Error"``
            which is rendered as a note instead of being parsed.
        images_input: List of images, or a string starting with ``"Error"``.

    Returns:
        The Markdown document with surrounding whitespace stripped.
    """
    markdown_output = "# Extracted PDF Content\n\n"

    if text_content.startswith("Error"):  # If text extraction itself failed
        markdown_output += f"**Text Extraction Note:**\n{text_content}\n\n"
    else:
        markdown_output += _text_to_markdown_body(text_content)
        markdown_output = _BLANK_RUN_RE.sub('\n\n', markdown_output.strip()) + "\n\n"

    if isinstance(images_input, list) and images_input:
        markdown_output += _images_to_markdown(images_input)
    elif isinstance(images_input, str) and images_input.startswith("Error"):
        markdown_output += f"## Image Extraction Note\n\n{images_input}\n\n"

    return markdown_output.strip()


def _error_detail(error_text):
    """Return the part of an ``"Error...: detail"`` string after the first colon ('' if none)."""
    return error_text.partition(':')[2].strip()


# --- Flask Routes ---
@app.route('/', methods=['GET'])
def index():
    """Render the upload form."""
    return render_template('index.html')


@app.route('/process', methods=['POST'])
def process_pdf_route():
    """Handle a PDF submission (file upload or URL) and render the result.

    An uploaded file takes precedence over a URL. Extraction errors are
    embedded in the Markdown output and summarised in the status message;
    validation and unexpected errors are reported via ``error_message``.
    """
    pdf_file = request.files.get('pdf_file')
    pdf_url = request.form.get('pdf_url', '').strip()

    status_message = "Starting PDF processing..."
    error_message = None
    markdown_output = None
    temp_pdf_path = None
    pdf_input_source = None  # This will be a URL string or a local file path

    try:
        if pdf_file and pdf_file.filename:
            if not pdf_file.filename.lower().endswith('.pdf'):
                raise ValueError("Uploaded file is not a PDF.")

            filename = secure_filename(pdf_file.filename)
            # Save to a temporary file
            fd, temp_pdf_path = tempfile.mkstemp(suffix=".pdf", prefix="upload_", dir=app.config['UPLOAD_FOLDER'])
            os.close(fd)  # close file descriptor from mkstemp
            pdf_file.save(temp_pdf_path)
            logger.info(f"Uploaded PDF saved to temporary path: {temp_pdf_path}")
            pdf_input_source = temp_pdf_path
            status_message = f"Processing uploaded PDF: {filename}"

        elif pdf_url:
            # NOTE(review): form values are already decoded once by the
            # framework; this second unquote alters URLs that legitimately
            # contain percent-escapes -- confirm it is intended.
            pdf_url = urllib.parse.unquote(pdf_url)
            # Basic URL validation
            if not pdf_url.startswith(_URL_SCHEMES):
                raise ValueError("Invalid URL scheme. Must be http or https.")
            if not pdf_url.lower().endswith('.pdf'):
                logger.warning(f"URL {pdf_url} does not end with .pdf. Proceeding with caution.")
                # Allow proceeding but log warning, actual check is content-type or processing error

            # NOTE(security): the server fetches a caller-supplied URL, which
            # is an SSRF surface. Restrict targets (allow-list / block private
            # address ranges) before exposing this endpoint publicly.
            # Quick check with HEAD request (optional, but good practice)
            try:
                head_resp = requests.head(pdf_url, allow_redirects=True, timeout=10)
                head_resp.raise_for_status()
                content_type = head_resp.headers.get('content-type', '').lower()
                if 'application/pdf' not in content_type:
                    logger.warning(f"URL {pdf_url} content-type is '{content_type}', not 'application/pdf'.")
                    # Depending on strictness, could raise ValueError here
            except requests.RequestException as head_err:
                # Bound to a name that does not shadow the `re` module.
                logger.error(f"Failed HEAD request for URL {pdf_url}: {head_err}")
                # Proceed, main request in extract functions will handle final failure

            pdf_input_source = pdf_url
            status_message = f"Processing PDF from URL: {pdf_url}"
        else:
            raise ValueError("No PDF file uploaded and no PDF URL provided.")

        # --- Core Processing ---
        status_message += "\nExtracting text..."
        logger.info(status_message)
        extracted_text = extract_text_from_pdf(pdf_input_source)
        text_failed = isinstance(extracted_text, str) and extracted_text.startswith("Error")
        if text_failed:
            # Let format_to_markdown handle displaying this error within its structure
            logger.error(f"Text extraction resulted in error: {extracted_text}")

        status_message += "\nExtracting images..."
        logger.info(status_message)
        extracted_images = extract_images_from_pdf(pdf_input_source)  # list of PIL images or error string
        images_failed = isinstance(extracted_images, str) and extracted_images.startswith("Error")
        if images_failed:
            logger.error(f"Image extraction resulted in error: {extracted_images}")

        status_message += "\nFormatting to Markdown..."
        logger.info(status_message)
        markdown_output = format_to_markdown(extracted_text, extracted_images)

        status_message = "Processing complete."
        if text_failed:
            status_message += f" (Text extraction issues: {_error_detail(extracted_text)})"
        if images_failed:
            status_message += f" (Image extraction issues: {_error_detail(extracted_images)})"
        if not HF_TOKEN and isinstance(extracted_images, list) and extracted_images:
            status_message += " (Note: HF_TOKEN not set, images not uploaded to Hub)"

    except ValueError as ve:
        logger.error(f"Input validation error: {str(ve)}")
        error_message = str(ve)
        status_message = "Processing failed."
    except Exception as e:
        logger.error(f"An unexpected error occurred during processing: {str(e)}", exc_info=True)
        error_message = f"An unexpected error occurred: {str(e)}"
        status_message = "Processing failed due to an unexpected error."
    finally:
        if temp_pdf_path and os.path.exists(temp_pdf_path):
            try:
                os.remove(temp_pdf_path)
                logger.info(f"Removed temporary PDF: {temp_pdf_path}")
            except OSError as ose:
                logger.error(f"Error removing temporary PDF {temp_pdf_path}: {ose}")

    return render_template('index.html',
                           markdown_output=markdown_output,
                           status_message=status_message,
                           error_message=error_message)


# --- Main Execution ---
if __name__ == '__main__':
    # This is for local development. For Hugging Face Spaces, Gunicorn is used via Dockerfile CMD.
    # Poppler check at startup for local dev convenience
    if not check_poppler():
        logger.warning("Poppler utilities might not be installed correctly. PDF processing might fail.")

    # Ensure UPLOAD_FOLDER exists
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

    # The interactive debugger allows arbitrary code execution, so it must be
    # opted into explicitly rather than enabled while bound to all interfaces.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=int(os.getenv("PORT", 7860)), debug=debug_enabled)