"""Gradio app that converts a PDF (uploaded file or URL) into Markdown.

Text and tables are extracted with pdfplumber, every page is rendered to an
image with pdf2image (which needs poppler), OCR'd with pytesseract and uploaded
to a Hugging Face dataset repository so the Markdown can link to it.

Internal convention: helper functions report failure by returning a string
that starts with "Error" instead of raising; callers test for that prefix.
"""
import io
import logging
import os
import re
import subprocess
import urllib.parse
from datetime import datetime

import gradio as gr
import pdfplumber
import pytesseract
import requests
from huggingface_hub import HfApi, create_repo, hf_hub_url
from pdf2image import convert_from_bytes, convert_from_path
from PIL import Image

# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Initialize Hugging Face API
HF_TOKEN = os.getenv("HF_TOKEN")
REPO_NAME = "pdf-images-extracted"
hf_api = HfApi()

# Matches "1. item", "12) item", "- item", "* item" and "+ item"; group 1 is
# the item text without its marker.
_LIST_ITEM_RE = re.compile(r"^\s*(?:\d+[.)]|[-*+])\s+(.*)$")

# Repo id resolved by the first successful ensure_hf_dataset() call, so the
# Hub is not contacted again for every uploaded image.
_dataset_repo_id = None


def _is_url(pdf_input):
    """Return True when *pdf_input* is an http(s) URL string."""
    return isinstance(pdf_input, str) and pdf_input.lower().startswith(("http://", "https://"))


def _local_path(pdf_input):
    """Return the filesystem path behind an uploaded-file value.

    Objects exposing ``.name`` (e.g. the temp-file wrapper older Gradio
    versions pass) yield that attribute; anything else (a plain path string or
    a file-like object) is returned unchanged.
    """
    return getattr(pdf_input, "name", pdf_input)


def check_poppler():
    """Return True if the poppler ``pdftoppm`` binary can be executed.

    Logs the version banner on success and an error when the binary is not on
    PATH.
    """
    try:
        result = subprocess.run(["pdftoppm", "-v"], capture_output=True, text=True, check=False)
    except FileNotFoundError:
        logger.error("Poppler not found in PATH.")
        return False
    # pdftoppm typically prints its version banner on stderr, not stdout, so
    # look at both streams instead of logging an empty string.
    version = (result.stderr or result.stdout).strip()
    logger.info(f"Poppler version: {version}")
    return True


def ensure_hf_dataset():
    """Create the dataset repo if needed and return its repo id.

    Returns:
        The repo id to pass to upload calls, or a string starting with
        "Error" when the token is missing or the Hub call fails.
    """
    global _dataset_repo_id
    if _dataset_repo_id is not None:
        return _dataset_repo_id
    try:
        if not HF_TOKEN:
            raise ValueError("HF_TOKEN is not set")
        repo_url = create_repo(repo_id=REPO_NAME, token=HF_TOKEN, repo_type="dataset", exist_ok=True)
        # Recent huggingface_hub versions return a RepoUrl (a str holding the
        # full URL) that exposes the "namespace/name" id as .repo_id; upload
        # calls expect that id, not the URL. Fall back to the raw value when
        # the attribute is absent.
        repo_id = getattr(repo_url, "repo_id", None) or str(repo_url)
        logger.info(f"Dataset repo: {repo_id}")
        _dataset_repo_id = repo_id
        return repo_id
    except Exception as e:  # boundary: converted to the "Error..." string contract
        logger.error(f"Dataset error: {str(e)}")
        return f"Error: Failed to access dataset: {str(e)}"


def upload_image_to_hf(image, filename):
    """Upload a PIL image as ``images/<filename>.png`` to the dataset repo.

    Args:
        image: object with a PIL-style ``save(fp, format=...)`` method.
        filename: base name (without extension) used inside the repo.

    Returns:
        A direct ("resolve") URL of the uploaded file, or a string starting
        with "Error" on failure.
    """
    repo_id = ensure_hf_dataset()
    if isinstance(repo_id, str) and repo_id.startswith("Error"):
        return repo_id
    path_in_repo = f"images/{filename}.png"
    try:
        # Encode in memory: no temp file to clean up, and nothing is left
        # behind on disk when the upload fails.
        buffer = io.BytesIO()
        image.save(buffer, format="PNG")
        hf_api.upload_file(
            path_or_fileobj=buffer.getvalue(),
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        # upload_file's return value describes the commit, not the file; build
        # the URL that actually serves the image so Markdown can embed it.
        file_url = hf_hub_url(repo_id=repo_id, filename=path_in_repo, repo_type="dataset")
    except Exception as e:  # boundary: converted to the "Error..." string contract
        logger.error(f"Image upload error: {str(e)}")
        return f"Error uploading image: {str(e)}"
    logger.info(f"Uploaded image: {file_url}")
    return file_url


def extract_text_from_pdf(pdf_input):
    """Extract page text and tables from a PDF.

    Args:
        pdf_input: an http(s) URL, a local path, or an uploaded-file object.

    Returns:
        The concatenated text (tables rendered as " | "-separated rows under a
        "**Table:**" marker), or a string starting with "Error" on failure.
    """
    try:
        if _is_url(pdf_input):
            response = requests.get(pdf_input, timeout=10)
            response.raise_for_status()
            pdf_file = io.BytesIO(response.content)
        else:
            pdf_file = _local_path(pdf_input)
        parts = []
        with pdfplumber.open(pdf_file) as pdf:
            for page in pdf.pages:
                parts.append((page.extract_text(layout=True) or "") + "\n\n")
                for table in page.extract_tables():
                    # Empty cells come back as None; render them as blanks
                    # rather than the literal "None".
                    rows = "\n".join(
                        " | ".join("" if cell is None else str(cell) for cell in row)
                        for row in table
                    )
                    parts.append("**Table:**\n" + rows + "\n\n")
        return "".join(parts)
    except Exception as e:  # boundary: converted to the "Error..." string contract
        logger.error(f"Text extraction error: {str(e)}")
        return f"Error extracting text: {str(e)}"


def extract_images_from_pdf(pdf_input):
    """Render every PDF page to an image via pdf2image.

    Args:
        pdf_input: an http(s) URL, a local path, or an uploaded-file object.

    Returns:
        A list of page images, or a string starting with "Error" when poppler
        is missing or rendering fails.
    """
    if not check_poppler():
        return "Error: poppler-utils not found."
    try:
        if _is_url(pdf_input):
            response = requests.get(pdf_input, timeout=10)
            response.raise_for_status()
            return convert_from_bytes(response.content)
        return convert_from_path(_local_path(pdf_input))
    except Exception as e:  # boundary: converted to the "Error..." string contract
        logger.error(f"Image extraction error: {str(e)}")
        return f"Error extracting images: {str(e)}"


def format_to_markdown(text, images):
    """Build the Markdown document from extracted text and page images.

    All-caps lines longer than five characters become "##" headings, list-like
    lines become "-" bullets, and every other line (blank ones included) is
    emitted as its own paragraph. Each image is OCR'd and uploaded; its link
    and OCR text (or the upload error) are appended under "Extracted Images".

    Args:
        text: extracted PDF text.
        images: list of page images; any non-list value is ignored.

    Returns:
        The Markdown string.
    """
    parts = ["# Extracted PDF Content\n\n"]
    text = re.sub(r"\n\s*\n+", "\n\n", text.strip())
    for line in text.split("\n"):
        # layout=True pads lines with spaces; judge and emit headings on the
        # stripped text so padding neither counts toward the length nor ends
        # up inside the heading.
        stripped = line.strip()
        list_match = _LIST_ITEM_RE.match(line)
        if stripped.isupper() and len(stripped) > 5:
            parts.append(f"## {stripped}\n\n")
        elif list_match:
            parts.append(f"- {list_match.group(1).strip()}\n")
        else:
            parts.append(f"{line}\n\n")

    if isinstance(images, list) and images:
        parts.append("## Extracted Images\n\n")
        for i, image in enumerate(images):
            try:
                ocr_text = pytesseract.image_to_string(image).strip()
            except (pytesseract.TesseractNotFoundError, pytesseract.TesseractError) as e:
                # OCR is best-effort: still upload and link the image.
                logger.error(f"OCR error on image {i+1}: {str(e)}")
                ocr_text = ""
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"image_{i}_{timestamp}"
            image_url = upload_image_to_hf(image, filename)
            if not image_url.startswith("Error"):
                parts.append(f"![Image {i+1}]({image_url})\n")
                if ocr_text:
                    parts.append(f"**Image {i+1} OCR Text:**\n```\n{ocr_text}\n```\n\n")
            else:
                parts.append(f"**Image {i+1} Error:** {image_url}\n\n")
    return "".join(parts)


def process_pdf(pdf_input, pdf_url):
    """Gradio callback: convert an uploaded PDF or a PDF URL to Markdown.

    A non-blank *pdf_url* takes precedence over the uploaded file.

    Args:
        pdf_input: value of the file-upload component (may be None).
        pdf_url: value of the URL textbox (may be None or blank).

    Returns:
        ``(markdown_or_error_message, status_message)``.
    """
    status = "Starting PDF processing..."
    logger.info(status)

    if not HF_TOKEN:
        status = "Error: HF_TOKEN not set."
        logger.error(status)
        return status, status

    if pdf_url and pdf_url.strip():
        # Strip surrounding whitespace (e.g. from a pasted URL) before use.
        pdf_url = urllib.parse.unquote(pdf_url.strip())
        status = f"Downloading PDF from URL: {pdf_url}"
        logger.info(status)
        try:
            response = requests.head(pdf_url, allow_redirects=True, timeout=5)
            response.raise_for_status()
            pdf_input = pdf_url
        except requests.RequestException as e:
            status = f"Error accessing URL: {str(e)}"
            logger.error(status)
            return status, status
    elif not pdf_input:
        status = "Error: No PDF provided."
        logger.error(status)
        return status, status

    status = "Extracting text..."
    logger.info(status)
    text = extract_text_from_pdf(pdf_input)
    if isinstance(text, str) and text.startswith("Error"):
        status = "Text extraction failed."
        logger.error(status)
        return text, status

    status = "Extracting images..."
    logger.info(status)
    images = extract_images_from_pdf(pdf_input)
    if isinstance(images, str) and images.startswith("Error"):
        status = "Image extraction failed."
        logger.error(status)
        return images, status

    status = "Formatting output..."
    logger.info(status)
    markdown_output = format_to_markdown(text, images)

    status = "Processing complete."
    logger.info(status)
    return markdown_output, status


# Gradio Interface
iface = gr.Interface(
    fn=process_pdf,
    inputs=[
        gr.File(label="Upload PDF File", file_types=[".pdf"]),
        gr.Textbox(label="PDF URL", placeholder="Enter PDF URL (e.g., https://example.com/file.pdf)"),
    ],
    outputs=[
        gr.Markdown(label="Markdown Output"),
        gr.Textbox(label="Processing Status", interactive=False),
    ],
    title="PDF to Markdown Converter",
    description=(
        "Convert a PDF file or URL to Markdown. Extracts text, images, and tables, "
        "with images uploaded to a Hugging Face dataset. Supports URL-encoded strings. "
        "Requires HF_TOKEN in Spaces Secrets."
    ),
    allow_flagging="never",
)

if __name__ == "__main__":
    logger.info("Starting Gradio app...")
    try:
        iface.launch(server_name="0.0.0.0", server_port=7860, share=True)
        logger.info("Gradio app started successfully.")
    except Exception as e:
        logger.error(f"Failed to start Gradio app: {str(e)}")
        raise