# Source: Hugging Face Space (status: Running)
import gradio as gr | |
import requests | |
import pdfplumber | |
from pdf2image import convert_from_path, convert_from_bytes | |
import pytesseract | |
from PIL import Image | |
import io | |
import os | |
from huggingface_hub import HfApi, create_repo | |
import re | |
from datetime import datetime | |
import urllib.parse | |
import logging | |
import subprocess | |
# Configure logging once for the whole app and create the module-level logger.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Hugging Face Hub settings: the access token comes from the environment and
# REPO_NAME is the dataset repo that receives the uploaded images.
HF_TOKEN = os.environ.get("HF_TOKEN")
REPO_NAME = "pdf-images-extracted"
hf_api = HfApi()
def check_poppler():
    """Report whether the Poppler ``pdftoppm`` executable can be run.

    Runs ``pdftoppm -v`` and logs whatever version banner it prints.

    Returns:
        bool: True if the command could be executed, False if the executable
        is missing from PATH, cannot be started, or does not answer in time.
    """
    try:
        result = subprocess.run(
            ["pdftoppm", "-v"], capture_output=True, text=True, timeout=10
        )
    except FileNotFoundError:
        logger.error("Poppler not found in PATH.")
        return False
    except (OSError, subprocess.TimeoutExpired) as e:
        # e.g. the binary exists but is not executable, or it hangs.
        logger.error(f"Poppler check failed: {str(e)}")
        return False
    # pdftoppm prints its "-v" banner on stderr, so stdout alone is usually
    # empty; use whichever stream actually carries the text.
    version = (result.stdout or result.stderr or "").strip()
    logger.info(f"Poppler version: {version}")
    return True
def ensure_hf_dataset():
    """Create the target dataset repo if it does not exist and return its id.

    Returns:
        str: The repo id on success, or a message starting with ``"Error"``
        on failure. Callers detect failure through that prefix.
    """
    try:
        if not HF_TOKEN:
            raise ValueError("HF_TOKEN is not set")
        repo_url = create_repo(
            repo_id=REPO_NAME, token=HF_TOKEN, repo_type="dataset", exist_ok=True
        )
        # create_repo returns the repository URL (a str subclass exposing
        # .repo_id). Upload calls need the "namespace/name" id rather than the
        # URL, so prefer the attribute and fall back to the raw value when the
        # installed huggingface_hub returns a plain string.
        repo_id = getattr(repo_url, "repo_id", None) or str(repo_url)
        logger.info(f"Dataset repo: {repo_id}")
        return repo_id
    except Exception as e:
        logger.error(f"Dataset error: {str(e)}")
        return f"Error: Failed to access dataset: {str(e)}"
def upload_image_to_hf(image, filename):
    """Upload an image as PNG to ``images/<filename>.png`` in the dataset repo.

    Args:
        image: Object with a PIL-style ``save(fp, format=...)`` method.
        filename: Base name (without extension) for the file in the repo.

    Returns:
        The value returned by ``hf_api.upload_file`` on success, otherwise a
        string starting with ``"Error"`` describing what went wrong.
    """
    repo_id = ensure_hf_dataset()
    if isinstance(repo_id, str) and repo_id.startswith("Error"):
        return repo_id
    try:
        # Encode in memory instead of going through a fixed /tmp path: nothing
        # is left behind when the upload fails, and concurrent requests using
        # the same filename cannot overwrite each other's temp file.
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        file_url = hf_api.upload_file(
            path_or_fileobj=buffer.getvalue(),
            path_in_repo=f"images/{filename}.png",
            repo_id=repo_id,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        # NOTE(review): depending on the huggingface_hub version this value
        # may be a blob/commit page URL rather than a direct file URL; confirm
        # before relying on it for inline image rendering.
        logger.info(f"Uploaded image: {file_url}")
        return file_url
    except Exception as e:
        logger.error(f"Image upload error: {str(e)}")
        return f"Error uploading image: {str(e)}"
def extract_text_from_pdf(pdf_input):
    """Extract page text and tables from a PDF.

    Args:
        pdf_input: An ``http(s)://`` URL (downloaded first), or anything
            ``pdfplumber.open`` accepts directly, such as a local path string
            or a file object.

    Returns:
        str: For each page, its layout-preserving text followed by its tables
        (cells joined with ``" | "``), or a message starting with ``"Error"``
        if anything fails.
    """
    try:
        # Only real URLs are downloaded; any other string is handed to
        # pdfplumber as a local path instead of failing inside requests.
        is_url = isinstance(pdf_input, str) and pdf_input.lower().startswith(
            ("http://", "https://")
        )
        if is_url:
            response = requests.get(pdf_input, timeout=10)
            response.raise_for_status()
            pdf_file = io.BytesIO(response.content)
        else:
            pdf_file = pdf_input

        parts = []
        with pdfplumber.open(pdf_file) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text(layout=True) or ""
                parts.append(page_text + "\n\n")
                for table in page.extract_tables():
                    # Empty cells come back as None; render them as blanks
                    # rather than the literal text "None".
                    rows = [
                        " | ".join("" if cell is None else str(cell) for cell in row)
                        for row in table
                    ]
                    parts.append("**Table:**\n" + "\n".join(rows) + "\n\n")
        return "".join(parts)
    except Exception as e:
        logger.error(f"Text extraction error: {str(e)}")
        return f"Error extracting text: {str(e)}"
def extract_images_from_pdf(pdf_input):
    """Convert a PDF to images with pdf2image.

    Args:
        pdf_input: An ``http(s)://`` URL (downloaded first), a local path, or
            an uploaded-file object whose ``name`` attribute is the path on
            disk.

    Returns:
        The list produced by pdf2image on success, or a string starting with
        ``"Error"`` if Poppler is unavailable or the conversion fails.
    """
    if not check_poppler():
        return "Error: poppler-utils not found."
    try:
        if isinstance(pdf_input, str) and pdf_input.lower().startswith(
            ("http://", "https://")
        ):
            response = requests.get(pdf_input, timeout=10)
            response.raise_for_status()
            return convert_from_bytes(response.content)
        # Plain path strings and path objects are used as-is; file wrappers
        # expose their on-disk location through .name.
        pdf_path = pdf_input if isinstance(pdf_input, str) else getattr(
            pdf_input, "name", pdf_input
        )
        return convert_from_path(pdf_path)
    except Exception as e:
        logger.error(f"Image extraction error: {str(e)}")
        return f"Error extracting images: {str(e)}"
def format_to_markdown(text, images):
    """Build the Markdown document from extracted text and page images.

    Text handling, line by line (blank lines are dropped):
      * an all-uppercase line longer than 5 characters becomes a ``##`` heading;
      * a line starting with ``N.`` (any number of digits) or one of ``- * +``
        followed by whitespace becomes a ``-`` bullet;
      * anything else is emitted as its own paragraph.

    Image handling (only when ``images`` is a non-empty list): each image is
    OCR'd with pytesseract and uploaded via ``upload_image_to_hf``; on success
    the image link and any OCR text are appended, otherwise the error message.

    Args:
        text: Extracted text of the PDF.
        images: List of images, or any non-list value to skip the image section.

    Returns:
        str: The assembled Markdown.
    """
    parts = ["# Extracted PDF Content\n\n"]
    list_item = re.compile(r"^(?:\d+\.|[-*+])\s+(.*)$")
    in_list = False
    for line in (text or "").strip().split("\n"):
        stripped = line.strip()
        if not stripped:
            continue
        is_heading = stripped.isupper() and len(stripped) > 5
        item = None if is_heading else list_item.match(stripped)
        if item:
            parts.append(f"- {item.group(1)}\n")
            in_list = True
            continue
        if in_list:
            # A blank line ends the list so the next block is not absorbed
            # into the last bullet.
            parts.append("\n")
            in_list = False
        parts.append(f"## {stripped}\n\n" if is_heading else f"{line}\n\n")
    if in_list:
        parts.append("\n")

    if isinstance(images, list) and images:
        parts.append("## Extracted Images\n\n")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        for i, image in enumerate(images):
            ocr_text = pytesseract.image_to_string(image).strip()
            # str() keeps the prefix check valid even if the uploader returns
            # a non-str result object.
            image_url = str(upload_image_to_hf(image, f"image_{i}_{timestamp}"))
            if image_url.startswith("Error"):
                parts.append(f"**Image {i+1} Error:** {image_url}\n\n")
                continue
            # Embed the uploaded image; previously the URL was discarded.
            parts.append(f"![Image {i+1}]({image_url})\n\n")
            if ocr_text:
                parts.append(
                    f"**Image {i+1} OCR Text:**\n```\n{ocr_text}\n```\n\n"
                )
    return "".join(parts)
def process_pdf(pdf_input, pdf_url):
    """Convert an uploaded PDF or a PDF URL into Markdown.

    Args:
        pdf_input: Uploaded file from the UI, or a falsy value if none.
        pdf_url: Optional URL. When non-blank it takes precedence over the
            upload. A fully percent-encoded URL is decoded first.

    Returns:
        tuple[str, str]: ``(markdown_or_error_message, status_message)``.
    """
    status = "Starting PDF processing..."
    logger.info(status)
    if not HF_TOKEN:
        status = "Error: HF_TOKEN not set."
        logger.error(status)
        return status, status

    if pdf_url and pdf_url.strip():
        pdf_url = pdf_url.strip()
        # Decode only when the whole URL was percent-encoded (no literal
        # "://"). Unquoting an ordinary URL would turn escaped reserved
        # characters such as %2F or %3F into path/query delimiters.
        if "://" not in pdf_url:
            pdf_url = urllib.parse.unquote(pdf_url)
        status = f"Downloading PDF from URL: {pdf_url}"
        logger.info(status)
        try:
            response = requests.head(pdf_url, allow_redirects=True, timeout=5)
            # Some servers do not implement HEAD; in that case let the actual
            # download decide instead of rejecting a URL that GET can fetch.
            if response.status_code not in (405, 501):
                response.raise_for_status()
            pdf_input = pdf_url
        except requests.RequestException as e:
            status = f"Error accessing URL: {str(e)}"
            logger.error(status)
            return status, status
    elif not pdf_input:
        status = "Error: No PDF provided."
        logger.error(status)
        return status, status

    status = "Extracting text..."
    logger.info(status)
    text = extract_text_from_pdf(pdf_input)
    # The extractors signal failure with a string that starts with "Error".
    if isinstance(text, str) and text.startswith("Error"):
        status = "Text extraction failed."
        logger.error(status)
        return text, status

    status = "Extracting images..."
    logger.info(status)
    images = extract_images_from_pdf(pdf_input)
    if isinstance(images, str) and images.startswith("Error"):
        status = "Image extraction failed."
        logger.error(status)
        return images, status

    status = "Formatting output..."
    logger.info(status)
    markdown_output = format_to_markdown(text, images)

    status = "Processing complete."
    logger.info(status)
    return markdown_output, status
# Gradio Interface
# Inputs: an uploaded PDF and/or a URL. Outputs: rendered Markdown plus a
# read-only status line. Both lists map positionally onto process_pdf's
# parameters and return tuple.
_input_components = [
    gr.File(label="Upload PDF File", file_types=[".pdf"]),
    gr.Textbox(
        label="PDF URL",
        placeholder="Enter PDF URL (e.g., https://example.com/file.pdf)",
    ),
]
_output_components = [
    gr.Markdown(label="Markdown Output"),
    gr.Textbox(label="Processing Status", interactive=False),
]
iface = gr.Interface(
    fn=process_pdf,
    inputs=_input_components,
    outputs=_output_components,
    title="PDF to Markdown Converter",
    description=(
        "Convert a PDF file or URL to Markdown. "
        "Extracts text, images, and tables, with images uploaded to a Hugging Face dataset. "
        "Supports URL-encoded strings. "
        "Requires HF_TOKEN in Spaces Secrets."
    ),
    allow_flagging="never",
)
if __name__ == "__main__":
    logger.info("Starting Gradio app...")
    try:
        # Launch without blocking so a successful start can be logged.
        iface.launch(server_name="0.0.0.0", server_port=7860, prevent_thread_lock=True)
        logger.info("Gradio app started successfully.")
    except Exception as e:
        logger.error(f"Failed to start Gradio app: {str(e)}")
        raise
    # With prevent_thread_lock=True, launch() returns immediately. Without
    # blocking here the script would reach its end and the process would exit,
    # taking the background server with it.
    iface.block_thread()