# Spaces: Sleeping
import html
import io
import json
import os
import re
import tempfile
import traceback

import gradio as gr
import pandas as pd
import pdfplumber
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextBoxHorizontal, LTFigure, LTImage
from PIL import Image
def save_image(element, images):
    """Save every image found in a layout element as a PNG file.

    The element is searched recursively: an element that carries a truthy
    ``stream`` attribute is treated as an image, and an element without one
    is iterated (if it is iterable) so that images nested inside container
    elements such as figures are found as well.

    Args:
        element: A layout element (image or container of elements).
        images: List that receives one ``{"filename": ...}`` dict per image
            successfully written. It is mutated in place.

    Returns:
        None. Failures are reported with ``print`` and never raised, so a
        single undecodable image does not abort the surrounding extraction.
    """
    try:
        candidates = list(_iter_image_elements(element))
    except Exception as e:
        print(f"Error extracting image: {e}")
        return

    if not candidates:
        print("No stream data for image element")
        return

    for image_element in candidates:
        # Each image gets its own try block so one bad stream does not
        # prevent the remaining images from being saved.
        try:
            image_data = image_element.stream.get_rawdata()
            image = Image.open(io.BytesIO(image_data))
            if image.mode == "CMYK":
                # PNG cannot store CMYK data; convert before saving.
                image = image.convert("RGB")
            image_filename = f"extracted_image_{len(images)}.png"
            image.save(image_filename)
            images.append({"filename": image_filename})
        except Exception as e:
            print(f"Error extracting image: {e}")


def _iter_image_elements(element):
    """Yield ``element`` or its nested descendants that carry a ``stream``."""
    if getattr(element, 'stream', None):
        yield element
        return
    if isinstance(element, (str, bytes)):
        # Strings iterate into themselves forever; they are never images.
        return
    try:
        children = iter(element)
    except TypeError:
        # Leaf element without image data.
        return
    for child in children:
        yield from _iter_image_elements(child)
def detect_headers(text):
    """Prefix header-looking lines with ``# `` and strip every line.

    A line (after stripping surrounding whitespace) counts as a header when
    it starts with a number followed by a dot and whitespace, consists only
    of uppercase letters and whitespace, or starts with a capitalised word
    followed by whitespace and a digit.

    Args:
        text: Text to scan; it is split on ``'\\n'``.

    Returns:
        str: The stripped lines, each terminated by a newline, with header
        lines prefixed by ``# ``.
    """
    header_regexes = [
        re.compile(source)
        for source in (r"^\d+\.\s", r"^[A-Z\s]+$", r"^[A-Z][a-z]+\s\d")
    ]

    rendered = []
    for raw_line in text.split('\n'):
        stripped = raw_line.strip()
        looks_like_header = False
        for regex in header_regexes:
            if regex.match(stripped):
                looks_like_header = True
                break
        prefix = "# " if looks_like_header else ""
        rendered.append(f"{prefix}{stripped}\n")
    return "".join(rendered)
def parse_pdf(pdf_file, output_format, progress=gr.Progress()):
    """Parse a PDF and write its text, tables and images to a download file.

    Args:
        pdf_file: Path to the uploaded PDF file. An object exposing the path
            through a ``name`` attribute is accepted as well.
        output_format: Desired output format ("JSON", "Markdown", or "HTML").
        progress: Gradio Progress object. Accepted for interface
            compatibility; it is not used by the current implementation.

    Returns:
        tuple: ``(formatted_text, download_path)`` where ``download_path`` is
        a temporary file holding the output in the requested format.
        Returns ``("", None)`` if the format is unsupported or any error
        occurs; the error is printed to the console.
    """
    try:
        # Validate before doing any expensive parsing.
        renderer = _RENDERERS.get(output_format)
        if renderer is None:
            raise ValueError(f"Unsupported output format: {output_format!r}")

        # Plain strings have no ``name`` attribute and pass through as-is.
        pdf_path = getattr(pdf_file, "name", pdf_file)

        text, images = _extract_text_and_images(pdf_path)
        formatted_text = detect_headers(text)
        tables = _extract_tables(pdf_path)
        content = renderer(formatted_text, tables, images)

        # Explicit UTF-8: the output may contain non-ASCII text (the JSON is
        # produced with ensure_ascii=False) and the platform default
        # encoding is not guaranteed to be able to represent it.
        with tempfile.NamedTemporaryFile(
            mode="w",
            delete=False,
            suffix="." + output_format.lower(),
            encoding="utf-8",
        ) as tmp:
            tmp.write(content)
            download_path = tmp.name
        return formatted_text, download_path
    except Exception as main_e:
        traceback.print_exc()  # Print full traceback to console
        print(f"A main error occurred: {main_e}")
        return "", None


def _extract_text_and_images(pdf_path):
    """Return the concatenated text-box text and the list of saved images."""
    chunks = []
    images = []
    with open(pdf_path, 'rb') as file:
        for page in extract_pages(file):
            for element in page:
                if isinstance(element, LTTextBoxHorizontal):
                    chunks.append(element.get_text())
                elif isinstance(element, (LTFigure, LTImage)):
                    print(f"Processing element: {type(element)}")
                    save_image(element, images)
    return "".join(chunks), images


def _dedupe_columns(header):
    """Return the header labels made unique by suffixing repeats (_1, _2, ...)."""
    used = set()
    repeats = {}
    unique_columns = []
    for col in header:
        candidate = col
        # Keep bumping the suffix until the label collides with nothing,
        # including labels that already look like "name_1" in the source.
        while candidate in used:
            repeats[col] = repeats.get(col, 0) + 1
            candidate = f"{col}_{repeats[col]}"
        used.add(candidate)
        unique_columns.append(candidate)
    return unique_columns


def _extract_tables(pdf_path):
    """Return one DataFrame per table found, using the first row as header."""
    tables = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            for table in page.extract_tables():
                if not table:
                    continue
                try:
                    header = table[0]
                    columns = _dedupe_columns(header) if header else None
                    tables.append(pd.DataFrame(table[1:], columns=columns))
                except Exception as e:
                    # A malformed table must not abort the whole document.
                    print(f"Error processing table: {e}")
    return tables


def _render_json(formatted_text, tables, images):
    """Serialise the extracted content as an indented JSON document."""
    json_data = {
        "text": formatted_text,
        "tables": [table.to_dict(orient='records') for table in tables],
        "images": images,
    }
    return json.dumps(json_data, ensure_ascii=False, indent=4)


def _render_markdown(formatted_text, tables, images):
    """Render the extracted content as a Markdown document."""
    parts = [f"# Extracted Text\n\n{formatted_text}\n\n# Tables\n"]
    for i, table in enumerate(tables):
        parts.append(f"## Table {i+1}\n")
        parts.append(table.to_markdown(index=False) + "\n\n")
    parts.append("\n\n# Images\n\n")
    for image in images:
        image_path = os.path.join(os.getcwd(), image["filename"])
        parts.append(f'![Image]({image_path})\n')
    return "".join(parts)


def _render_html(formatted_text, tables, images):
    """Render the extracted content as an HTML fragment."""
    # The text comes from an uploaded PDF, so escape it before embedding.
    parts = [f"<p>{html.escape(formatted_text)}</p>\n\n<h2>Tables</h2>\n"]
    for i, table in enumerate(tables):
        parts.append(f"<h2>Table {i+1}</h2>\n")
        parts.append(table.to_html() + "<br>")
    parts.append("\n\n<h2>Images</h2>\n\n")
    for image in images:
        image_path = os.path.join(os.getcwd(), image["filename"])
        parts.append(
            f'<img src="{html.escape(image_path, quote=True)}" alt="Image"><br>\n'
        )
    return "".join(parts)


# Maps each supported output format to the function that renders it.
_RENDERERS = {
    "JSON": _render_json,
    "Markdown": _render_markdown,
    "HTML": _render_html,
}
# Gradio UI: one file upload plus an output-format selector, wired to
# parse_pdf, which returns the extracted text and a downloadable file.
iface = gr.Interface(
    fn=parse_pdf,
    inputs=[
        "file",
        # Pre-select a format: an untouched dropdown would otherwise pass
        # None to parse_pdf, which cannot produce any output for it.
        gr.Dropdown(
            ["JSON", "Markdown", "HTML"],
            value="JSON",
            label="Output Format",
        ),
    ],
    outputs=[
        gr.Text(label="Output Text"),
        gr.File(label="Download Output"),
    ],
    title="PDF Parser",
    description="Parse a PDF and choose the output format.",
)

if __name__ == "__main__":
    # Sharing is temporarily left disabled (launch() default) for debugging.
    iface.launch()