show_ui_test / app.py
h-siyuan's picture
Update app.py
ea2b32e verified
raw
history blame
10.6 kB
import base64
import json
from datetime import datetime
import gradio as gr
import torch
import spaces
from PIL import Image, ImageDraw
from qwen_vl_utils import process_vision_info
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor
import ast
import os
import numpy as np
from huggingface_hub import hf_hub_download, list_repo_files
import boto3
from botocore.exceptions import NoCredentialsError
# Module-wide constants
DESCRIPTION = "[ShowUI Demo](https://huggingface.co/showlab/ShowUI-2B)"
# Grounding instruction sent as the first text item of every request.
_SYSTEM = (
    "Based on the screenshot of the page, I give a text description and you "
    "give its corresponding location. The coordinate represents a clickable "
    "location [x, y] for an element, which is a relative coordinate on the "
    "screenshot, scaled from 0 to 1."
)
# Pixel bounds passed to the processor and attached to each image message.
MIN_PIXELS = 256 * 28 ** 2
MAX_PIXELS = 1344 * 28 ** 2
# Hub repository holding the checkpoint, and the local folder it is mirrored to
model_repo = "showlab/ShowUI-2B"
destination_folder = "./showui-2b"
# Create the local folder when it is missing
os.makedirs(destination_folder, exist_ok=True)
# Enumerate everything stored in the repository
files = list_repo_files(repo_id=model_repo)
# Fetch the listed files one by one into the local folder, reporting each
for file in files:
    file_path = hf_hub_download(
        repo_id=model_repo,
        filename=file,
        local_dir=destination_folder,
    )
    print(f"Downloaded {file} to {file_path}")
# Instantiate the model from the local folder; it is placed on the CPU here
# and moved to CUDA inside run_showui.
model = Qwen2VLForConditionalGeneration.from_pretrained(
    destination_folder,
    torch_dtype=torch.bfloat16,
    device_map="cpu",
)
# The processor comes from the Qwen2-VL base repository, with the pixel bounds applied
processor = AutoProcessor.from_pretrained(
    "Qwen/Qwen2-VL-2B-Instruct",
    min_pixels=MIN_PIXELS,
    max_pixels=MAX_PIXELS,
)
# Helper functions
def draw_point(image_input, point=None, radius=5):
    """Return the image with a filled red circle marking ``point``.

    Args:
        image_input: A file path (``str``) opened with PIL, or array-like
            pixel data that is cast to ``uint8`` and converted to an image.
        point: Relative ``(x, y)`` position; each component is multiplied by
            the image width/height. When falsy, nothing is drawn.
        radius: Circle radius in pixels.

    Returns:
        The PIL image, annotated when a point was given.
    """
    image = (
        Image.open(image_input)
        if isinstance(image_input, str)
        else Image.fromarray(np.uint8(image_input))
    )
    if not point:
        return image
    # Scale the relative coordinates to absolute pixel positions.
    center_x = point[0] * image.width
    center_y = point[1] * image.height
    bounding_box = (
        center_x - radius,
        center_y - radius,
        center_x + radius,
        center_y + radius,
    )
    ImageDraw.Draw(image).ellipse(bounding_box, fill='red')
    return image
def array_to_image_path(image_array, session_id):
    """Write the uploaded pixels to ``<session_id>.png`` and return the absolute path.

    Args:
        image_array: Pixel data; cast to ``uint8`` before conversion to an image.
        session_id: Used as the PNG file name (relative to the working directory).

    Raises:
        ValueError: If ``image_array`` is ``None``.
    """
    if image_array is None:
        raise ValueError("No image provided. Please upload an image before submitting.")
    target_name = f"{session_id}.png"
    Image.fromarray(np.uint8(image_array)).save(target_name)
    return os.path.abspath(target_name)
def upload_to_s3(file_name, bucket, object_name=None):
    """Upload a local file to an S3 bucket on a best-effort basis.

    Callers ignore upload problems, so every expected failure mode is
    reported on stdout and turned into a ``False`` return instead of an
    exception that would abort the request.

    Args:
        file_name: Path of the local file to upload.
        bucket: Name of the destination bucket.
        object_name: Key to store the file under; defaults to ``file_name``.

    Returns:
        ``True`` when the upload succeeded, ``False`` otherwise.
    """
    # Function-scope imports keep this block self-contained; both packages
    # are already dependencies of this file.
    from boto3.exceptions import S3UploadFailedError
    from botocore.exceptions import BotoCoreError, ClientError

    if object_name is None:
        object_name = file_name
    try:
        s3 = boto3.client('s3')
        s3.upload_file(file_name, bucket, object_name)
    except FileNotFoundError:
        print(f"S3 upload skipped, local file not found: {file_name}")
        return False
    except (BotoCoreError, ClientError, S3UploadFailedError) as exc:
        # BotoCoreError covers NoCredentialsError; the other two cover
        # service-side rejections (permissions, missing bucket, ...).
        print(f"S3 upload of {file_name} to {bucket}/{object_name} failed: {exc}")
        return False
    return True
def _parse_click_point(output_text):
    """Convert the decoded model reply into a coordinate sequence.

    Raises:
        ValueError: If the reply is not a Python literal, or is not a
            list/tuple whose first two items are numbers.
    """
    try:
        # Strip first: leading whitespace makes literal_eval fail on older Pythons.
        parsed = ast.literal_eval(output_text.strip())
    except (ValueError, SyntaxError, TypeError) as exc:
        raise ValueError(
            f"Model output could not be parsed as coordinates: {output_text!r}"
        ) from exc
    is_point = (
        isinstance(parsed, (list, tuple))
        and len(parsed) >= 2
        and all(isinstance(value, (int, float)) for value in parsed[:2])
    )
    if not is_point:
        raise ValueError(
            f"Model output is not an [x, y] coordinate pair: {output_text!r}"
        )
    return parsed


@spaces.GPU
def run_showui(image, query, session_id):
    """Locate the element described by ``query`` on the screenshot.

    Args:
        image: Screenshot pixel data; saved to disk via ``array_to_image_path``.
        query: Text description of the element to locate.
        session_id: Identifier used to name the saved screenshot.

    Returns:
        A tuple ``(result_image, coordinates, image_path)``: the screenshot
        with the predicted point drawn on it, the parsed coordinates as a
        string, and the path of the saved screenshot.

    Raises:
        ValueError: If no image was provided, or the model reply is not a
            usable coordinate pair.
    """
    global model
    image_path = array_to_image_path(image, session_id)
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": _SYSTEM},
                {"type": "image", "image": image_path, "min_pixels": MIN_PIXELS, "max_pixels": MAX_PIXELS},
                {"type": "text", "text": query}
            ],
        }
    ]
    # The model is loaded on the CPU at import time; move it to the GPU here.
    model = model.to("cuda")
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    image_inputs, video_inputs = process_vision_info(messages)
    inputs = processor(
        text=[text],
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt"
    )
    inputs = inputs.to("cuda")
    generated_ids = model.generate(**inputs, max_new_tokens=128)
    # Drop the prompt tokens so only the newly generated reply is decoded.
    generated_ids_trimmed = [
        out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )[0]
    # The reply is free-form model text: validate it before drawing.
    click_xy = _parse_click_point(output_text)
    result_image = draw_point(image_path, click_xy, radius=10)
    return result_image, str(click_xy), image_path
def save_and_upload_data(image_path, query, session_id, is_example_image, votes=None):
    """Persist a submission record as ``<session_id>.json`` and push it to S3.

    Nothing is written or uploaded for example images. Otherwise the record
    (image path, query, vote counters, ISO timestamp) is dumped to a local
    JSON file, and both that file and the screenshot are uploaded under the
    ``ootb/`` prefix.

    Returns:
        The record dict, or ``None`` when an example image was used.
    """
    if is_example_image:
        return None
    record = dict(
        image_path=image_path,
        query=query,
        votes=votes or {"upvotes": 0, "downvotes": 0},
        timestamp=datetime.now().isoformat(),
    )
    json_name = f"{session_id}.json"
    with open(json_name, "w") as handle:
        json.dump(record, handle)
    bucket_name = 'altair.storage'
    upload_to_s3(json_name, bucket_name, object_name=f"ootb/{json_name}")
    upload_to_s3(image_path, bucket_name, object_name=f"ootb/{os.path.basename(image_path)}")
    return record
def update_vote(vote_type, session_id, is_example_image):
    """Increment a vote counter in the session's JSON record and re-upload it.

    Args:
        vote_type: ``"upvote"`` or ``"downvote"``.
        session_id: Identifier of the submission; the record is read from
            ``<session_id>.json``.
        is_example_image: When truthy, no vote is recorded.

    Returns:
        A user-facing status message.

    Raises:
        ValueError: If ``vote_type`` is not a known vote type (previously
            such a vote was reported as recorded without being counted).
    """
    if is_example_image:
        return "Example image used. No vote recorded."
    counter_keys = {"upvote": "upvotes", "downvote": "downvotes"}
    if vote_type not in counter_keys:
        raise ValueError(f"Unknown vote type: {vote_type!r}")
    missing_record = "No submission found. Please submit a query before voting."
    # No session id means nothing has been submitted yet (or the UI was cleared).
    if not session_id:
        return missing_record
    local_file_name = f"{session_id}.json"
    try:
        with open(local_file_name, "r") as f:
            data = json.load(f)
    except FileNotFoundError:
        return missing_record
    data["votes"][counter_keys[vote_type]] += 1
    with open(local_file_name, "w") as f:
        json.dump(data, f)
    upload_to_s3(local_file_name, 'altair.storage', object_name=f"ootb/{local_file_name}")
    return f"Your {vote_type} has been recorded. Thank you!"
# Read the logo once at import time and keep it as base64 text for inline HTML.
with open("./assets/showui.png", mode="rb") as image_file:
    base64_image = str(base64.b64encode(image_file.read()), encoding="utf-8")
# Each entry is [screenshot path, query, is-example flag]; the flag is True for all.
examples = [
    [f"./examples/{image_name}", example_prompt, True]
    for image_name, example_prompt in (
        ("app_store.png", "Download Kindle."),
        ("ios_setting.png", "Turn off Do not disturb."),
        ("apple_music.png", "Star to favorite."),
        ("map.png", "Boston."),
        ("wallet.png", "Scan a QR code."),
        ("word.png", "More shapes."),
        ("web_shopping.png", "Proceed to checkout."),
        ("web_forum.png", "Post my comment."),
        ("safari_google.png", "Click on search bar."),
    )
]
def check_if_example(query):
    """Return the example flag of the first example whose query equals ``query``.

    Queries are compared after stripping surrounding whitespace. Progress is
    printed for each comparison. Returns ``False`` when nothing matches.
    """
    print(f"Checking query: {query}")
    for entry in examples:
        _image, candidate, flag = entry
        print(f"Comparing with example: {candidate}")
        if query.strip() != candidate.strip():
            continue
        print("Match found!")
        return flag
    print("No match found.")
    return False
def build_demo(embed_mode, concurrency_count=1):
    """Assemble the Gradio Blocks UI for the ShowUI demo.

    Args:
        embed_mode: When falsy, a header with the logo and a short
            description is rendered above the controls.
        concurrency_count: Accepted for compatibility; not used by this
            function.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    # Function-scope import: only needed to make session ids unique.
    import uuid

    with gr.Blocks(title="ShowUI Demo", theme=gr.themes.Default()) as demo:
        # Per-session values carried between the submit, vote and clear events.
        state_image_path = gr.State(value=None)
        state_session_id = gr.State(value=None)
        state_is_example_image = gr.State(value=False)
        if not embed_mode:
            gr.HTML(
                f"""
<div style="text-align: center; margin-bottom: 20px;">
<div style="display: flex; justify-content: center;">
<img src="data:image/png;base64,{base64_image}" alt="ShowUI" width="320" style="margin-bottom: 10px;"/>
</div>
<p>ShowUI is a lightweight vision-language-action model for GUI agents.</p>
</div>
"""
            )
        with gr.Row():
            with gr.Column(scale=3):
                imagebox = gr.Image(type="numpy", label="Input Screenshot")
                textbox = gr.Textbox(
                    show_label=True,
                    placeholder="Enter a query (e.g., 'Click Nahant')",
                    label="Query",
                )
                submit_btn = gr.Button(value="Submit", variant="primary")
                gr.Examples(
                    examples=[[e[0], e[1]] for e in examples],
                    inputs=[imagebox, textbox],
                    outputs=[state_is_example_image],
                    fn=lambda img, query: check_if_example(query),
                    examples_per_page=3
                )
            with gr.Column(scale=8):
                output_img = gr.Image(type="pil", label="Output Image")
                output_coords = gr.Textbox(label="Clickable Coordinates")
                with gr.Row(elem_id="action-buttons", equal_height=True):
                    upvote_btn = gr.Button(value="Looks good!", variant="secondary")
                    downvote_btn = gr.Button(value="Too bad!", variant="secondary")
                    clear_btn = gr.Button(value="🗑️ Clear", interactive=True)

        def on_submit(image, query, is_example_image):
            """Run inference, store the submission and fill the outputs and state."""
            if image is None:
                raise ValueError("No image provided. Please upload an image before submitting.")
            # A timestamp alone has one-second resolution, so two submits in the
            # same second would overwrite each other's files; add a random suffix.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            session_id = f"{timestamp}_{uuid.uuid4().hex[:8]}"
            result_image, click_coords, image_path = run_showui(image, query, session_id)
            save_and_upload_data(image_path, query, session_id, is_example_image)
            return result_image, click_coords, image_path, session_id, is_example_image

        submit_btn.click(
            on_submit,
            [imagebox, textbox, state_is_example_image],
            [output_img, output_coords, state_image_path, state_session_id, state_is_example_image],
        )
        clear_btn.click(
            # One value per output component (7); the example flag goes back
            # to its initial False.
            lambda: (None, None, None, None, None, None, False),
            inputs=None,
            outputs=[imagebox, textbox, output_img, output_coords, state_image_path, state_session_id, state_is_example_image],
            queue=False
        )
        upvote_btn.click(
            lambda session_id, is_example_image: update_vote("upvote", session_id, is_example_image),
            inputs=[state_session_id, state_is_example_image],
            outputs=[],
            queue=False
        )
        downvote_btn.click(
            lambda session_id, is_example_image: update_vote("downvote", session_id, is_example_image),
            inputs=[state_session_id, state_is_example_image],
            outputs=[],
            queue=False
        )
    return demo
# Script entry point: build the full (non-embedded) UI and serve it.
if __name__ == "__main__":
    demo = build_demo(embed_mode=False)
    launch_options = dict(
        server_name="0.0.0.0",
        server_port=7860,
        ssr_mode=False,
        debug=True,
    )
    demo.queue(api_open=False).launch(**launch_options)