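# Scraped file-viewer header (not part of the program):
#   author: taprosoft
#   last commit: "fix: use ZERO" (007293f)
#   viewer links: raw, history, blame
#   file size: 14.3 kB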
import os
import time
import zipfile
from collections import defaultdict
from pathlib import Path
import gradio as gr
import pymupdf4llm
from gradio_pdf import PDF
from backends import ( # convert_zerox,
SUPPORTED_METHODS,
SUPPORTED_METHODS_METADATA,
convert_gmft,
convert_pypdf,
convert_smoldocling,
convert_unstructured,
)
from backends.settings import ENABLE_DEBUG_MODE
from utils import remove_images_from_markdown, trim_pages
# Directory handed to ``trim_pages`` as the output location for trimmed PDFs.
TRIMMED_PDF_PATH = Path("/tmp/trimmed_input")
TRIMMED_PDF_PATH.mkdir(parents=True, exist_ok=True)

# Runtime switches, each overridable through an environment variable.
DO_WARMUP = os.getenv("DO_WARMUP", "True").lower() == "true"
MAX_SELECTED_METHODS = int(os.getenv("MAX_SELECTED_METHODS", "6"))
MAX_PAGES = int(os.getenv("MAX_PAGES", "2"))

# Install the system packages poppler-utils and tesseract-ocr at import time.
# NOTE(review): presumably required by some of the conversion backends —
# confirm which ones.
# The installation stays best-effort (a failure does not stop the app), but a
# non-zero exit status is now reported instead of being silently discarded.
for _setup_command in (
    "apt-get update",
    "apt-get install --no-install-recommends -y poppler-utils tesseract-ocr",
):
    _setup_status = os.system(_setup_command)
    if _setup_status != 0:
        print(
            f"Warning: '{_setup_command}' exited with status {_setup_status}"
        )
def convert_document(path, method, start_page=0, enabled=True):
    """Convert a PDF to markdown with one backend and time the run.

    Args:
        path: Location of the input PDF.
        method: Display name of the conversion backend to run.
        start_page: Forwarded to ``trim_pages`` as its ``start_page``.
        enabled: When false, nothing is processed and empty results are
            returned immediately.

    Returns:
        A 4-tuple ``(duration_message, text, text_without_images,
        debug_image_paths)``; ``("", "", "", [])`` when ``enabled`` is false.

    Raises:
        ValueError: If ``method`` is not one of the backends handled here.
    """
    if not enabled:
        return "", "", "", []
    print("Processing file", path, "with method", method)

    # benchmarking: the timer also covers the page trimming below
    started_at = time.time()
    path = trim_pages(
        path,
        output_path=TRIMMED_PDF_PATH,
        start_page=start_page,
        trim_pages=MAX_PAGES,
    )
    file_name = Path(path).stem

    # One lazy runner per backend; each yields (markdown, debug_image_paths).
    # PyMuPDF produces no debug images, so it is paired with an empty list.
    runners = {
        "SmolDocling": lambda: convert_smoldocling(path, file_name),
        "PyMuPDF": lambda: (
            pymupdf4llm.to_markdown(path, embed_images=True),
            [],
        ),
        "Unstructured": lambda: convert_unstructured(path, file_name),
        "GMFT (table-only)": lambda: convert_gmft(path, file_name),
        "PyPDF": lambda: convert_pypdf(path, file_name),
    }
    run_backend = runners.get(method)
    if run_backend is None:
        raise ValueError(f"Unsupported method: {method}")
    text, debug_image_paths = run_backend()

    elapsed = time.time() - started_at
    duration_message = f"Conversion with {method} took *{elapsed:.2f} seconds*"
    print(duration_message)

    text_without_images = remove_images_from_markdown(text)
    return duration_message, text, text_without_images, debug_image_paths
def to_zip_file(file_path, methods, *output_components):
    """Bundle the results of the selected methods into ``output.zip``.

    ``output_components`` carries four consecutive values per entry of
    ``SUPPORTED_METHODS`` (in that order). Within each group, the third value
    is the markdown text and the fourth the debug images of that method.

    Each selected method gets its own folder in the archive, holding
    ``<method>.md`` with that method's own markdown plus its debug images.
    The markdown is written straight into the archive, so every method keeps
    its own text instead of all folders sharing a single file on disk.

    Args:
        file_path: Path of the input file; the archive is created in the
            same directory.
        methods: Names of the methods whose results should be included.
        *output_components: Flattened per-method values as described above.

    Returns:
        A ``gr.update`` that points the target component at the archive and
        makes it visible.
    """
    zip_file_path = Path(file_path).parent / "output.zip"

    with zipfile.ZipFile(zip_file_path, "w", zipfile.ZIP_DEFLATED) as zipf:
        for idx, method_name in enumerate(SUPPORTED_METHODS):
            if method_name not in methods:
                continue

            markdown_text = output_components[idx * 4 + 2]
            debug_images = output_components[idx * 4 + 3]

            # An empty component may hold None; store an empty file then.
            zipf.writestr(
                f"{method_name}/{method_name}.md",
                markdown_text or "",
            )

            # Each debug image entry is a (path, second_item) pair; only the
            # path is needed to copy the file into the archive.
            for debug_image_path, _ in debug_images or []:
                debug_image_name = Path(debug_image_path).name
                zipf.write(
                    debug_image_path,
                    f"{method_name}/{debug_image_name}",
                )

    return gr.update(
        value=str(zip_file_path),
        visible=True,
    )
def show_tabs(selected_methods):
    """Build one visibility update per supported method.

    The returned list follows the order of ``SUPPORTED_METHODS``; an entry is
    visible exactly when its method is in ``selected_methods``.
    """
    return [
        gr.update(visible=(method in selected_methods))
        for method in SUPPORTED_METHODS
    ]
# Delimiters handed to the markdown render widgets: "$$ ... $$" is treated as
# display math.
latex_delimiters = [dict(left="$$", right="$$", display=True)]

# startup test (also for loading models the first time)
WARMUP_PDF_PATH = "examples/table.pdf"
start_startup = time.time()
if DO_WARMUP:
    print("Warm-up sequence")
    # Run every supported method once against the bundled example document.
    for method in SUPPORTED_METHODS:
        for _ in range(1):
            convert_document(WARMUP_PDF_PATH, method)

# Reported whether or not the warm-up ran.
startup_duration = time.time() - start_startup
print(f"Total start-up time: {startup_duration:.2f} seconds")
# NOTE(review): the nesting of this block was rebuilt from a copy whose
# indentation had been lost — confirm the widget layout against the running
# app.
with gr.Blocks(
    theme=gr.themes.Ocean(),
) as demo:
    with open("header.html", "r") as file:
        header = file.read()
    gr.HTML(header)

    # Per-method widgets, collected in SUPPORTED_METHODS order so the event
    # wiring further down can address them by index.
    output_components = []
    output_tabs = []
    visualization_sub_tabs = []

    # --- First row: upload / download on one side, settings on the other ---
    with gr.Row():
        with gr.Column(variant="panel", scale=5):
            input_file = gr.File(
                label="Upload PDF document",
                file_types=[
                    ".pdf",
                ],
            )
            with gr.Accordion("Examples:"):
                # Offer every PDF found in the "examples" folder next to
                # this file.
                example_root = os.path.join(os.path.dirname(__file__), "examples")
                gr.Examples(
                    examples=[
                        os.path.join(example_root, _)
                        for _ in os.listdir(example_root)
                        if _.endswith("pdf")
                    ],
                    inputs=input_file,
                )
            progress_status = gr.Markdown("", show_label=False, container=False)
            # Hidden until a conversion run has produced the zip archive.
            output_file = gr.File(
                label="Download output",
                interactive=False,
                visible=False,
            )

        with gr.Column(variant="panel", scale=5):
            with gr.Row():
                methods = gr.Dropdown(
                    SUPPORTED_METHODS,
                    label=("Conversion methods"),
                    value=SUPPORTED_METHODS[:2],
                    multiselect=True,
                )
            with gr.Row():
                with gr.Accordion(
                    "Advanced settings",
                    open=False,
                ):
                    # Shown 1-based to the user; converted before it is
                    # passed to convert_document (see process_method).
                    start_page = gr.Number(
                        label=(
                            f"Starting page (only max {MAX_PAGES} "
                            "consecutive pages are processed)"
                        ),
                        minimum=1,
                        maximum=100,
                        step=1,
                        value=1,
                    )
                    visual_checkbox = gr.Checkbox(
                        label="Enable debug visualization",
                        visible=ENABLE_DEBUG_MODE,
                        value=True,
                    )
            with gr.Row():
                convert_btn = gr.Button("Convert", variant="primary", scale=2)
                clear_btn = gr.ClearButton(value="Clear", scale=1)

    # --- Second row: PDF preview on one side, per-method results on the other ---
    with gr.Row():
        with gr.Column(variant="panel", scale=5):
            pdf_preview = PDF(
                label="PDF preview",
                interactive=False,
                visible=True,
                height=800,
            )
            with gr.Accordion("Notes", open=False):
                gr.Markdown(
                    container=False,
                    show_label=False,
                    value=(
                        "- Use the playground for non-sensitive data only.\n"
                        "- Figure and formula extraction might not work properly with some methods (we are working on that!).\n"  # noqa
                        "- Some methods (e.g: Gemini) may use external API to process the document.\n"  # noqa
                        "- Some methods (e.g: Unstructured) may have longer processing time due to not utilizing GPU resource, be patient!\n"  # noqa
                        "- If you have suggestion for better default configuration of current methods, please let us know!\n"  # noqa
                    ),
                )

        with gr.Column(variant="panel", scale=5):
            with gr.Tabs():
                # One initially hidden result tab per supported method.
                for method in SUPPORTED_METHODS:
                    with gr.Tab(method, visible=False) as output_tab:
                        with gr.Tabs():
                            with gr.Tab("Markdown render"):
                                markdown_render = gr.Markdown(
                                    label="Markdown rendering",
                                    height=800,
                                    show_copy_button=True,
                                    latex_delimiters=latex_delimiters,
                                )
                            with gr.Tab("Markdown text"):
                                markdown_text = gr.TextArea(
                                    lines=45, show_label=False, container=False
                                )
                            with gr.Tab(
                                "Debug visualization",
                                visible=ENABLE_DEBUG_MODE,
                            ) as visual_sub_tab:
                                output_description = gr.Markdown(
                                    container=False,
                                    show_label=False,
                                )
                                debug_images = gr.Gallery(
                                    show_label=False,
                                    container=False,
                                    interactive=False,
                                )
                            with gr.Tab("About"):
                                method_metadata = SUPPORTED_METHODS_METADATA[
                                    method
                                ]  # type: ignore
                                method_name = method_metadata["name"]  # type: ignore
                                method_description = method_metadata[
                                    "description"
                                ]  # type: ignore
                                method_url = method_metadata["url"]  # type: ignore
                                method_documentation = method_metadata[
                                    "documentation"
                                ]  # type: ignore
                                gr.Markdown(
                                    value=(
                                        f"# {method_name}\n\n{method_description}\n\n"
                                        + (
                                            f"[[Github repo]]({method_url}) "
                                            if method_url
                                            else ""
                                        )
                                        + f"[[Documentation]]({method_documentation})"
                                    ),
                                    container=False,
                                    show_label=False,
                                )

                    # Four widgets per method, in the order of the tuple
                    # returned by convert_document; to_zip_file and the
                    # wiring below index this list in groups of four.
                    output_components.extend(
                        [
                            output_description,
                            markdown_render,
                            markdown_text,
                            debug_images,
                        ]
                    )
                    output_tabs.append(output_tab)
                    visualization_sub_tabs.append(visual_sub_tab)

    # Mirror the uploaded file into the preview widget.
    input_file.change(fn=lambda x: x, inputs=input_file, outputs=pdf_preview)

    def check_preconditions(input_file, selected_methods):
        """Validate the request before any conversion starts.

        Raises:
            ValueError: If no method or more than ``MAX_SELECTED_METHODS``
                methods are selected, or if no file has been uploaded.
        """
        # Reject an empty selection up front: otherwise every step is
        # skipped and the run only fails at the very end.
        if not selected_methods:
            raise ValueError("Please select at least one conversion method!")
        if len(selected_methods) > MAX_SELECTED_METHODS:
            raise ValueError(
                "Please select up-to " f"{MAX_SELECTED_METHODS} methods only!"
            )
        if input_file is None:
            raise ValueError("Please upload a PDF file first!")

    # Validate first; only when that succeeds reveal the tabs of the
    # selected methods.
    click_event = convert_btn.click(
        fn=check_preconditions,
        inputs=[input_file, methods],
    ).success(
        fn=show_tabs,
        inputs=[methods],
        outputs=output_tabs,
    )

    # Chain one "status message" step and one "convert" step per method so
    # the methods run one after another.
    for idx, method in enumerate(SUPPORTED_METHODS):

        def progress_message(selected_methods, method=method):
            """Return the status line for ``method``.

            When ``method`` is not selected, an empty ``gr.update()`` is
            returned so the status text is left as it is.
            """
            # Selected methods in SUPPORTED_METHODS order, i.e. the order in
            # which they are processed.
            processing_order = [
                current_method
                for current_method in SUPPORTED_METHODS
                if current_method in selected_methods
            ]
            if method not in processing_order:
                return gr.update()
            current_method_idx = processing_order.index(method)
            return (
                f"Processing ({current_method_idx + 1} / "
                f"{len(selected_methods)}) **{method}**...\n\n"
            )

        def process_method(input_file, start_page, selected_methods, method=method):
            """Run ``method`` on the uploaded file if it is selected."""
            # The number field yields None once it has been cleared; fall
            # back to the first page instead of failing on ``None - 1``.
            first_page = 1 if start_page is None else int(start_page)
            return convert_document(
                input_file,
                method=method,
                # UI pages start at 1; convert_document gets that minus one.
                start_page=first_page - 1,
                enabled=method in selected_methods,
            )

        # ``method=method`` freezes the current loop value in each callback.
        click_event = click_event.then(
            fn=lambda methods, method=method: progress_message(methods, method),
            inputs=[methods],
            outputs=[progress_status],
        ).then(
            fn=lambda input_file, start_page, methods, method=method: process_method(
                input_file, start_page, methods, method
            ),
            inputs=[input_file, start_page, methods],
            outputs=output_components[idx * 4 : (idx + 1) * 4],
        )

    # After the last method: report completion, then build the archive.
    click_event.then(lambda: "All tasks completed.", outputs=[progress_status],).then(
        fn=to_zip_file,
        inputs=[
            input_file,
            methods,
        ]
        + output_components,
        outputs=[output_file],
    )

    # "Clear" resets the input, the preview, the download and every
    # per-method output widget, and hides the download again.
    clear_btn.add(
        [
            input_file,
            pdf_preview,
            output_file,
        ]
        + output_components
    )
    clear_btn.click(
        fn=lambda: gr.update(visible=False),
        outputs=[output_file],
    )

    # Toggle all "Debug visualization" sub-tabs together.
    visual_checkbox.change(
        fn=lambda state: [gr.update(visible=state)] * len(visualization_sub_tabs),
        inputs=visual_checkbox,
        outputs=visualization_sub_tabs,
    )
# Enable the request queue with a default concurrency limit of one, then
# start serving. Errors are shown in the UI and uploads are capped at 50 MB.
queued_demo = demo.queue(default_concurrency_limit=1)
queued_demo.launch(show_error=True, max_file_size="50mb")