awacke1's picture
Update app.py
fd25a82 verified
raw
history blame
21.2 kB
import os
import json
import tempfile
import zipfile
from datetime import datetime
import gradio as gr
import numpy as np
import torch
from PIL import Image
# Program A imports
from utils import MEGABenchEvalDataLoader
from constants import * # This is assumed to define CITATION_BUTTON_TEXT, CITATION_BUTTON_LABEL, TABLE_INTRODUCTION, LEADERBOARD_INTRODUCTION, DATA_INFO, SUBMIT_INTRODUCTION, BASE_MODEL_GROUPS, etc.
# Program B imports
import spaces
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor, Qwen2_5_VLForConditionalGeneration
from qwen_vl_utils import process_vision_info
from gliner import GLiNER
# ----------------------------------------------------------------
# Combined CSS
# ----------------------------------------------------------------
current_dir = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(current_dir, "static", "css", "style.css"), "r") as f:
base_css = f.read()
with open(os.path.join(current_dir, "static", "css", "table.css"), "r") as f:
table_css = f.read()
css_program_b = """
/* Program B CSS */
.gradio-container {
max-width: 1200px !important;
margin: 0 auto;
padding: 20px;
background-color: #f8f9fa;
}
.tabs {
border-radius: 8px;
background: white;
padding: 20px;
box-shadow: 0 2px 6px rgba(0, 0, 0, 0.1);
}
.input-container, .output-container {
background: white;
border-radius: 8px;
padding: 15px;
margin: 10px 0;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.05);
}
.submit-btn {
background-color: #2d31fa !important;
border: none !important;
padding: 8px 20px !important;
border-radius: 6px !important;
color: white !important;
transition: all 0.3s ease !important;
}
.submit-btn:hover {
background-color: #1f24c7 !important;
transform: translateY(-1px);
}
#output {
height: 500px;
overflow: auto;
border: 1px solid #e0e0e0;
border-radius: 6px;
padding: 15px;
background: #ffffff;
font-family: 'Arial', sans-serif;
}
.gr-dropdown {
border-radius: 6px !important;
border: 1px solid #e0e0e0 !important;
}
.gr-image-input {
border: 2px dashed #ccc;
border-radius: 8px;
padding: 20px;
transition: all 0.3s ease;
}
.gr-image-input:hover {
border-color: #2d31fa;
}
"""
css_global = base_css + "\n" + table_css + "\n" + css_program_b
# ----------------------------------------------------------------
# Program A Global Initializations
# ----------------------------------------------------------------
default_loader = MEGABenchEvalDataLoader("./static/eval_results/Default")
si_loader = MEGABenchEvalDataLoader("./static/eval_results/SI")
# ----------------------------------------------------------------
# Program B Global Initializations
# ----------------------------------------------------------------
gliner_model = GLiNER.from_pretrained("knowledgator/modern-gliner-bi-large-v1.0")
DEFAULT_NER_LABELS = "person, organization, location, date, event"
models = {
"Qwen/Qwen2.5-VL-7B-Instruct": Qwen2_5_VLForConditionalGeneration.from_pretrained(
"Qwen/Qwen2.5-VL-7B-Instruct", trust_remote_code=True, torch_dtype="auto"
).cuda().eval()
}
processors = {
"Qwen/Qwen2.5-VL-7B-Instruct": AutoProcessor.from_pretrained(
"Qwen/Qwen2.5-VL-7B-Instruct", trust_remote_code=True
)
}
user_prompt = '<|user|>\n'
assistant_prompt = '<|assistant|>\n'
prompt_suffix = "<|end|>\n"
# A simple metadata container for OCR results and entity information.
class TextWithMetadata(list):
def __init__(self, *args, **kwargs):
super().__init__(*args)
self.original_text = kwargs.get('original_text', '')
self.entities = kwargs.get('entities', [])
# ----------------------------------------------------------------
# UI DEFINITION (placed at the top)
# ----------------------------------------------------------------
with gr.Blocks(css=css_global) as demo:
with gr.Tabs():
# -------------------------
# Tab 1: Dashboard (Program A)
# -------------------------
with gr.TabItem("Dashboard"):
with gr.Tabs(elem_classes="tab-buttons") as dashboard_tabs:
# --- MEGA-Bench Leaderboard Tab ---
with gr.TabItem("📊 MEGA-Bench"):
# Inject table CSS (will be updated when the table is refreshed)
css_style = gr.HTML(f"<style>{base_css}\n{table_css}</style>", visible=False)
# Define captions for default vs. single-image tables
default_caption = ("**Table 1: MEGA-Bench full results.** The number in the parentheses is the number of tasks "
"of each keyword. <br> The Core set contains $N_{\\text{core}} = 440$ tasks evaluated by "
"rule-based metrics, and the Open-ended set contains $N_{\\text{open}} = 65$ tasks evaluated by a "
"VLM judge (we use GPT-4o-0806). <br> Different from the results in our paper, we only use the Core "
"results with CoT prompting here for clarity and compatibility with the released data. <br> "
"$\\text{Overall} \\ = \\ \\frac{\\text{Core} \\ \\cdot \\ N_{\\text{core}} \\ + \\ \\text{Open-ended} "
"\\ \\cdot \\ N_{\\text{open}}}{N_{\\text{core}} \\ + \\ N_{\\text{open}}}$ <br> * indicates self-reported "
"results from the model authors.")
single_image_caption = ("**Table 2: MEGA-Bench Single-image setting results.** The number in the parentheses is the number of tasks "
"in each keyword. <br> This subset contains 273 single-image tasks from the Core set and 42 single-image tasks "
"from the Open-ended set. For open-source models, we drop the image input in the 1-shot demonstration example so that "
"the entire query contains a single image only. <br> Compared to the default table, some models with only "
"single-image support are added.")
caption_component = gr.Markdown(
value=default_caption,
elem_classes="table-caption",
latex_delimiters=[{"left": "$", "right": "$", "display": False}],
)
with gr.Row():
super_group_selector = gr.Radio(
choices=list(default_loader.SUPER_GROUPS.keys()),
label="Select a dimension to display breakdown results. We use different column colors to distinguish the overall benchmark scores and breakdown results.",
value=list(default_loader.SUPER_GROUPS.keys())[0]
)
model_group_selector = gr.Radio(
choices=list(BASE_MODEL_GROUPS.keys()),
label="Select a model group",
value="All"
)
initial_headers, initial_data = default_loader.get_leaderboard_data(
list(default_loader.SUPER_GROUPS.keys())[0], "All"
)
data_component = gr.Dataframe(
value=initial_data,
headers=initial_headers,
datatype=["number", "html"] + ["number"] * (len(initial_headers) - 2),
interactive=True,
elem_classes="custom-dataframe",
max_height=2400,
column_widths=["100px", "240px"] + ["160px"] * 3 + ["210px"] * (len(initial_headers) - 5),
)
with gr.Row():
with gr.Accordion("Citation", open=False):
citation_button = gr.Textbox(
value=CITATION_BUTTON_TEXT,
label=CITATION_BUTTON_LABEL,
elem_id="citation-button",
lines=10,
)
gr.Markdown(TABLE_INTRODUCTION)
with gr.Row():
table_selector = gr.Radio(
choices=["Default", "Single Image"],
label="Select table to display. Default: all MEGA-Bench tasks; Single Image: single-image tasks only.",
value="Default"
)
refresh_button = gr.Button("Refresh")
# Wire up event handlers (functions defined below)
refresh_button.click(
fn=update_table_and_caption,
inputs=[table_selector, super_group_selector, model_group_selector],
outputs=[data_component, caption_component, css_style]
)
super_group_selector.change(
fn=update_table_and_caption,
inputs=[table_selector, super_group_selector, model_group_selector],
outputs=[data_component, caption_component, css_style]
)
model_group_selector.change(
fn=update_table_and_caption,
inputs=[table_selector, super_group_selector, model_group_selector],
outputs=[data_component, caption_component, css_style]
)
table_selector.change(
fn=update_selectors,
inputs=[table_selector],
outputs=[super_group_selector, model_group_selector]
).then(
fn=update_table_and_caption,
inputs=[table_selector, super_group_selector, model_group_selector],
outputs=[data_component, caption_component, css_style]
)
# --- Introduction Tab ---
with gr.TabItem("📚 Introduction"):
gr.Markdown(LEADERBOARD_INTRODUCTION)
# --- Data Information Tab ---
with gr.TabItem("📝 Data Information"):
gr.Markdown(DATA_INFO, elem_classes="markdown-text")
# --- Submit Tab ---
with gr.TabItem("🚀 Submit"):
with gr.Row():
gr.Markdown(SUBMIT_INTRODUCTION, elem_classes="markdown-text")
# -------------------------
# Tab 2: Image Processing (Program B)
# -------------------------
with gr.TabItem("Image Processing"):
# A default image is shown for context.
gr.Image("Caracal.jpg", interactive=False)
# It is important to create a state variable to store the OCR/NER result.
ocr_state = gr.State()
with gr.Tab(label="Image Input", elem_classes="tabs"):
with gr.Row():
with gr.Column(elem_classes="input-container"):
input_img = gr.Image(label="Input Picture", elem_classes="gr-image-input")
model_selector = gr.Dropdown(
choices=list(models.keys()),
label="Model",
value="Qwen/Qwen2.5-VL-7B-Instruct",
elem_classes="gr-dropdown"
)
with gr.Row():
ner_checkbox = gr.Checkbox(label="Run Named Entity Recognition", value=False)
ner_labels = gr.Textbox(
label="NER Labels (comma-separated)",
value=DEFAULT_NER_LABELS,
visible=False
)
submit_btn = gr.Button(value="Submit", elem_classes="submit-btn")
with gr.Column(elem_classes="output-container"):
output_text = gr.HighlightedText(label="Output Text", elem_id="output")
# Toggle visibility of the NER labels textbox.
ner_checkbox.change(
lambda x: gr.update(visible=x),
inputs=[ner_checkbox],
outputs=[ner_labels]
)
submit_btn.click(
fn=run_example,
inputs=[input_img, model_selector, ner_checkbox, ner_labels],
outputs=[output_text, ocr_state]
)
with gr.Row():
filename = gr.Textbox(label="Save filename (without extension)", placeholder="Enter filename to save")
download_btn = gr.Button("Download Image & Text", elem_classes="submit-btn")
download_output = gr.File(label="Download")
download_btn.click(
fn=create_zip,
inputs=[input_img, filename, ocr_state],
outputs=[download_output]
)
# ----------------------------------------------------------------
# FUNCTION DEFINITIONS
# ----------------------------------------------------------------
def update_table_and_caption(table_type, super_group, model_group):
"""
Updates the leaderboard DataFrame, caption and CSS based on the table type and selectors.
"""
if table_type == "Default":
headers, data = default_loader.get_leaderboard_data(super_group, model_group)
caption = ("**Table 1: MEGA-Bench full results.** The number in the parentheses is the number of tasks "
"of each keyword. <br> The Core set contains $N_{\\text{core}} = 440$ tasks evaluated by rule-based metrics, and the "
"Open-ended set contains $N_{\\text{open}} = 65$ tasks evaluated by a VLM judge (we use GPT-4o-0806). <br> "
"Different from the results in our paper, we only use the Core results with CoT prompting here for clarity and compatibility "
"with the released data. <br> $\\text{Overall} \\ = \\ \\frac{\\text{Core} \\ \\cdot \\ N_{\\text{core}} \\ + \\ \\text{Open-ended} "
"\\ \\cdot \\ N_{\\text{open}}}{N_{\\text{core}} \\ + \\ N_{\\text{open}}}$ <br> * indicates self-reported results from the model authors.")
else: # Single-image table
headers, data = si_loader.get_leaderboard_data(super_group, model_group)
caption = ("**Table 2: MEGA-Bench Single-image setting results.** The number in the parentheses is the number of tasks "
"in each keyword. <br> This subset contains 273 single-image tasks from the Core set and 42 single-image tasks from the Open-ended set. "
"For open-source models, we drop the image input in the 1-shot demonstration example so that the entire query contains a single image only. <br> "
"Compared to the default table, some models with only single-image support are added.")
dataframe = gr.Dataframe(
value=data,
headers=headers,
datatype=["number", "html"] + ["number"] * (len(headers) - 2),
interactive=True,
column_widths=["100px", "240px"] + ["160px"] * 3 + ["210px"] * (len(headers) - 5),
)
style_html = f"<style>{base_css}\n{table_css}</style>"
return dataframe, caption, style_html
def update_selectors(table_type):
"""
Updates the options in the radio selectors based on the selected table type.
"""
loader = default_loader if table_type == "Default" else si_loader
return [gr.Radio.update(choices=list(loader.SUPER_GROUPS.keys())),
gr.Radio.update(choices=list(loader.MODEL_GROUPS.keys()))]
def array_to_image_path(image_array):
"""
Converts a NumPy image array to a PIL Image, saves it to disk, and returns its path.
"""
img = Image.fromarray(np.uint8(image_array))
img.thumbnail((1024, 1024))
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"image_{timestamp}.png"
img.save(filename)
return os.path.abspath(filename)
@spaces.GPU
def run_example(image, model_id="Qwen/Qwen2.5-VL-7B-Instruct", run_ner=False, ner_labels=DEFAULT_NER_LABELS):
"""
Given an input image, uses the selected VL model to perform OCR (and optionally NER).
Returns the highlighted text and stores the raw OCR output in state.
"""
text_input = "Convert the image to text."
image_path = array_to_image_path(image)
model = models[model_id]
processor = processors[model_id]
prompt = f"{user_prompt}<|image_1|>\n{text_input}{prompt_suffix}{assistant_prompt}"
image_pil = Image.fromarray(image).convert("RGB")
messages = [
{
"role": "user",
"content": [
{"type": "image", "image": image_path},
{"type": "text", "text": text_input},
],
}
]
# Prepare text and vision inputs
text_full = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
text=[text_full],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
)
inputs = inputs.to("cuda")
# Generate model output
generated_ids = model.generate(**inputs, max_new_tokens=1024)
generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)]
output_text = processor.batch_decode(
generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
ocr_text = output_text[0]
if run_ner:
ner_results = gliner_model.predict_entities(ocr_text, ner_labels.split(","), threshold=0.3)
highlighted_text = []
last_end = 0
for entity in sorted(ner_results, key=lambda x: x["start"]):
if last_end < entity["start"]:
highlighted_text.append((ocr_text[last_end:entity["start"]], None))
highlighted_text.append((ocr_text[entity["start"]:entity["end"]], entity["label"]))
last_end = entity["end"]
if last_end < len(ocr_text):
highlighted_text.append((ocr_text[last_end:], None))
result = TextWithMetadata(highlighted_text, original_text=ocr_text, entities=ner_results)
return result, result # one for display, one for state
result = TextWithMetadata([(ocr_text, None)], original_text=ocr_text, entities=[])
return result, result
def create_zip(image, fname, ocr_result):
"""
Creates a zip file containing the saved image, the OCR text, and a JSON of the OCR output.
"""
if not fname or image is None:
return None
try:
if isinstance(image, np.ndarray):
image_pil = Image.fromarray(image)
elif isinstance(image, Image.Image):
image_pil = image
else:
return None
with tempfile.TemporaryDirectory() as temp_dir:
img_path = os.path.join(temp_dir, f"{fname}.png")
image_pil.save(img_path)
original_text = ocr_result.original_text if ocr_result else ""
txt_path = os.path.join(temp_dir, f"{fname}.txt")
with open(txt_path, 'w', encoding='utf-8') as f:
f.write(original_text)
json_data = {
"text": original_text,
"entities": ocr_result.entities if ocr_result else [],
"image_file": f"{fname}.png"
}
json_path = os.path.join(temp_dir, f"{fname}.json")
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=2, ensure_ascii=False)
output_dir = "downloads"
os.makedirs(output_dir, exist_ok=True)
zip_path = os.path.join(output_dir, f"{fname}.zip")
with zipfile.ZipFile(zip_path, 'w') as zipf:
zipf.write(img_path, os.path.basename(img_path))
zipf.write(txt_path, os.path.basename(txt_path))
zipf.write(json_path, os.path.basename(json_path))
return zip_path
except Exception as e:
print(f"Error creating zip: {str(e)}")
return None
# ----------------------------------------------------------------
# Launch the merged Gradio app
# ----------------------------------------------------------------
if __name__ == "__main__":
demo.queue(api_open=False)
demo.launch(debug=True)