# Spaces: Sleeping
# (Status header captured from the Hugging Face Spaces page; not part of the program.)
import io
import logging
import os
import re
import subprocess
import urllib.parse
from datetime import datetime

import gradio as gr
import pdfplumber
import pytesseract
import requests
from huggingface_hub import HfApi, create_repo, hf_hub_url
from pdf2image import convert_from_bytes, convert_from_path
from PIL import Image
# Set up logging: one module-level logger shared by every function below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize Hugging Face API.
# The token is read from the environment; the functions below refuse to
# upload when it is missing.
HF_TOKEN = os.getenv("HF_TOKEN")  # Set in Hugging Face Spaces Secrets
REPO_NAME = "pdf-images-extracted"  # Hugging Face dataset repo
hf_api = HfApi()
def check_poppler():
    """Check whether poppler-utils is usable by running ``pdftoppm -v``.

    Returns:
        bool: ``True`` if the ``pdftoppm`` executable could be started,
        ``False`` if it is missing from ``PATH`` or could not be run.
    """
    try:
        result = subprocess.run(
            ["pdftoppm", "-v"],
            capture_output=True,
            text=True,
            timeout=10,  # never let a wedged binary hang the request
        )
    except FileNotFoundError:
        logger.error("Poppler not found in PATH.")
        return False
    except (OSError, subprocess.TimeoutExpired) as e:
        # e.g. the binary exists but is not executable, or it never returned.
        logger.error(f"Poppler check failed: {str(e)}")
        return False

    # NOTE(review): poppler tools appear to write the ``-v`` banner to stderr
    # rather than stdout, so prefer stderr and fall back to stdout.
    version = (result.stderr or result.stdout or "").strip()
    logger.info(f"Poppler version: {version}")
    return True
def ensure_hf_dataset():
    """Create the Hugging Face dataset repository if needed and return its id.

    Returns:
        str: The repository id on success. On failure (missing ``HF_TOKEN``
        or any error raised by the Hub call) a message starting with
        ``"Error"`` is returned instead; callers detect failure by that
        prefix.
    """
    try:
        if not HF_TOKEN:
            raise ValueError("HF_TOKEN is not set")
        repo_url = create_repo(
            repo_id=REPO_NAME,
            token=HF_TOKEN,
            repo_type="dataset",
            exist_ok=True,
        )
        # create_repo returns a RepoUrl whose string value is the full URL;
        # the Hub APIs that take ``repo_id`` expect "namespace/name", which
        # RepoUrl exposes as ``.repo_id``. Fall back to the raw value for
        # library versions that return a plain string.
        repo_id = getattr(repo_url, "repo_id", None) or str(repo_url)
        logger.info(f"Successfully accessed/created dataset repo: {repo_id}")
        return repo_id
    except Exception as e:
        # Deliberately broad: every failure is reported through the
        # "Error..." string protocol rather than raised to the UI.
        logger.error(f"Failed to create/access dataset repo: {str(e)}")
        return f"Error: Failed to create/access dataset repo: {str(e)}"
def upload_image_to_hf(image, filename):
    """Upload an image to the Hugging Face dataset and return its URL.

    Args:
        image: Image object exposing a PIL-style ``save(fp, format=...)``.
        filename: Base name without extension; the file is stored in the
            repository as ``images/<filename>.png``.

    Returns:
        str: Direct ("resolve") URL of the uploaded PNG, or a message
        starting with ``"Error"`` when the repository is unavailable or the
        upload fails.
    """
    repo_id = ensure_hf_dataset()
    if isinstance(repo_id, str) and repo_id.startswith("Error"):
        return repo_id
    # Accept either a bare repo id or a RepoUrl-like object carrying one.
    repo_id = getattr(repo_id, "repo_id", None) or repo_id

    path_in_repo = f"images/{filename}.png"
    try:
        # Encode in memory: no temp file to leak if the upload raises, and no
        # collisions on a shared /tmp path between concurrent requests.
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        hf_api.upload_file(
            path_or_fileobj=buffer.getvalue(),
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        # Build the direct file URL; the value returned by upload_file points
        # at a commit/blob page, which does not render as a Markdown image.
        file_url = hf_hub_url(
            repo_id=repo_id,
            filename=path_in_repo,
            repo_type="dataset",
        )
        logger.info(f"Uploaded image to: {file_url}")
        return file_url
    except Exception as e:
        logger.error(f"Error uploading image: {str(e)}")
        return f"Error uploading image: {str(e)}"
def extract_text_from_pdf(pdf_input):
    """Extract page text and tables from a PDF using pdfplumber.

    Args:
        pdf_input: Either an ``http(s)`` URL (the PDF is downloaded first),
            or anything ``pdfplumber.open`` accepts directly, such as the
            local path string Gradio passes for ``type="filepath"`` uploads.

    Returns:
        str: Layout-preserving text of every page, each followed by its
        tables rendered as ``" | "``-separated rows, or a message starting
        with ``"Error extracting text:"`` on failure.
    """
    try:
        is_url = isinstance(pdf_input, str) and pdf_input.lower().startswith(
            ("http://", "https://")
        )
        if is_url:
            # Bounded wait so a stalled server cannot hang the request.
            response = requests.get(pdf_input, timeout=60)
            response.raise_for_status()
            pdf_file = io.BytesIO(response.content)
        else:
            # Local path or file-like upload: pdfplumber opens it as-is.
            pdf_file = pdf_input

        parts = []
        with pdfplumber.open(pdf_file) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text(layout=True) or ""
                parts.append(page_text + "\n\n")
                for table in page.extract_tables():
                    # Empty cells come back as None; render them blank
                    # instead of the literal string "None".
                    rows = [
                        " | ".join("" if cell is None else str(cell) for cell in row)
                        for row in table
                    ]
                    parts.append("**Table:**\n" + "\n".join(rows) + "\n\n")
        return "".join(parts)
    except Exception as e:
        logger.error(f"Error extracting text: {str(e)}")
        return f"Error extracting text: {str(e)}"
def extract_images_from_pdf(pdf_input):
    """Convert a PDF to a list of images via pdf2image.

    Args:
        pdf_input: An ``http(s)`` URL, a local file path, or an upload
            object exposing the file path as ``.name``.

    Returns:
        The value returned by ``convert_from_bytes`` / ``convert_from_path``
        on success, or a ``str`` starting with ``"Error"`` when poppler is
        missing or the conversion fails.
    """
    if not check_poppler():
        return "Error: poppler-utils not found. Ensure it is installed via Dockerfile."
    try:
        is_url = isinstance(pdf_input, str) and pdf_input.lower().startswith(
            ("http://", "https://")
        )
        if is_url:
            logger.info(f"Downloading PDF from URL: {pdf_input}")
            # Bounded wait so a stalled server cannot hang the request.
            response = requests.get(pdf_input, timeout=60)
            response.raise_for_status()
            return convert_from_bytes(response.content)

        # Gradio's ``type="filepath"`` hands over a plain path string; file
        # wrapper objects carry the path in ``.name``.
        if isinstance(pdf_input, (str, os.PathLike)):
            pdf_path = pdf_input
        else:
            pdf_path = pdf_input.name
        logger.info(f"Processing uploaded PDF: {pdf_path}")
        return convert_from_path(pdf_path)
    except Exception as e:
        logger.error(f"Error extracting images: {str(e)}")
        return f"Error extracting images: {str(e)}"
def format_to_markdown(text, images):
    """Convert extracted text and page images to a Markdown document.

    Text lines are classified heuristically: an all-uppercase line longer
    than five characters becomes a ``##`` heading, a line starting with a
    number (``1.``, ``10)``) or a bullet (``-``, ``*``, ``+``) becomes a
    ``-`` list item, and anything else is emitted as its own paragraph.

    Args:
        text: Raw text extracted from the PDF.
        images: List of images to OCR, upload and link. Any non-list or
            empty value skips the image section.

    Returns:
        str: The assembled Markdown.
    """
    parts = ["# Extracted PDF Content\n\n"]

    # Collapse runs of blank lines so at most one blank line separates text.
    text = re.sub(r'\n\s*\n+', '\n\n', text.strip())
    # Numbered ("1.", "12)") or bulleted ("-", "*", "+") item; group 1 is the
    # item text with the marker removed.
    list_item = re.compile(r'^\s*(?:\d+[.)]|[-*+])\s+(.*)$')

    in_list = False
    for line in text.split("\n"):
        stripped = line.strip()
        if not stripped:
            # Paragraphs and headings already end with a blank line; only an
            # open list needs one to terminate it.
            if in_list:
                parts.append("\n")
                in_list = False
            continue

        item = list_item.match(line)
        if stripped.isupper() and len(stripped) > 5:
            parts.append(f"## {stripped}\n\n")
            in_list = False
        elif item:
            parts.append(f"- {item.group(1).strip()}\n")
            in_list = True
        else:
            parts.append(f"{line}\n\n")
            in_list = False
    if in_list:
        parts.append("\n")

    # Add images with Hugging Face dataset URLs.
    if isinstance(images, list) and images:
        parts.append("## Extracted Images\n\n")
        # One timestamp per call so all images of a document share a suffix.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        for i, image in enumerate(images):
            try:
                ocr_text = pytesseract.image_to_string(image).strip()
            except Exception as e:
                # OCR is best-effort: still link the image if it fails.
                logger.error(f"OCR failed for image {i+1}: {str(e)}")
                ocr_text = ""

            filename = f"image_{i}_{timestamp}"
            image_url = upload_image_to_hf(image, filename)
            if image_url.startswith("Error"):
                parts.append(f"**Image {i+1} Error:** {image_url}\n\n")
                continue

            parts.append(f"![Image {i+1}]({image_url})\n\n")
            if ocr_text:
                parts.append(f"**Image {i+1} OCR Text:**\n```\n{ocr_text}\n```\n\n")

    return "".join(parts)
def process_pdf(pdf_input, pdf_url):
    """Process a PDF given as an upload or a URL and generate Markdown.

    A non-blank ``pdf_url`` takes precedence over ``pdf_input``.

    Args:
        pdf_input: Uploaded PDF as delivered by the Gradio file input, or a
            falsy value when nothing was uploaded.
        pdf_url: URL of the PDF, optionally percent-encoded. Blank or
            ``None`` means "use the uploaded file".

    Returns:
        tuple[str, str]: ``(markdown_or_error_message, status_message)``.
    """
    logger.info(
        "Starting PDF processing at %s",
        # Label the timestamp with the server's actual time zone.
        datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z"),
    )

    if not HF_TOKEN:
        return "Error: HF_TOKEN not set in Spaces Secrets.", "Error: HF_TOKEN not set."

    url = (pdf_url or "").strip()
    if url:
        # Decode URL-encoded input (e.g. "%20") before use.
        url = urllib.parse.unquote(url)
        logger.info(f"Decoded URL: {url}")
        try:
            # Cheap reachability probe before the full download; bounded so a
            # silent server cannot hang the request.
            response = requests.head(url, allow_redirects=True, timeout=30)
            response.raise_for_status()
        except requests.RequestException as e:
            message = f"Error accessing URL: {str(e)}"
            logger.error(message)
            return message, message
        pdf_input = url
    elif not pdf_input:
        return "Error: Please provide a PDF file or URL.", "Error: No PDF provided."

    logger.info("Extracting text from PDF...")
    text = extract_text_from_pdf(pdf_input)
    # Match the extractor's exact failure prefix so a document whose text
    # merely begins with "Error" is not mistaken for a failure.
    if isinstance(text, str) and text.startswith("Error extracting text:"):
        return text, "Text extraction failed."

    # Rendering runs only after text extraction succeeded. Poppler
    # availability is checked inside extract_images_from_pdf.
    logger.info("Extracting images from PDF...")
    images = extract_images_from_pdf(pdf_input)
    if isinstance(images, str):
        # The extractor returns a string only to report an error.
        return images, "Image extraction failed."

    logger.info("Formatting output as Markdown...")
    markdown_output = format_to_markdown(text, images)
    return markdown_output, "Processing complete."
# Gradio Interface: two inputs (uploaded file, URL) feed process_pdf, whose
# (markdown, status) tuple maps onto the two outputs in order.
iface = gr.Interface(
    fn=process_pdf,
    inputs=[
        # type="filepath" delivers the upload to process_pdf as a path string.
        gr.File(label="Upload PDF File", type="filepath"),
        gr.Textbox(label="PDF URL", placeholder="Enter the URL of the PDF"),
    ],
    outputs=[
        gr.Markdown(label="Markdown Output"),
        gr.Textbox(label="Processing Status", interactive=False),
    ],
    title="PDF to Markdown Converter",
    description="Upload a PDF file or provide a PDF URL (including URL-encoded strings with spaces) to convert it into a Markdown document. Images and charts are extracted, uploaded to a Hugging Face dataset, and linked in the Markdown. Formatting (e.g., headings, lists) is preserved. Requires HF_TOKEN in Spaces Secrets.",
    allow_flagging="never",
)

if __name__ == "__main__":
    # Listen on all interfaces, port 7860.
    iface.launch(server_name="0.0.0.0", server_port=7860)