Ovis2-16B / app.py
rabbit19731's picture
Update app.py
20e8385 verified
import subprocess
# Install flash-attn with pip at startup, before the remaining imports run.
# FLASH_ATTENTION_SKIP_CUDA_BUILD=TRUE is handed to the pip process; presumably it tells
# the flash-attn build to skip compiling CUDA kernels -- TODO confirm against flash-attn.
# NOTE(review): `env=` replaces the child's whole environment instead of extending it,
# so PATH and other inherited variables are not passed through -- confirm this is intended.
subprocess.run('pip install flash-attn==2.7.0.post2 --no-build-isolation', env={'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"}, shell=True)
import spaces
import os
import re
import logging
from typing import List, Any
from threading import Thread
import torch
import gradio as gr
from transformers import AutoModelForCausalLM, TextIteratorStreamer
from moviepy.editor import VideoFileClip
from PIL import Image
# Hugging Face model id passed to from_pretrained; its last path segment is also
# used for the page title and header.
model_name = 'AIDC-AI/Ovis2-16B'
# When True, ovis_chat runs model.generate in a background Thread; when False it is called inline.
use_thread = False
# max_partition value passed to model.preprocess_inputs for a single image.
IMAGE_MAX_PARTITION = 16
# Upper bound on the number of frames sampled from an uploaded video.
VIDEO_FRAME_NUMS = 32
# max_partition value passed to model.preprocess_inputs for video frames.
VIDEO_MAX_PARTITION = 1
# Load the model in bfloat16 and move it to the CUDA device. This runs at import time.
model = AutoModelForCausalLM.from_pretrained(model_name,
torch_dtype=torch.bfloat16,
multimodal_max_length=8192,
trust_remote_code=True).to(device='cuda')
# Tokenizers are obtained from the loaded model rather than loaded separately.
text_tokenizer = model.get_text_tokenizer()
visual_tokenizer = model.get_visual_tokenizer()
# Module-level text streamer over the text tokenizer, configured with skip_prompt and skip_special_tokens.
streamer = TextIteratorStreamer(text_tokenizer, skip_prompt=True, skip_special_tokens=True)
# Marker string inserted into prompts once per image / video frame.
image_placeholder = '<image>'
# Directory containing this file; used to locate the logo and example assets.
cur_dir = os.path.dirname(os.path.abspath(__file__))
# Raise the "httpx" logger to WARNING while the root logger is configured at INFO.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def initialize_gen_kwargs():
    """Build a fresh dict of keyword arguments for ``model.generate``.

    Sampling is disabled (``do_sample`` is False and ``top_p``/``top_k``/
    ``temperature`` are None); the end-of-sequence id is read from the model's
    generation config and the padding id from the text tokenizer.

    Returns:
        A new dict on every call, so callers may modify it freely.
    """
    gen_kwargs = {
        "max_new_tokens": 1536,
        "do_sample": False,
        "top_p": None,
        "top_k": None,
        "temperature": None,
        "repetition_penalty": 1.05,
    }
    # Token ids come from the loaded model / tokenizer rather than being hard-coded.
    gen_kwargs["eos_token_id"] = model.generation_config.eos_token_id
    gen_kwargs["pad_token_id"] = text_tokenizer.pad_token_id
    gen_kwargs["use_cache"] = True
    return gen_kwargs
def submit_chat(chatbot, text_input):
    """Queue the user's prompt as a new chat turn with an empty answer.

    Args:
        chatbot: Chat history; it is extended in place.
        text_input: The prompt text the user submitted.

    Returns:
        A ``(chatbot, '')`` pair: the same history object and an empty string
        for the prompt box.
    """
    pending_turn = (text_input, '')
    chatbot.append(pending_turn)
    cleared_prompt = ''
    return chatbot, cleared_prompt
@spaces.GPU
def ovis_chat(chatbot: List[List[str]], image_input: Any, video_input: Any):
    """Generate the answer for the last chat turn and stream it into the history.

    Args:
        chatbot: Chat history as [query, response] pairs; the last pair's
            response slot is overwritten with the text generated so far.
        image_input: Optional image forwarded to ``prepare_inputs``.
        video_input: Optional video forwarded to ``prepare_inputs``.

    Yields:
        The updated ``chatbot`` after every chunk of text read from the streamer.
    """
    _, model_inputs = prepare_inputs(chatbot, image_input, video_input)
    gen_kwargs = initialize_gen_kwargs()

    # Use a streamer private to this request: a streamer shared at module level
    # would let overlapping requests push text into the same queue.
    request_streamer = TextIteratorStreamer(text_tokenizer, skip_prompt=True, skip_special_tokens=True)

    def run_generation():
        # Enter inference mode inside the callable so it applies in whichever
        # thread runs generation and is never held open across a `yield`.
        with torch.inference_mode():
            model.generate(**model_inputs, **gen_kwargs, streamer=request_streamer)

    worker = None
    if use_thread:
        worker = Thread(target=run_generation)
        worker.start()
    else:
        # Inline mode: generation finishes first, then the buffered text is replayed below.
        run_generation()

    response = ""
    for new_text in request_streamer:
        response += new_text
        chatbot[-1][1] = response
        yield chatbot

    if worker is not None:
        worker.join()
    log_conversation(chatbot)
def _sample_frame_indices(total_frames: int, num_frames: int) -> List[int]:
    """Return at most ``num_frames`` frame indices spread evenly over ``total_frames``."""
    if total_frames <= num_frames:
        return list(range(total_frames))
    stride = total_frames / num_frames
    # Take the midpoint of each stride-wide window, clamped to the last valid index.
    return [min(total_frames - 1, int((stride * i + stride * (i + 1)) / 2)) for i in range(num_frames)]


def _load_video_frames(video_path: Any, num_frames: int) -> list:
    """Decode up to ``num_frames`` evenly spaced frames of a video as RGB PIL images."""
    with VideoFileClip(video_path) as clip:
        total_frames = int(clip.fps * clip.duration)
        raw_frames = [clip.get_frame(index / clip.fps) for index in _sample_frame_indices(total_frames, num_frames)]
    return [Image.fromarray(frame, mode='RGB') for frame in raw_frames]


def _prepend_to_first_human_turn(conversations: List[dict], prefix: str) -> None:
    """Prefix the value of the first "human" turn in ``conversations`` in place."""
    for conv in conversations:
        if conv["from"] == "human":
            conv["value"] = f'{prefix}{conv["value"]}'
            break


def prepare_inputs(chatbot: List[List[str]], image_input: Any, video_input: Any):
    """Turn the chat history and optional media into model inputs.

    Args:
        chatbot: Chat history as (query, response) pairs; the last pair holds
            the query that is about to be answered.
        image_input: Optional image; adds one image placeholder to the first
            human turn.
        video_input: Optional video path; frames are sampled from it and one
            image placeholder per extracted frame is added to the first human
            turn. When both a video and an image are given, the video is used.

    Returns:
        ``(conversations, model_inputs)`` where ``conversations`` is the list of
        {"from", "value"} turns and ``model_inputs`` holds the "inputs",
        "attention_mask" and "pixel_values" entries passed to ``model.generate``.
    """
    # No system turn is added; the conversation starts with the first human turn.
    conversations = []
    for query, response in chatbot[:-1]:
        conversations.extend([
            {"from": "human", "value": query},
            {"from": "gpt", "value": response}
        ])

    # Strip placeholders typed by the user so only the ones added below remain in the last query.
    last_query = chatbot[-1][0].replace(image_placeholder, '')
    conversations.append({"from": "human", "value": last_query})

    max_partition = IMAGE_MAX_PARTITION
    if video_input is not None:
        frames = _load_video_frames(video_input, VIDEO_FRAME_NUMS)
        # Emit exactly one placeholder per extracted frame: a clip shorter than
        # VIDEO_FRAME_NUMS frames yields fewer images than the fixed count would announce.
        _prepend_to_first_human_turn(conversations, f'{image_placeholder}\n' * len(frames))
        # A clip from which no frame could be taken is handled like a text-only request.
        image_input = frames or None
        max_partition = VIDEO_MAX_PARTITION
    elif image_input is not None:
        _prepend_to_first_human_turn(conversations, f'{image_placeholder}\n')
        image_input = [image_input]

    logger.info(conversations)

    prompt, input_ids, pixel_values = model.preprocess_inputs(conversations, image_input, max_partition=max_partition)
    attention_mask = torch.ne(input_ids, text_tokenizer.pad_token_id)

    if image_input is not None:
        pixel_values_list = [pixel_values.to(dtype=visual_tokenizer.dtype, device=visual_tokenizer.device)]
    else:
        pixel_values_list = [None]

    model_inputs = {
        "inputs": input_ids.unsqueeze(0).to(device=model.device),
        "attention_mask": attention_mask.unsqueeze(0).to(device=model.device),
        "pixel_values": pixel_values_list
    }
    return conversations, model_inputs
def log_conversation(chatbot):
    """Dump the whole conversation between start/end log markers.

    The markers go through the module logger; each question/answer pair is
    written with ``print`` and numbered from 1.
    """
    logger.info("[OVIS_CONV_START]")
    for i, (request, answer) in enumerate(chatbot, 1):
        print(f'Q{i}:\n {request}\nA{i}:\n {answer}')
    logger.info("[OVIS_CONV_END]")
def clear_chat():
    """Return reset values for the chat history, image, prompt text and video.

    Returns:
        A 4-tuple ``([], None, "", None)``, in the order the UI wires it to
        ``[chatbot, image_input, text_input, video_input]``.
    """
    empty_history = []
    no_media = None
    return (empty_history, no_media, "", no_media)
# Page header: the SVG logo is inlined next to the model's short name, both
# sized by `font_size`.
font_size = "2.5em"
with open(f"{cur_dir}/resource/logo.svg", "r", encoding="utf-8") as svg_file:
    svg_content = svg_file.read()
# Inject a height and inline-alignment style just before the closing ">" of
# each opening <svg ...> tag so the logo lines up with the text beside it.
svg_content = re.sub(
    r'(<svg[^>]*)(>)',
    rf'\1 height="{font_size}" style="vertical-align: middle; display: inline-block;"\2',
    svg_content,
)
html = f"""
<p align="center" style="font-size: {font_size}; line-height: 1;">
<span style="display: inline-block; vertical-align: middle;">{svg_content}</span>
<span style="display: inline-block; vertical-align: middle;">{model_name.split('/')[-1]}</span>
</p>
<center><font size=3><b>Ovis</b> has been open-sourced on <a href='https://huggingface.co/{model_name}'>😊 Huggingface</a> and <a href='https://github.com/AIDC-AI/Ovis'>🌟 GitHub</a>. If you find Ovis useful, a like❤️ or a star🌟 would be appreciated.</font></center>
"""
# LaTeX delimiter pairs handed to the chatbot component. Each entry names the
# opening and closing delimiter and whether the math is rendered in display
# mode (True) or inline (False).
latex_delimiters_set = [
    {"left": left, "right": right, "display": display}
    for left, right, display in (
        ("\\(", "\\)", False),
        ("\\begin{equation}", "\\end{equation}", True),
        ("\\begin{align}", "\\end{align}", True),
        ("\\begin{alignat}", "\\end{alignat}", True),
        ("\\begin{gather}", "\\end{gather}", True),
        ("\\begin{CD}", "\\end{CD}", True),
        ("\\[", "\\]", True),
    )
]
# The prompt box is created before the layout so the example galleries can list
# it as an input; it is placed on the page later through text_input.render().
text_input = gr.Textbox(label="prompt", placeholder="Enter your text here...", lines=1, container=False)

# NOTE(review): the nesting of the layout below was reconstructed from a
# flattened source -- confirm it against the deployed page.
with gr.Blocks(title=model_name.split('/')[-1], theme=gr.themes.Ocean()) as demo:
    gr.HTML(html)
    with gr.Row():
        # Left column: input-type switch, media inputs and example galleries.
        with gr.Column(scale=3):
            input_type = gr.Radio(
                choices=["image + prompt", "video + prompt"],
                label="Select input type:",
                value="image + prompt",
                elem_classes="my_radio",
            )
            image_input = gr.Image(label="image", height=350, type="pil", visible=True)
            video_input = gr.Video(label="video", height=350, format='mp4', visible=False)

            with gr.Column(visible=True) as image_examples_col:
                image_examples = gr.Examples(
                    examples=[
                        [f"{cur_dir}/examples/ovis2_math0.jpg", "Each face of the polyhedron shown is either a triangle or a square. Each square borders 4 triangles, and each triangle borders 3 squares. The polyhedron has 6 squares. How many triangles does it have?\n\nProvide a step-by-step solution to the problem, and conclude with 'the answer is' followed by the final solution."],
                        [f"{cur_dir}/examples/ovis2_math1.jpg", "A large square touches another two squares, as shown in the picture. The numbers inside the smaller squares indicate their areas. What is the area of the largest square?\n\nProvide a step-by-step solution to the problem, and conclude with 'the answer is' followed by the final solution."],
                        [f"{cur_dir}/examples/ovis2_figure0.png", "Explain this model."],
                        [f"{cur_dir}/examples/ovis2_figure1.png", "Organize the notes about GRPO in the figure."],
                        [f"{cur_dir}/examples/ovis2_multi0.jpg", "Posso avere un frappuccino e un caffè americano di taglia M? Quanto costa in totale?"],
                    ],
                    inputs=[image_input, text_input],
                )

            def update_visibility_on_example(video_input, text_input):
                """Make the video component visible and pass the example prompt through."""
                return (gr.update(visible=True), text_input)

            with gr.Column(visible=False) as video_examples_col:
                video_examples = gr.Examples(
                    examples=[
                        [f"{cur_dir}/examples/video_demo_1.mp4", "Describe the video."]
                    ],
                    inputs=[video_input, text_input],
                    fn=update_visibility_on_example,
                    run_on_click=True,
                    outputs=[video_input, text_input],
                )

        # Right column: conversation view, prompt box and action buttons.
        with gr.Column(scale=7):
            chatbot = gr.Chatbot(
                label="Ovis",
                layout="panel",
                height=600,
                show_copy_button=True,
                latex_delimiters=latex_delimiters_set,
            )
            text_input.render()
            with gr.Row():
                send_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear", variant="secondary")

    def update_input_and_clear(selected):
        """Show the widgets for the selected input type and reset the chat state.

        Returns four visibility updates (image, video, image examples, video
        examples) followed by the four values returned by ``clear_chat``.
        """
        image_mode = selected == "image + prompt"
        visibility_updates = (
            gr.update(visible=image_mode),
            gr.update(visible=not image_mode),
            gr.update(visible=image_mode),
            gr.update(visible=not image_mode),
        )
        return visibility_updates + clear_chat()

    # image_input and video_input each appear twice in the outputs: first for
    # the visibility update, then for the value reset.
    input_type.change(
        fn=update_input_and_clear,
        inputs=input_type,
        outputs=[
            image_input, video_input, image_examples_col, video_examples_col,
            chatbot, image_input, text_input, video_input,
        ],
    )

    # Both the Send button and pressing Enter first queue the prompt, then run generation.
    send_click_event = send_btn.click(
        fn=submit_chat,
        inputs=[chatbot, text_input],
        outputs=[chatbot, text_input],
    ).then(
        fn=ovis_chat,
        inputs=[chatbot, image_input, video_input],
        outputs=chatbot,
    )
    submit_event = text_input.submit(
        fn=submit_chat,
        inputs=[chatbot, text_input],
        outputs=[chatbot, text_input],
    ).then(
        fn=ovis_chat,
        inputs=[chatbot, image_input, video_input],
        outputs=chatbot,
    )
    clear_btn.click(fn=clear_chat, outputs=[chatbot, image_input, text_input, video_input])

demo.launch()