# Hugging Face Space app: PDF to Markdown converter.
# (Replaced scraped Space status banner "Spaces: Sleeping" that was not valid Python.)
# Standard library
import io
import logging
import os
import re
import subprocess
import tempfile
import urllib.parse
from datetime import datetime

# Third-party
import gradio as gr
import pdfplumber
import pytesseract
import requests
from huggingface_hub import HfApi, create_repo
from pdf2image import convert_from_path, convert_from_bytes
from PIL import Image
# Module-wide logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Hugging Face configuration: the token comes from the Space's secrets.
HF_TOKEN = os.getenv("HF_TOKEN")
# Dataset repo that receives extracted images; could be made configurable.
REPO_NAME = "pdf-images-extracted"
hf_api = HfApi()
def check_poppler():
    """Return True when the Poppler ``pdftoppm`` binary is runnable, else False.

    ``pdftoppm -v`` typically prints its version banner on stderr, so we look
    there first and fall back to stdout. The main goal is only to confirm the
    command is executable; FileNotFoundError is the "not installed" signal.
    """
    try:
        proc = subprocess.run(["pdftoppm", "-v"], capture_output=True, text=True)
    except FileNotFoundError:
        logger.error("Poppler (pdftoppm command) not found. Ensure poppler-utils is installed and in PATH.")
        return False
    except Exception as e:  # any other unexpected failure while spawning the process
        logger.error(f"An unexpected error occurred during Poppler check: {str(e)}")
        return False

    banner = proc.stderr.strip() if proc.stderr else proc.stdout.strip()
    if banner:
        # Only the first line of the version output is interesting.
        logger.info(f"Poppler version check: {banner.splitlines()[0]}")
    else:
        logger.info("Poppler 'pdftoppm -v' ran, but no version output on stdout/stderr. Poppler is likely present.")
    return True
def ensure_hf_dataset():
    """Create (or reuse) the image dataset repo on the Hugging Face Hub.

    Returns:
        The repo id string on success, or an ``"Error: ..."`` string when the
        token is missing or the Hub call fails.
    """
    if not HF_TOKEN:
        # Callers should have checked already; this is a safety net.
        logger.error("HF_TOKEN is not set. Cannot ensure Hugging Face dataset.")
        return "Error: HF_TOKEN is not set. Please configure it in Space secrets."
    try:
        # exist_ok=True makes this idempotent across app restarts.
        repo_url = create_repo(repo_id=REPO_NAME, token=HF_TOKEN, repo_type="dataset", exist_ok=True)
    except Exception as e:
        logger.error(f"Hugging Face dataset error: {str(e)}")
        return f"Error: Failed to access or create dataset '{REPO_NAME}': {str(e)}"
    logger.info(f"Dataset repo ensured: {repo_url.repo_id}")
    return repo_url.repo_id  # repo_url is a RepoUrl object
def upload_image_to_hf(image, filename_base):
    """Upload a PIL image as a PNG to the Hugging Face dataset repo.

    Args:
        image: PIL.Image to upload.
        filename_base: base name WITHOUT extension; a timestamp and ``.png``
            are appended to avoid collisions.

    Returns:
        The uploaded file's URL on success, or an ``"Error..."`` string.
    """
    repo_id_or_error = ensure_hf_dataset()
    if isinstance(repo_id_or_error, str) and repo_id_or_error.startswith("Error"):
        return repo_id_or_error  # Propagate error message from ensure_hf_dataset
    repo_id = repo_id_or_error

    # Timestamp with microseconds keeps repo filenames unique.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    repo_filename = f"images/{filename_base}_{timestamp}.png"
    # Portability fix: use the platform temp dir instead of hard-coding /tmp.
    temp_path = os.path.join(tempfile.gettempdir(), f"{filename_base}_{timestamp}.png")
    try:
        image.save(temp_path, format="PNG")
        logger.info(f"Attempting to upload {temp_path} to {repo_id}/{repo_filename}")
        file_url = hf_api.upload_file(
            path_or_fileobj=temp_path,
            path_in_repo=repo_filename,
            repo_id=repo_id,
            repo_type="dataset",
            token=HF_TOKEN,  # Explicitly pass token for clarity
        )
        logger.info(f"Successfully uploaded image: {file_url}")
        return file_url
    except Exception as e:
        logger.error(f"Image upload error for {filename_base}: {str(e)}")
        return f"Error uploading image {filename_base}: {str(e)}"
    finally:
        # Replaces the fragile `'temp_path' in locals()` check: the temp file
        # is always removed, whether the upload succeeded or failed.
        if os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError as ose:
                logger.error(f"Error removing temp file {temp_path}: {ose}")
def extract_text_from_pdf(pdf_input_source):
    """Extract text (layout-aware) and tables from a PDF.

    Args:
        pdf_input_source: a URL string, or an uploaded file-like object.

    Returns:
        The extracted text with tables rendered as Markdown, or an
        ``"Error extracting text: ..."`` string on failure.
    """
    try:
        if isinstance(pdf_input_source, str):  # a URL
            logger.info(f"Fetching PDF from URL for text extraction: {pdf_input_source}")
            response = requests.get(pdf_input_source, stream=True, timeout=20)
            response.raise_for_status()
            pdf_file_like_object = io.BytesIO(response.content)
            logger.info("PDF downloaded successfully from URL.")
        else:  # an uploaded file object (e.g. from Gradio)
            logger.info(f"Processing uploaded PDF file for text extraction: {getattr(pdf_input_source, 'name', 'N/A')}")
            pdf_file_like_object = pdf_input_source

        with pdfplumber.open(pdf_file_like_object) as pdf:
            full_text = ""
            for i, page in enumerate(pdf.pages):
                logger.debug(f"Extracting text from page {i+1}")
                # layout=True preserves reading order; densities affect layout accuracy.
                page_text = page.extract_text(layout=True, x_density=1, y_density=1) or ""
                full_text += page_text + "\n\n"  # blank line as page separator
                logger.debug(f"Extracting tables from page {i+1}")
                for table_idx, table_data in enumerate(page.extract_tables() or []):
                    logger.debug(f"Processing table {table_idx+1} on page {i+1}")
                    if table_data:  # skip empty tables
                        full_text += _table_to_markdown(table_data) + "\n\n"
        logger.info("Text and table extraction successful.")
        return full_text
    except Exception as e:
        logger.error(f"Text extraction error: {str(e)}", exc_info=True)
        return f"Error extracting text: {str(e)}"

def _table_to_markdown(table_data):
    """Render a pdfplumber table (list of rows of cells) as a Markdown table.

    Bug fix: the previous one-liner indexed a single character after the
    header row (``table_md[idx+1]`` instead of ``table_md[idx+1:]``) and
    raised TypeError (``table_md['']``) for single-row tables, which made the
    whole text extraction fail.
    """
    rows = [" | ".join("" if cell is None else str(cell) for cell in row) for row in table_data]
    separator = " | ".join(["---"] * len(table_data[0]))
    table_md = rows[0] + "\n" + separator
    body = "\n".join(rows[1:])
    if body:
        table_md += "\n" + body
    return "**Table:**\n" + table_md
def extract_images_from_pdf(pdf_input_source):
    """Convert every page of a PDF into a PIL image.

    Args:
        pdf_input_source: a URL string or an uploaded file object.

    Returns:
        A list of PIL images, or an ``"Error..."`` string on failure.
    """
    if not check_poppler():
        return "Error: poppler-utils not found or not working correctly. Image extraction depends on it."
    try:
        if isinstance(pdf_input_source, str):  # a URL
            logger.info(f"Fetching PDF from URL for image extraction: {pdf_input_source}")
            response = requests.get(pdf_input_source, stream=True, timeout=20)
            response.raise_for_status()
            logger.info("PDF downloaded successfully, converting to images.")
            page_images = convert_from_bytes(response.content, dpi=200)  # dpi is tunable
        else:  # Gradio upload: a TemporaryFileWrapper exposing a .name path
            source_path = getattr(pdf_input_source, 'name', None)
            if not source_path:
                logger.error("Uploaded PDF file has no name attribute, cannot process for images.")
                return "Error: Could not get path from uploaded PDF file for image extraction."
            logger.info(f"Processing uploaded PDF file for image extraction: {source_path}")
            page_images = convert_from_path(source_path, dpi=200)
        logger.info(f"Successfully extracted {len(page_images)} image(s) from PDF.")
        return page_images
    except Exception as e:
        logger.error(f"Image extraction error: {str(e)}", exc_info=True)
        return f"Error extracting images: {str(e)}"
def format_to_markdown(text_content, images_list):
    """Assemble extracted text and images into one Markdown document.

    Args:
        text_content: text (with Markdown tables) extracted from the PDF.
        images_list: list of PIL images, or an error string from the image
            extraction step.

    Returns:
        The assembled Markdown string.
    """
    markdown_output = "# Extracted PDF Content\n\n"

    # Collapse runs of blank lines into a single blank line.
    text_content = re.sub(r'\n\s*\n+', '\n\n', text_content.strip())
    for line_text in text_content.split('\n'):
        line_stripped = line_text.strip()
        if not line_stripped:
            # Preserve paragraph separation for blank lines.
            markdown_output += "\n"
            continue
        # List markers: "1.", "*", "-", "+" followed by whitespace and content.
        list_match = re.match(r'^\s*(?:(?:\d+\.)|[*+-])\s+(.*)', line_stripped)
        # Heuristic: ALL-CAPS lines of moderate length are treated as headings.
        is_heading_candidate = line_stripped.isupper() and 5 < len(line_stripped) < 100
        if is_heading_candidate and not list_match:
            markdown_output += f"## {line_stripped}\n\n"
        elif list_match:
            # Single newline keeps consecutive list items together.
            markdown_output += f"- {list_match.group(1)}\n"
        else:
            # Default: paragraph line; double newline for a Markdown paragraph.
            markdown_output += f"{line_text}\n\n"

    # Consolidate any excessive newlines introduced above.
    markdown_output = re.sub(r'\n\s*\n+', '\n\n', markdown_output.strip())
    markdown_output += "\n\n"  # blank line before the images section

    if isinstance(images_list, str) and images_list.startswith("Error"):
        # Bug fix: an image-extraction error string used to be silently
        # dropped; surface it so the user knows why images are missing.
        markdown_output += f"## Extracted Images\n\n**Image extraction failed:** {images_list}\n\n"
    elif isinstance(images_list, list) and images_list:
        markdown_output += "## Extracted Images\n\n"
        for i, img_pil in enumerate(images_list):
            try:
                ocr_text = pytesseract.image_to_string(img_pil).strip()
                logger.info(f"OCR for image {i+1} successful.")
            except Exception as ocr_e:
                logger.error(f"OCR error for image {i+1}: {str(ocr_e)}")
                ocr_text = f"OCR failed: {str(ocr_e)}"
            image_filename_base = f"extracted_image_{i+1}"
            image_url_or_error = upload_image_to_hf(img_pil, image_filename_base)
            if isinstance(image_url_or_error, str) and not image_url_or_error.startswith("Error"):
                # Reconstructed image embed (the original append was a bare
                # "\n"; the markdown link was lost in the scraped source).
                markdown_output += f"![Image {i+1}]({image_url_or_error})\n\n"
                if ocr_text and not ocr_text.startswith("OCR failed:"):
                    markdown_output += f"**Image {i+1} OCR Text:**\n```\n{ocr_text}\n```\n\n"
                elif ocr_text:  # the "OCR failed" note
                    markdown_output += f"**Image {i+1} OCR Note:** {ocr_text}\n\n"
            else:
                markdown_output += f"**Image {i+1} (Upload Error):** {str(image_url_or_error)}\n\n"
    return markdown_output.strip()
def process_pdf(pdf_file_upload, pdf_url_input):
    """Gradio handler: convert an uploaded PDF or a PDF URL to Markdown.

    Returns:
        A ``(markdown, status)`` tuple feeding the two output components.
    """
    current_status = "Starting PDF processing..."
    logger.info(current_status)

    if not HF_TOKEN:
        # Text extraction still works without a token; only image uploads
        # will fail. A stricter policy could return the error tuple here.
        current_status = "Error: HF_TOKEN is not set. Please set it in Space secrets for image uploads."
        logger.error(current_status)

    # Resolve the input source; a URL takes precedence over an upload.
    pdf_input_source = None
    url_text = pdf_url_input.strip() if pdf_url_input else ""
    if url_text:
        resolved_url = urllib.parse.unquote(url_text)
        current_status = f"Attempting to download PDF from URL: {resolved_url}"
        logger.info(current_status)
        try:
            # Cheap HEAD request validates reachability and content type.
            head_resp = requests.head(resolved_url, allow_redirects=True, timeout=10)
            head_resp.raise_for_status()
            content_type = head_resp.headers.get('content-type', '').lower()
        except requests.RequestException as e:
            current_status = f"Error accessing URL '{resolved_url}': {str(e)}"
            logger.error(current_status)
            return current_status, current_status
        if 'application/pdf' not in content_type:
            current_status = f"Error: URL does not point to a PDF file (Content-Type: {content_type})."
            logger.error(current_status)
            return current_status, current_status
        pdf_input_source = resolved_url  # pass the URL string downstream
        logger.info("PDF URL validated.")
    elif pdf_file_upload:
        # A tempfile._TemporaryFileWrapper supplied by Gradio.
        pdf_input_source = pdf_file_upload
        current_status = f"Processing uploaded PDF file: {pdf_file_upload.name}"
        logger.info(current_status)
    else:
        current_status = "Error: No PDF file uploaded and no PDF URL provided."
        logger.error(current_status)
        return current_status, current_status

    current_status = "Extracting text and tables from PDF..."
    logger.info(current_status)
    extracted_text = extract_text_from_pdf(pdf_input_source)
    if isinstance(extracted_text, str) and extracted_text.startswith("Error extracting text:"):
        current_status = f"Text extraction failed. {extracted_text}"
        logger.error(current_status)
        return extracted_text, current_status

    # Text extraction may have consumed an uploaded file's stream; rewind
    # before handing the same object to the image extractor.
    if not isinstance(pdf_input_source, str) and hasattr(pdf_input_source, 'seek'):
        pdf_input_source.seek(0)

    current_status = "Extracting images from PDF..."
    logger.info(current_status)
    extracted_images = extract_images_from_pdf(pdf_input_source)
    if isinstance(extracted_images, str) and extracted_images.startswith("Error"):
        # Warning only: the extracted text is still useful on its own.
        current_status = f"Image extraction failed or partially failed. {extracted_images}"
        logger.warning(current_status)
        extracted_images = []  # keep format_to_markdown on its list path

    current_status = "Formatting content to Markdown..."
    logger.info(current_status)
    markdown_result = format_to_markdown(extracted_text, extracted_images)
    current_status = "PDF processing complete."
    logger.info(current_status)
    return markdown_result, current_status
# Gradio interface definition.
iface = gr.Interface(
    fn=process_pdf,
    inputs=[
        gr.File(label="Upload PDF File", file_types=[".pdf"]),
        gr.Textbox(label="Or Enter PDF URL", placeholder="e.g., https://example.com/file.pdf"),
    ],
    outputs=[
        gr.Markdown(label="Markdown Output"),
        gr.Textbox(label="Processing Status", interactive=False),
    ],
    title="PDF to Markdown Converter",
    description="Convert a PDF (uploaded file or URL) to Markdown. Extracts text, tables, and images. Images are uploaded to a Hugging Face dataset. Requires HF_TOKEN in Spaces Secrets for image functionality.",
    allow_flagging="never",
    examples=[
        # Bug fix: malformed scheme "https.arxiv.org" -> "https://arxiv.org".
        [None, "https://arxiv.org/pdf/1706.03762.pdf"],  # Attention Is All You Need
        [None, "https://www.w3.org/WAI/ER/tests/xhtml/testfiles/resources/pdf/dummy.pdf"],  # simple dummy PDF
    ],
)
if __name__ == "__main__":
    logger.info("Starting Gradio app...")
    # On Hugging Face Spaces share=False is recommended: the Space itself
    # provides the public URL. Note that launch() blocks until the server
    # stops, so the "started successfully" log only appears on shutdown.
    try:
        iface.launch(server_name="0.0.0.0", server_port=7860, share=False)
        logger.info("Gradio app started successfully.")
    except Exception as exc:
        logger.error(f"Failed to start Gradio app: {str(exc)}", exc_info=True)
        # Re-raise so the script exits non-zero when Gradio fails to launch.
        raise