Spaces:
Sleeping
Sleeping
import json | |
import gradio as gr | |
from pdfminer.high_level import extract_pages | |
from pdfminer.layout import LTTextBoxHorizontal, LTFigure, LTImage | |
import os | |
import io | |
from PIL import Image | |
import pandas as pd | |
import pdfplumber | |
import tempfile | |
import traceback | |
def parse_pdf(pdf_file, output_format, progress=gr.Progress()): | |
""" | |
Parses a PDF file, extracts text, tables, and images, and formats the output. | |
Args: | |
pdf_file: Path to the uploaded PDF file. | |
output_format: Desired output format ("JSON", "Markdown", or "HTML"). | |
progress: Gradio Progress object for displaying progress. | |
Returns: | |
tuple: Extracted text and download data in the specified format. | |
Returns an empty string and None if there is an error. | |
""" | |
try: | |
with open(pdf_file, 'rb') as file: | |
text = "" | |
tables = [] | |
images = [] | |
for page in extract_pages(file): | |
for element in page: | |
if isinstance(element, LTTextBoxHorizontal): | |
text += element.get_text() | |
elif isinstance(element, (LTFigure, LTImage)): | |
try: | |
if hasattr(element, 'stream'): | |
image_data = element.stream.read() | |
image = Image.open(io.BytesIO(image_data)) | |
image_filename = f"extracted_image_{len(images)}.png" | |
image.save(image_filename) | |
images.append({"filename": image_filename}) | |
else: | |
for child in element: | |
if isinstance(child, LTImage): | |
image_data = child.stream.read() | |
image = Image.open(io.BytesIO(image_data)) | |
image_filename = f"extracted_image_{len(images)}.png" | |
image.save(image_filename) | |
images.append({"filename": image_filename}) | |
except Exception as e: | |
print(f"Error extracting image: {e}") | |
with pdfplumber.open(pdf_file) as pdf: | |
for page_num, page in enumerate(pdf.pages): | |
for table in page.extract_tables(): | |
if len(table) > 0 and len(set(table[0])) != len(table[0]): | |
unique_columns = [] | |
for col in table[0]: | |
if col in unique_columns: | |
col = f"{col}_{unique_columns.count(col)}" | |
unique_columns.append(col) | |
df = pd.DataFrame(table[1:], columns=unique_columns) | |
else: | |
df = pd.DataFrame(table[1:], columns=table[0] if table[0] else None) | |
tables.append(df) | |
with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix="." + output_format.lower()) as tmp: | |
if output_format == "JSON": | |
json_data = { | |
"text": text, | |
"tables": [table.to_dict(orient='records') for table in tables if not table.columns.duplicated().any()], | |
"images": images | |
} | |
json.dump(json_data, tmp, ensure_ascii=False, indent=4) # Ensure ASCII compatibility | |
elif output_format == "Markdown": | |
markdown_text = f"# Extracted Text\n\n{text}\n\n# Tables\n" | |
for i, table in enumerate(tables): | |
if not table.columns.duplicated().any(): | |
markdown_text += f"## Table {i+1}\n" | |
markdown_text += table.to_markdown(index=False) + "\n\n" | |
markdown_text += "\n\n# Images\n\n" | |
for image in images: | |
image_path = os.path.join(os.getcwd(), image["filename"]) | |
markdown_text += f'\n' | |
tmp.write(markdown_text.encode('utf-8')) | |
elif output_format == "HTML": | |
html_text = f"<p>{text}</p>\n\n<h2>Tables</h2>\n" | |
for i, table in enumerate(tables): | |
if not table.columns.duplicated().any(): | |
html_text += f"<h2>Table {i+1}</h2>\n" | |
html_text += table.to_html() + "<br>" | |
html_text += "\n\n<h2>Images</h2>\n\n" | |
for image in images: | |
image_path = os.path.join(os.getcwd(), image["filename"]) | |
html_text += f'<img src="{image_path}" alt="Image"><br>\n' | |
tmp.write(html_text.encode('utf-8')) | |
download_path = tmp.name | |
return text, download_path | |
except Exception as main_e: | |
traceback.print_exc() # Print full traceback to console | |
print(f"A main error occurred: {main_e}") | |
return "", None | |
iface = gr.Interface( | |
fn=parse_pdf, | |
inputs=["file", gr.Dropdown(["JSON", "Markdown", "HTML"])], | |
outputs=[ | |
gr.Text(label="Output Text"), | |
gr.File(label="Download Output") | |
], | |
title="PDF Parser", | |
description="Parse a PDF and choose the output format." | |
) | |
if __name__ == "__main__": | |
iface.launch() # Temporarily disable sharing for debugging |