|
import os |
|
import time |
|
import zipfile |
|
from collections import defaultdict |
|
from pathlib import Path |
|
|
|
import gradio as gr |
|
import pymupdf4llm |
|
from gradio_pdf import PDF |
|
|
|
from backends import ( |
|
SUPPORTED_METHODS, |
|
SUPPORTED_METHODS_METADATA, |
|
convert_gmft, |
|
convert_pypdf, |
|
convert_smoldocling, |
|
convert_unstructured, |
|
) |
|
from backends.settings import ENABLE_DEBUG_MODE |
|
from utils import remove_images_from_markdown, trim_pages |
|
|
|
# Directory handed to trim_pages() as the output location for trimmed PDFs.
TRIMMED_PDF_PATH = Path("/tmp/trimmed_input")
TRIMMED_PDF_PATH.mkdir(exist_ok=True)

# Runtime settings; each can be overridden through an environment variable
# of the same name.
# DO_WARMUP: run every supported method once at start-up when "true"
# (case-insensitive).
DO_WARMUP = os.environ.get("DO_WARMUP", "True").lower() == "true"
# MAX_SELECTED_METHODS: upper bound on how many methods one request may select.
MAX_SELECTED_METHODS = int(os.environ.get("MAX_SELECTED_METHODS", "6"))
# MAX_PAGES: number of pages passed to trim_pages() as the trim size.
MAX_PAGES = int(os.environ.get("MAX_PAGES", "2"))
|
|
|
|
|
|
|
import os
import subprocess

# Install the system packages (poppler-utils, tesseract-ocr) when the app
# starts.
# NOTE(review): presumably these are needed by one or more conversion
# backends -- confirm which ones.
#
# The commands are passed as argument lists (no shell involved) and their
# exit status is checked so that a failed installation is visible in the
# logs. Installation stays best-effort: a failure is reported but does not
# stop the application from starting.
_APT_COMMANDS = (
    ["apt-get", "update"],
    [
        "apt-get",
        "install",
        "--no-install-recommends",
        "-y",
        "poppler-utils",
        "tesseract-ocr",
    ],
)

for _apt_command in _APT_COMMANDS:
    _apt_command_text = " ".join(_apt_command)
    try:
        _apt_result = subprocess.run(_apt_command, check=False)
    except OSError as exc:
        # Raised e.g. when apt-get does not exist on this system.
        print(f"Could not run '{_apt_command_text}': {exc}")
        continue
    if _apt_result.returncode != 0:
        print(
            f"'{_apt_command_text}' exited with status "
            f"{_apt_result.returncode}"
        )
|
|
|
|
|
def convert_document(path, method, start_page=0, enabled=True):
    """Convert a PDF to markdown with a single method and time the run.

    Args:
        path: Path of the input PDF.
        method: Name of the conversion method. Handled names are
            "SmolDocling", "PyMuPDF", "Unstructured", "GMFT (table-only)"
            and "PyPDF".
        start_page: Forwarded to ``trim_pages`` as ``start_page``.
        enabled: When false, no work is done and empty outputs are returned.

    Returns:
        A 4-tuple ``(duration_message, text, text_without_images,
        debug_image_paths)``. For a disabled call this is
        ``("", "", "", [])``.

    Raises:
        ValueError: If ``method`` is not one of the handled names.
    """
    # Disabled methods produce empty outputs without touching the file.
    if not enabled:
        return "", "", "", []

    print("Processing file", path, "with method", method)
    start = time.time()

    # Work on a trimmed copy: MAX_PAGES is passed as the trim size.
    path = trim_pages(
        path,
        output_path=TRIMMED_PDF_PATH,
        start_page=start_page,
        trim_pages=MAX_PAGES,
    )
    file_name = Path(path).stem

    # Backends that share the (path, file_name) -> (text, debug images)
    # calling convention.
    converters = {
        "SmolDocling": convert_smoldocling,
        "Unstructured": convert_unstructured,
        "GMFT (table-only)": convert_gmft,
        "PyPDF": convert_pypdf,
    }

    if method == "PyMuPDF":
        # PyMuPDF returns markdown only; there are no debug images.
        text = pymupdf4llm.to_markdown(path, embed_images=True)
        debug_image_paths = []
    else:
        converter = converters.get(method)
        if converter is None:
            raise ValueError(f"Unsupported method: {method}")
        text, debug_image_paths = converter(path, file_name)

    duration = time.time() - start
    duration_message = f"Conversion with {method} took *{duration:.2f} seconds*"
    print(duration_message)

    text_without_images = remove_images_from_markdown(text)
    return duration_message, text, text_without_images, debug_image_paths
|
|
|
|
|
def to_zip_file(file_path, methods, *output_components):
    """Bundle the outputs of the selected methods into ``output.zip``.

    The archive is created next to ``file_path`` and contains one folder per
    selected method, holding ``<method>.md`` and that method's debug images.

    Args:
        file_path: Path of the input file; only its parent directory is
            used, as the location of the archive.
        methods: Names of the selected methods. ``None`` is treated as an
            empty selection.
        *output_components: Flattened output values, four per entry of
            ``SUPPORTED_METHODS`` in order. Within each group, offset 2 is
            the markdown text and offset 3 the debug images (two-item
            entries whose first item is the image path).

    Returns:
        A ``gr.update`` that sets the archive path as value and makes the
        target component visible.
    """
    selected_methods = methods or []
    zip_file_path = Path(file_path).parent / "output.zip"

    with zipfile.ZipFile(zip_file_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for idx, method_name in enumerate(SUPPORTED_METHODS):
            if method_name not in selected_methods:
                continue

            # Empty components may arrive as None; normalise them.
            markdown_text = output_components[idx * 4 + 2] or ""
            debug_images = output_components[idx * 4 + 3] or []

            # Write each method's own text straight into the archive.
            # Going through one shared temporary file would store the same
            # content under every method's folder.
            zipf.writestr(f"{method_name}/{method_name}.md", markdown_text)

            for debug_image_path, _ in debug_images:
                debug_image_name = Path(debug_image_path).name
                zipf.write(
                    debug_image_path,
                    f"{method_name}/{debug_image_name}",
                )

    return gr.update(
        value=str(zip_file_path),
        visible=True,
    )
|
|
|
|
|
def show_tabs(selected_methods):
    """Build one visibility update per supported method.

    Args:
        selected_methods: Names of the methods chosen by the user.

    Returns:
        A list of ``gr.update`` objects, in ``SUPPORTED_METHODS`` order,
        each visible only when its method is among ``selected_methods``.
    """
    return [
        gr.update(visible=name in selected_methods)
        for name in SUPPORTED_METHODS
    ]
|
|
|
|
|
# Delimiter configuration passed to the markdown render components:
# "$$ ... $$" is treated as display math.
latex_delimiters = [
    {"left": "$$", "right": "$$", "display": True},
]

# Reference point for the start-up timing reported after the warm-up.
start_startup = time.time()
WARMUP_PDF_PATH = "examples/table.pdf"

if DO_WARMUP:
    print("Warm-up sequence")
    # Run every supported method once on the sample document.
    for method in SUPPORTED_METHODS:
        convert_document(WARMUP_PDF_PATH, method)
    startup_duration = time.time() - start_startup
    print(f"Total start-up time: {startup_duration:.2f} seconds")
|
|
|
# Build the whole UI: layout first, then the event wiring. Everything that
# creates components or registers events lives inside this Blocks context.
with gr.Blocks(
    theme=gr.themes.Ocean(),
) as demo:
    # Page header is read from a file relative to the working directory.
    with open("header.html", "r") as file:
        header = file.read()
    gr.HTML(header)
    # Filled while the per-method tabs are created below:
    # - output_components: four components per method, in the order
    #   (description, markdown render, markdown text, debug images)
    # - output_tabs: the top-level tab of each method
    # - visualization_sub_tabs: the "Debug visualization" tab of each method
    output_components = []
    output_tabs = []
    visualization_sub_tabs = []

    # ---- Top row: upload / status on the left, settings on the right ----
    with gr.Row():
        with gr.Column(variant="panel", scale=5):
            input_file = gr.File(
                label="Upload PDF document",
                file_types=[
                    ".pdf",
                ],
            )
            with gr.Accordion("Examples:"):
                # Every file ending in "pdf" in the examples folder next to
                # this script is offered as an example input.
                example_root = os.path.join(os.path.dirname(__file__), "examples")
                gr.Examples(
                    examples=[
                        os.path.join(example_root, _)
                        for _ in os.listdir(example_root)
                        if _.endswith("pdf")
                    ],
                    inputs=input_file,
                )
            # Status line updated by the event chain during conversion.
            progress_status = gr.Markdown("", show_label=False, container=False)
            # Hidden until a zip archive has been produced.
            output_file = gr.File(
                label="Download output",
                interactive=False,
                visible=False,
            )

        with gr.Column(variant="panel", scale=5):
            with gr.Row():
                # The first two supported methods are preselected.
                methods = gr.Dropdown(
                    SUPPORTED_METHODS,
                    label=("Conversion methods"),
                    value=SUPPORTED_METHODS[:2],
                    multiselect=True,
                )
            with gr.Row():
                with gr.Accordion(
                    "Advanced settings",
                    open=False,
                ):
                    # Page numbers in the UI start at 1; process_method()
                    # subtracts 1 before calling convert_document().
                    start_page = gr.Number(
                        label=(
                            f"Starting page (only max {MAX_PAGES} "
                            "consecutive pages are processed)"
                        ),
                        minimum=1,
                        maximum=100,
                        step=1,
                        value=1,
                    )
                    # Only shown when ENABLE_DEBUG_MODE is set.
                    visual_checkbox = gr.Checkbox(
                        label="Enable debug visualization",
                        visible=ENABLE_DEBUG_MODE,
                        value=True,
                    )
            with gr.Row():
                convert_btn = gr.Button("Convert", variant="primary", scale=2)
                clear_btn = gr.ClearButton(value="Clear", scale=1)

    # ---- Bottom row: PDF preview on the left, per-method results on the right ----
    with gr.Row():
        with gr.Column(variant="panel", scale=5):
            pdf_preview = PDF(
                label="PDF preview",
                interactive=False,
                visible=True,
                height=800,
            )
            with gr.Accordion("Notes", open=False):
                gr.Markdown(
                    container=False,
                    show_label=False,
                    value=(
                        "- Use the playground for non-sensitive data only.\n"
                        "- Figure and formular extraction might not work properly with some methods (we are working on that!).\n"
                        "- Some methods (e.g: Gemini) may use external API to process the document.\n"
                        "- Some methods (e.g: Unstructured) may has longer processing time due to not utilizing GPU resource, be patient!\n"
                        "- If you have suggestion for better default configuration of current methods, please let us know!\n"
                    ),
                )

        with gr.Column(variant="panel", scale=5):
            with gr.Tabs():
                # One (initially hidden) tab per supported method; show_tabs()
                # later reveals the tabs of the selected methods.
                for method in SUPPORTED_METHODS:
                    with gr.Tab(method, visible=False) as output_tab:
                        with gr.Tabs():
                            with gr.Tab("Markdown render"):
                                markdown_render = gr.Markdown(
                                    label="Markdown rendering",
                                    height=800,
                                    show_copy_button=True,
                                    latex_delimiters=latex_delimiters,
                                )
                            with gr.Tab("Markdown text"):
                                markdown_text = gr.TextArea(
                                    lines=45, show_label=False, container=False
                                )
                            with gr.Tab(
                                "Debug visualization",
                                visible=ENABLE_DEBUG_MODE,
                            ) as visual_sub_tab:
                                output_description = gr.Markdown(
                                    container=False,
                                    show_label=False,
                                )
                                debug_images = gr.Gallery(
                                    show_label=False,
                                    container=False,
                                    interactive=False,
                                )
                            with gr.Tab("About"):
                                # Static description built from the method's
                                # metadata entry; the repo link is omitted
                                # when the url is empty.
                                method_metadata = SUPPORTED_METHODS_METADATA[
                                    method
                                ]
                                method_name = method_metadata["name"]
                                method_description = method_metadata[
                                    "description"
                                ]
                                method_url = method_metadata["url"]
                                method_documentation = method_metadata[
                                    "documentation"
                                ]
                                gr.Markdown(
                                    value=(
                                        f"# {method_name}\n\n{method_description}\n\n"
                                        + (
                                            f"[[Github repo]]({method_url}) "
                                            if method_url
                                            else ""
                                        )
                                        + f"[[Documentation]]({method_documentation})"
                                    ),
                                    container=False,
                                    show_label=False,
                                )

                    # Order matters: it matches the 4-tuple returned by
                    # convert_document() and the idx * 4 offsets used when
                    # slicing output_components.
                    output_components.extend(
                        [
                            output_description,
                            markdown_render,
                            markdown_text,
                            debug_images,
                        ]
                    )
                    output_tabs.append(output_tab)
                    visualization_sub_tabs.append(visual_sub_tab)

    # Mirror the uploaded file into the PDF preview.
    input_file.change(fn=lambda x: x, inputs=input_file, outputs=pdf_preview)

    def check_preconditions(input_file, selected_methods):
        """Validate a conversion request before any work starts.

        Raises:
            ValueError: If more than MAX_SELECTED_METHODS methods are
                selected, or if no file has been uploaded.
        """
        if len(selected_methods) > MAX_SELECTED_METHODS:
            raise ValueError(
                "Please select up-to " f"{MAX_SELECTED_METHODS} methods only!"
            )
        if input_file is None:
            raise ValueError("Please upload a PDF file first!")

    # Event chain for the Convert button: validate first, and reveal the tabs
    # of the selected methods only when validation succeeded.
    click_event = convert_btn.click(
        fn=check_preconditions,
        inputs=[input_file, methods],
    ).success(
        fn=show_tabs,
        inputs=[methods],
        outputs=output_tabs,
    )
    # Append two steps per supported method to the chain: a status update
    # followed by the conversion itself. `method=method` defaults bind the
    # current loop value at definition time.
    for idx, method in enumerate(SUPPORTED_METHODS):

        def progress_message(selected_methods, method=method):
            """Return the "Processing (i / n)" status text for `method`.

            When `method` is not among `selected_methods`, an empty
            ``gr.update()`` is returned instead of a message.
            """
            # The comprehension's `idx` is local to the comprehension and
            # does not affect the loop variable of the same name.
            selected_methods_indices = [
                idx
                for idx, current_method in enumerate(SUPPORTED_METHODS)
                if current_method in selected_methods
            ]
            try:
                # Position of this method among the selected ones; raises
                # ValueError when the method is not selected.
                current_method_idx = selected_methods_indices.index(
                    SUPPORTED_METHODS.index(method)
                )
                msg = (
                    f"Processing ({current_method_idx + 1} / "
                    f"{len(selected_methods)}) **{method}**...\n\n"
                )
            except ValueError:
                msg = gr.update()

            return msg

        def process_method(input_file, start_page, selected_methods, method=method):
            """Run convert_document() for `method`, disabled if not selected."""
            return convert_document(
                input_file,
                method=method,
                # The UI page number starts at 1 (see the start_page input).
                start_page=start_page - 1,
                enabled=method in selected_methods,
            )

        click_event = click_event.then(
            fn=lambda methods, method=method: progress_message(methods, method),
            inputs=[methods],
            outputs=[progress_status],
        ).then(
            fn=lambda input_file, start_page, methods, method=method: process_method(
                input_file, start_page, methods, method
            ),
            inputs=[input_file, start_page, methods],
            # The four output components that belong to this method.
            outputs=output_components[idx * 4 : (idx + 1) * 4],
        )

    # Final steps of the chain: report completion, then build the zip
    # archive from the current values of all output components.
    click_event.then(lambda: "All tasks completed.", outputs=[progress_status],).then(
        fn=to_zip_file,
        inputs=[
            input_file,
            methods,
        ]
        + output_components,
        outputs=[output_file],
    )

    # The Clear button resets the input, the preview, the download and every
    # per-method output, and additionally hides the download component.
    clear_btn.add(
        [
            input_file,
            pdf_preview,
            output_file,
        ]
        + output_components
    )
    clear_btn.click(
        fn=lambda: gr.update(visible=False),
        outputs=[output_file],
    )

    # Toggle all "Debug visualization" sub-tabs together with the checkbox.
    visual_checkbox.change(
        fn=lambda state: [gr.update(visible=state)] * len(visualization_sub_tabs),
        inputs=visual_checkbox,
        outputs=visualization_sub_tabs,
    )
|
|
|
# Enable the request queue with a default concurrency limit of 1, then
# start the app. Errors are shown in the UI and uploads are capped at 50 MB.
demo.queue(default_concurrency_limit=1)
demo.launch(show_error=True, max_file_size="50mb")
|
|