pdf2markdown / app.py
broadfield-dev's picture
Update app.py
ae3cd0d verified
raw
history blame
17.7 kB
import gradio as gr
import requests
import pdfplumber
from pdf2image import convert_from_path, convert_from_bytes
import pytesseract
from PIL import Image
import io
import os
from huggingface_hub import HfApi, create_repo
import re
from datetime import datetime
import urllib.parse
import logging
import subprocess
# Logging: timestamped, level-tagged records at INFO and above, shared by
# every function in this module through ``logger``.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Hugging Face settings: the access token is read from the environment (it is
# None when the variable is unset) and REPO_NAME is the dataset repository
# that ensure_hf_dataset() creates or reuses.
HF_TOKEN = os.environ.get("HF_TOKEN")
REPO_NAME = "pdf-images-extracted"  # Consider making this configurable if needed
hf_api = HfApi()
def check_poppler():
    """Report whether Poppler's ``pdftoppm`` executable can be launched.

    Runs ``pdftoppm -v`` and logs the first line of whatever version text it
    prints. The exit status is not inspected: being able to start the process
    at all is treated as success.

    Returns:
        bool: True when the command could be started; False when the
        executable is missing or launching it raised any other exception.
    """
    command = ["pdftoppm", "-v"]
    try:
        completed = subprocess.run(command, capture_output=True, text=True)
        # The version banner typically arrives on stderr; stdout is the fallback.
        banner = (completed.stderr or completed.stdout).strip()
        if not banner:
            logger.info("Poppler 'pdftoppm -v' ran, but no version output on stdout/stderr. Poppler is likely present.")
        else:
            first_line = banner.splitlines()[0]
            logger.info(f"Poppler version check: {first_line}")
        return True
    except FileNotFoundError:
        # The executable is not on PATH at all.
        logger.error("Poppler (pdftoppm command) not found. Ensure poppler-utils is installed and in PATH.")
        return False
    except Exception as e:
        # Anything else that went wrong while spawning or reading the process.
        logger.error(f"An unexpected error occurred during Poppler check: {str(e)}")
        return False
def ensure_hf_dataset():
    """Make sure the image dataset repository exists and return its id.

    Returns:
        The ``repo_id`` attribute of the object returned by ``create_repo``
        on success. Otherwise a string beginning with ``"Error"`` that
        describes the problem: either the token is missing or creating /
        looking up the repository raised an exception.
    """
    try:
        if HF_TOKEN:
            # exist_ok=True makes this a no-op when the repo is already there.
            repo_url = create_repo(
                repo_id=REPO_NAME,
                token=HF_TOKEN,
                repo_type="dataset",
                exist_ok=True,
            )
            logger.info(f"Dataset repo ensured: {repo_url.repo_id}")
            return repo_url.repo_id
        # Without a token no repository operation can succeed.
        logger.error("HF_TOKEN is not set. Cannot ensure Hugging Face dataset.")
        return "Error: HF_TOKEN is not set. Please configure it in Space secrets."
    except Exception as e:
        logger.error(f"Hugging Face dataset error: {str(e)}")
        return f"Error: Failed to access or create dataset '{REPO_NAME}': {str(e)}"
def upload_image_to_hf(image, filename_base):
    """Upload an image as a PNG to the Hugging Face dataset repository.

    The image is encoded in memory, so no temporary file is written and
    nothing needs cleaning up when the upload fails.

    Args:
        image: Object with a PIL-style ``save(fileobj, format=...)`` method.
        filename_base: File name stem without extension; a timestamp and
            ``.png`` are appended to avoid collisions in the repository.

    Returns:
        str: The direct-download (``resolve``) URL of the uploaded file, which
        can be embedded in Markdown as an image, or a message starting with
        ``"Error"`` when the dataset is unavailable or the upload failed.
    """
    repo_id_or_error = ensure_hf_dataset()
    if isinstance(repo_id_or_error, str) and repo_id_or_error.startswith("Error"):
        return repo_id_or_error  # Propagate the message from ensure_hf_dataset
    repo_id = repo_id_or_error

    try:
        # Imported here because only this function needs it.
        from huggingface_hub import hf_hub_url

        # Microsecond timestamp keeps repeated uploads of the same stem unique.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        repo_filename = f"images/{filename_base}_{timestamp}.png"

        png_buffer = io.BytesIO()
        image.save(png_buffer, format="PNG")

        logger.info(f"Attempting to upload image to {repo_id}/{repo_filename}")
        hf_api.upload_file(
            path_or_fileobj=png_buffer.getvalue(),
            path_in_repo=repo_filename,
            repo_id=repo_id,
            repo_type="dataset",
            token=HF_TOKEN,  # Explicitly pass token for clarity
        )

        # upload_file reports the "/blob/" web-viewer address (an HTML page);
        # build the raw-file URL instead so the Markdown image actually renders.
        file_url = hf_hub_url(repo_id=repo_id, filename=repo_filename, repo_type="dataset")
        logger.info(f"Successfully uploaded image: {file_url}")
        return file_url
    except Exception as e:
        logger.error(f"Image upload error for {filename_base}: {str(e)}")
        return f"Error uploading image {filename_base}: {str(e)}"
def _table_to_markdown(table_data):
    """Render one extracted table (rows of cells) as a Markdown pipe table.

    The first non-empty row becomes the header. ``None`` cells become empty
    strings, short rows are padded to the widest row, and characters that
    would break the table syntax are neutralised. Returns an empty string
    when the table has no usable rows.
    """
    def clean(cell):
        if cell is None:
            return ""
        # A newline would end the row and a bare pipe would start a new column.
        return " ".join(str(cell).split()).replace("|", "\\|")

    rows = [[clean(cell) for cell in row] for row in (table_data or []) if row]
    if not rows:
        return ""
    width = max(len(row) for row in rows)

    def render(cells):
        padded = list(cells) + [""] * (width - len(cells))
        # Outer pipes keep even a one-column table recognisable as a table.
        return "| " + " | ".join(padded) + " |"

    lines = [render(rows[0]), render(["---"] * width)]
    lines.extend(render(row) for row in rows[1:])
    return "\n".join(lines)


def extract_text_from_pdf(pdf_input_source):
    """Extract page text and tables from a PDF.

    Args:
        pdf_input_source: One of
            * an ``http(s)://`` URL string, which is downloaded first;
            * a local path (``str`` or ``os.PathLike``);
            * a file-like object (e.g. the temporary file wrapper older
              Gradio versions pass for uploads).
            The last two are handed to ``pdfplumber.open`` unchanged.

    Returns:
        str: The text of every page, each followed by a blank line and by any
        tables found on that page rendered as Markdown. On failure, a message
        starting with ``"Error extracting text:"``.
    """
    try:
        is_url = isinstance(pdf_input_source, str) and pdf_input_source.lower().startswith(
            ("http://", "https://")
        )
        if is_url:
            logger.info(f"Fetching PDF from URL for text extraction: {pdf_input_source}")
            response = requests.get(pdf_input_source, timeout=20)
            response.raise_for_status()
            pdf_source = io.BytesIO(response.content)
            logger.info("PDF downloaded successfully from URL.")
        else:
            # Any other string is a local path, not something to fetch.
            if isinstance(pdf_input_source, (str, os.PathLike)):
                source_label = pdf_input_source
            else:
                source_label = getattr(pdf_input_source, 'name', 'N/A')
            logger.info(f"Processing uploaded PDF file for text extraction: {source_label}")
            pdf_source = pdf_input_source

        sections = []
        with pdfplumber.open(pdf_source) as pdf:
            for page_number, page in enumerate(pdf.pages, start=1):
                logger.debug(f"Extracting text from page {page_number}")
                # NOTE(review): densities of 1 look far finer than pdfplumber's
                # defaults and may inflate whitespace - confirm this is intended.
                page_text = page.extract_text(layout=True, x_density=1, y_density=1) or ""
                sections.append(page_text + "\n\n")  # Blank line separates pages

                logger.debug(f"Extracting tables from page {page_number}")
                for table_number, table_data in enumerate(page.extract_tables() or [], start=1):
                    logger.debug(f"Processing table {table_number} on page {page_number}")
                    table_md = _table_to_markdown(table_data)
                    if table_md:
                        sections.append(f"**Table:**\n{table_md}\n\n")

        logger.info("Text and table extraction successful.")
        return "".join(sections)
    except Exception as e:
        logger.error(f"Text extraction error: {str(e)}", exc_info=True)
        return f"Error extracting text: {str(e)}"
def extract_images_from_pdf(pdf_input_source):
    """Convert a PDF to a list of images using pdf2image (requires Poppler).

    Args:
        pdf_input_source: One of
            * an ``http(s)://`` URL string, which is downloaded and converted
              from bytes;
            * a local path (``str`` or ``os.PathLike``);
            * a file object whose ``name`` attribute is its path on disk.

    Returns:
        list: The images returned by ``convert_from_bytes`` /
        ``convert_from_path`` at 200 dpi, or a ``str`` starting with
        ``"Error"`` when Poppler is unavailable, the path cannot be
        determined, or conversion fails.
    """
    if not check_poppler():
        return "Error: poppler-utils not found or not working correctly. Image extraction depends on it."
    try:
        is_url = isinstance(pdf_input_source, str) and pdf_input_source.lower().startswith(
            ("http://", "https://")
        )
        if is_url:
            logger.info(f"Fetching PDF from URL for image extraction: {pdf_input_source}")
            response = requests.get(pdf_input_source, timeout=20)
            response.raise_for_status()
            logger.info("PDF downloaded successfully, converting to images.")
            images = convert_from_bytes(response.content, dpi=200)  # dpi can be adjusted
        else:
            # A non-URL string is already a filesystem path (newer Gradio
            # releases appear to pass uploads this way); file objects expose
            # their path through ``name``.
            if isinstance(pdf_input_source, (str, os.PathLike)):
                file_path = pdf_input_source
            else:
                file_path = getattr(pdf_input_source, 'name', None)
            if not file_path:
                logger.error("Uploaded PDF file has no name attribute, cannot process for images.")
                return "Error: Could not get path from uploaded PDF file for image extraction."
            logger.info(f"Processing uploaded PDF file for image extraction: {file_path}")
            images = convert_from_path(file_path, dpi=200)
        logger.info(f"Successfully extracted {len(images)} image(s) from PDF.")
        return images
    except Exception as e:
        logger.error(f"Image extraction error: {str(e)}", exc_info=True)
        return f"Error extracting images: {str(e)}"
# Two or more line breaks, optionally with whitespace in between.
_BLANK_RUN_RE = re.compile(r'\n\s*\n+')
# List markers "1.", "*", "-", "+" followed by whitespace and the item text.
_LIST_ITEM_RE = re.compile(r'^\s*(?:(?:\d+\.)|[*+-])\s+(.*)')
# Line kinds whose consecutive lines must stay adjacent (no blank line between).
_TIGHT_KINDS = ("list", "table")


def _render_line(line):
    """Classify one stripped, non-empty line; return ``(kind, markdown)``."""
    list_match = _LIST_ITEM_RE.match(line)
    if list_match:
        return "list", f"- {list_match.group(1)}"
    # Checked before the heading rule so an ALL-CAPS table row stays a row.
    if len(line) > 1 and line.startswith("|") and line.endswith("|"):
        return "table", line
    if line.isupper() and 5 < len(line) < 100:  # ALL CAPS of plausible heading length
        return "heading", f"## {line}"
    return "paragraph", line


def _render_text_section(text_content):
    """Turn raw extracted text into the titled Markdown body (no trailing newline)."""
    normalized = _BLANK_RUN_RE.sub('\n\n', text_content.strip())
    chunks = ["# Extracted PDF Content\n\n"]
    previous_kind = None
    for raw_line in normalized.split('\n'):
        # Leading indentation is dropped: four or more spaces would otherwise
        # turn an ordinary line into a Markdown code block.
        line = raw_line.strip()
        if not line:
            chunks.append("\n")
            previous_kind = None
            continue
        kind, rendered = _render_line(line)
        # Close a list/table with a blank line so the next block is not
        # absorbed into it as continuation text.
        if previous_kind in _TIGHT_KINDS and kind != previous_kind:
            chunks.append("\n")
        chunks.append(rendered + ("\n" if kind in _TIGHT_KINDS else "\n\n"))
        previous_kind = kind
    return _BLANK_RUN_RE.sub('\n\n', "".join(chunks).strip())


def _render_image_section(images_list):
    """OCR and upload each image; return the "Extracted Images" Markdown section."""
    chunks = ["## Extracted Images\n\n"]
    for number, image in enumerate(images_list, start=1):
        ocr_failed = False
        try:
            ocr_text = pytesseract.image_to_string(image).strip()
            logger.info(f"OCR for image {number} successful.")
        except Exception as ocr_error:
            logger.error(f"OCR error for image {number}: {str(ocr_error)}")
            ocr_text = f"OCR failed: {str(ocr_error)}"
            ocr_failed = True

        upload_result = upload_image_to_hf(image, f"extracted_image_{number}")
        if isinstance(upload_result, str) and not upload_result.startswith("Error"):
            # Blank line after the image so consecutive images do not share a paragraph.
            chunks.append(f"![Image {number}]({upload_result})\n\n")
            if ocr_failed:
                chunks.append(f"**Image {number} OCR Note:** {ocr_text}\n\n")
            elif ocr_text:
                chunks.append(f"**Image {number} OCR Text:**\n```\n{ocr_text}\n```\n\n")
        else:
            # Upload failed, or the dataset could not be prepared.
            chunks.append(f"**Image {number} (Upload Error):** {str(upload_result)}\n\n")
    return "".join(chunks)


def format_to_markdown(text_content, images_list):
    """Build the final Markdown document from extracted text and images.

    Text handling, line by line (after stripping surrounding whitespace):
      * list-marker lines (``1.``, ``*``, ``-``, ``+``) become ``- item``;
      * lines that start and end with ``|`` are kept as adjacent table rows;
      * ALL-CAPS lines of 6-99 characters become ``##`` headings;
      * everything else becomes its own paragraph.
    Runs of blank lines collapse to a single blank line.

    Args:
        text_content: Extracted text; ``None`` is treated as empty.
        images_list: List of images to OCR, upload and embed. Any other value
            (e.g. an error string) or an empty list adds no image section.

    Returns:
        str: The Markdown document without leading/trailing whitespace.
    """
    markdown_output = _render_text_section(text_content or "") + "\n\n"
    if isinstance(images_list, list) and images_list:
        markdown_output += _render_image_section(images_list)
    return markdown_output.strip()
def process_pdf(pdf_file_upload, pdf_url_input):
    """Convert a PDF, given as an upload or a URL, to Markdown.

    The URL takes precedence when both inputs are supplied. Text extraction
    failure aborts the run; image extraction failure does not, but is
    reported in the returned status.

    Args:
        pdf_file_upload: Value from the Gradio file input: a file object
            with a ``name`` attribute or a path string; falsy when nothing
            was uploaded.
        pdf_url_input: URL text entered by the user; may be empty or None.

    Returns:
        tuple[str, str]: ``(markdown_or_error_message, status_message)``.
    """
    current_status = "Starting PDF processing..."
    logger.info(current_status)

    if not HF_TOKEN:
        # Not fatal: text extraction still works, only image uploads will fail
        # (each failure is reported next to its image in the Markdown).
        logger.error("Error: HF_TOKEN is not set. Please set it in Space secrets for image uploads.")

    if pdf_url_input and pdf_url_input.strip():
        raw_url = pdf_url_input.strip()
        # Percent-decode only when the whole URL was pasted in encoded form
        # (e.g. "https%3A%2F%2F..."). Decoding a normal URL would corrupt
        # encoded reserved characters such as %2F, %23 or %2B.
        resolved_url = raw_url if "://" in raw_url else urllib.parse.unquote(raw_url)
        current_status = f"Attempting to download PDF from URL: {resolved_url}"
        logger.info(current_status)
        try:
            # HEAD request: cheap check of reachability and content type.
            response = requests.head(resolved_url, allow_redirects=True, timeout=10)
            response.raise_for_status()
            content_type = response.headers.get('content-type', '').lower()
            if 'application/pdf' not in content_type:
                current_status = f"Error: URL does not point to a PDF file (Content-Type: {content_type})."
                logger.error(current_status)
                return current_status, current_status
            pdf_input_source = resolved_url
            logger.info("PDF URL validated.")
        except requests.RequestException as e:
            current_status = f"Error accessing URL '{resolved_url}': {str(e)}"
            logger.error(current_status)
            return current_status, current_status
    elif pdf_file_upload:
        pdf_input_source = pdf_file_upload
        # File objects carry their path in ``name``; a plain path string does not.
        upload_name = getattr(pdf_file_upload, 'name', pdf_file_upload)
        current_status = f"Processing uploaded PDF file: {upload_name}"
        logger.info(current_status)
    else:
        current_status = "Error: No PDF file uploaded and no PDF URL provided."
        logger.error(current_status)
        return current_status, current_status

    current_status = "Extracting text and tables from PDF..."
    logger.info(current_status)
    extracted_text = extract_text_from_pdf(pdf_input_source)
    if isinstance(extracted_text, str) and extracted_text.startswith("Error extracting text:"):
        current_status = f"Text extraction failed. {extracted_text}"
        logger.error(current_status)
        return extracted_text, current_status

    # Text extraction may have advanced an uploaded file object's read
    # position; rewind it before the image pass.
    if hasattr(pdf_input_source, 'seek') and not isinstance(pdf_input_source, str):
        pdf_input_source.seek(0)

    current_status = "Extracting images from PDF..."
    logger.info(current_status)
    extracted_images = extract_images_from_pdf(pdf_input_source)
    image_warning = None
    if isinstance(extracted_images, str) and extracted_images.startswith("Error"):
        # The text is still useful, so continue without images and surface
        # the problem in the final status instead of dropping it.
        image_warning = f"Image extraction failed or partially failed. {extracted_images}"
        logger.warning(image_warning)
        extracted_images = []

    current_status = "Formatting content to Markdown..."
    logger.info(current_status)
    markdown_result = format_to_markdown(extracted_text, extracted_images)

    current_status = "PDF processing complete."
    if image_warning:
        current_status += f" Warning: {image_warning}"
    logger.info(current_status)
    return markdown_result, current_status
# Gradio Interface: two inputs (file upload or URL) feed process_pdf, whose
# two return values populate the Markdown view and the status box.
iface = gr.Interface(
    fn=process_pdf,
    inputs=[
        gr.File(label="Upload PDF File", file_types=[".pdf"]),
        gr.Textbox(label="Or Enter PDF URL", placeholder="e.g., https://example.com/file.pdf"),
    ],
    outputs=[
        gr.Markdown(label="Markdown Output"),
        gr.Textbox(label="Processing Status", interactive=False),
    ],
    title="PDF to Markdown Converter",
    description="Convert a PDF (uploaded file or URL) to Markdown. Extracts text, tables, and images. Images are uploaded to a Hugging Face dataset. Requires HF_TOKEN in Spaces Secrets for image functionality.",
    allow_flagging="never",
    # Each example is [file, url]; the file slot is left empty.
    examples=[
        [None, "https://arxiv.org/pdf/1706.03762.pdf"],  # Attention is All You Need
        [None, "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf"],  # A simple dummy PDF
    ],
)
if __name__ == "__main__":
    logger.info("Starting Gradio app...")
    try:
        # When running in Hugging Face Spaces, share=False is recommended.
        # The Space itself provides the public URL.
        iface.launch(server_name="0.0.0.0", server_port=7860, share=False)
    except Exception as e:
        logger.error(f"Failed to start Gradio app: {str(e)}", exc_info=True)
        # Re-raise so the script exits with a failure if Gradio cannot launch.
        raise
    else:
        # When run as a script, launch() normally blocks until the server
        # stops, so reaching this point means shutdown rather than startup.
        logger.info("Gradio app has shut down.")