|
import subprocess |
|
import os

# Install flash-attn at startup, before the model code is imported.
# NOTE(review): FLASH_ATTENTION_SKIP_CUDA_BUILD presumably tells flash-attn's
# build to skip compiling CUDA kernels -- confirm against the flash-attn docs.
# The variable is merged into the current environment: passing a bare dict as
# `env` would replace the whole environment and drop PATH, HOME, proxy
# settings, etc. for the child process.
subprocess.run(
    'pip install flash-attn==2.7.0.post2 --no-build-isolation',
    env={**os.environ, 'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"},
    shell=True,
)
|
|
|
import spaces |
|
import os |
|
import re |
|
import logging |
|
from typing import List, Any |
|
from threading import Thread |
|
|
|
import torch |
|
import gradio as gr |
|
from transformers import AutoModelForCausalLM, TextIteratorStreamer |
|
from moviepy.editor import VideoFileClip |
|
from PIL import Image |
|
|
|
# Hugging Face Hub id of the checkpoint loaded below; also used for the page
# title and the links in the header.
model_name = 'AIDC-AI/Ovis2-16B'

# When True, ovis_chat runs model.generate on a background Thread; when False
# it runs generation inline before draining the streamer.
use_thread = False

# `max_partition` value handed to model.preprocess_inputs for image inputs.
IMAGE_MAX_PARTITION = 16

# Number of frames sampled from an uploaded video, and the `max_partition`
# value used for those frames.
VIDEO_FRAME_NUMS = 32
VIDEO_MAX_PARTITION = 1

# Load the checkpoint (its custom code is trusted) and move it to the GPU.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
    multimodal_max_length=8192,
    trust_remote_code=True,
)
model = model.to(device='cuda')

text_tokenizer = model.get_text_tokenizer()
visual_tokenizer = model.get_visual_tokenizer()

# Module-level streamer that ovis_chat passes to model.generate and iterates.
streamer = TextIteratorStreamer(
    text_tokenizer,
    skip_prompt=True,
    skip_special_tokens=True,
)

# Marker text that stands for one image inside a conversation turn.
image_placeholder = '<image>'

# Directory containing this script; example and resource paths are built on it.
cur_dir = os.path.dirname(os.path.abspath(__file__))

# Root logging at INFO, with httpx limited to warnings and above.
logging.basicConfig(level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
|
|
|
def initialize_gen_kwargs():
    """Build a fresh dict of keyword arguments for ``model.generate``.

    Sampling is disabled (``do_sample=False``) and the sampling knobs are set
    to ``None``. The end-of-sequence and padding ids are read from the loaded
    model's generation config and the text tokenizer.

    Returns:
        dict: A new dictionary on every call.
    """
    gen_kwargs = dict(
        max_new_tokens=1536,
        do_sample=False,
        top_p=None,
        top_k=None,
        temperature=None,
        repetition_penalty=1.05,
    )
    gen_kwargs["eos_token_id"] = model.generation_config.eos_token_id
    gen_kwargs["pad_token_id"] = text_tokenizer.pad_token_id
    gen_kwargs["use_cache"] = True
    return gen_kwargs
|
|
|
def submit_chat(chatbot, text_input):
    """Append the user's prompt to the chat history with an empty answer.

    The new turn is stored as a mutable ``[question, answer]`` list rather than
    a tuple, so the answer slot can be filled in place while the reply streams.

    Args:
        chatbot: Chat history as a list of ``[question, answer]`` pairs.
            ``None`` is treated as an empty history.
        text_input: The prompt the user just submitted.

    Returns:
        tuple: ``(chatbot, '')``, i.e. the updated history and an empty string
        used to clear the prompt box.
    """
    if chatbot is None:
        chatbot = []
    chatbot.append([text_input, ''])
    return chatbot, ''
|
|
|
@spaces.GPU
def ovis_chat(chatbot: List[List[str]], image_input: Any, video_input: Any):
    """Generate the reply to the latest turn and stream it into ``chatbot``.

    Args:
        chatbot: Chat history as ``[question, answer]`` pairs; the last pair is
            the turn being answered and its answer slot is overwritten.
        image_input: Optional image forwarded to ``prepare_inputs``.
        video_input: Optional video forwarded to ``prepare_inputs``.

    Yields:
        The ``chatbot`` history after each streamed text chunk, with the last
        answer holding everything generated so far.
    """
    _, model_inputs = prepare_inputs(chatbot, image_input, video_input)
    gen_kwargs = initialize_gen_kwargs()

    def run_generation():
        # inference_mode is thread-local, so it is entered inside the callable:
        # that way it is active in whichever thread actually runs generate().
        # It is also no longer held open across the `yield`s below.
        with torch.inference_mode():
            model.generate(**model_inputs, **gen_kwargs, streamer=streamer)

    worker = None
    if use_thread:
        worker = Thread(target=run_generation)
        worker.start()
    else:
        # Inline mode: generation finishes before the streamer is drained.
        run_generation()

    response = ""
    for new_text in streamer:
        response += new_text
        # Rebuild the pair instead of assigning into it, so a tuple entry
        # works as well as a list entry.
        chatbot[-1] = [chatbot[-1][0], response]
        yield chatbot

    if worker is not None:
        worker.join()

    log_conversation(chatbot)
|
|
|
|
|
def _sample_video_frames(video_path, num_frames):
    """Return up to ``num_frames`` evenly spaced frames of a video as RGB PIL images."""
    with VideoFileClip(video_path) as clip:
        fps = clip.fps
        if not fps or not clip.duration:
            return []
        total_frames = int(fps * clip.duration)
        if total_frames <= num_frames:
            # Short clip: take every frame.
            sampled_indices = range(total_frames)
        else:
            # Take the middle frame of each of `num_frames` equal segments.
            stride = total_frames / num_frames
            sampled_indices = [
                min(total_frames - 1, int((stride * i + stride * (i + 1)) / 2))
                for i in range(num_frames)
            ]
        return [
            Image.fromarray(clip.get_frame(index / fps), mode='RGB')
            for index in sampled_indices
        ]


def prepare_inputs(chatbot: List[List[str]], image_input: Any, video_input: Any):
    """Convert the chat history and optional media into ``model.generate`` inputs.

    The history becomes a list of ``{"from": ..., "value": ...}`` turns. One
    image placeholder per supplied image is prepended to the first human turn,
    so the number of placeholders always equals the number of images handed to
    ``model.preprocess_inputs``. Literal placeholder text typed by the user is
    removed from every human turn for the same reason.

    Args:
        chatbot: Chat history as ``[question, answer]`` pairs; the last pair is
            the pending turn and only its question is used.
        image_input: Optional image for an image + prompt request.
        video_input: Optional video file path. When frames can be sampled from
            it, the video takes precedence over ``image_input``.

    Returns:
        tuple: ``(conversations, model_inputs)`` where ``model_inputs`` has the
        keys ``"inputs"``, ``"attention_mask"`` and ``"pixel_values"``.
    """
    conversations = []
    for query, response in chatbot[:-1]:
        conversations.extend([
            {"from": "human", "value": query.replace(image_placeholder, '')},
            {"from": "gpt", "value": response},
        ])

    last_query = chatbot[-1][0].replace(image_placeholder, '')
    conversations.append({"from": "human", "value": last_query})

    images = None
    max_partition = IMAGE_MAX_PARTITION

    if video_input is not None:
        frames = _sample_video_frames(video_input, VIDEO_FRAME_NUMS)
        if frames:
            images = frames
            max_partition = VIDEO_MAX_PARTITION
        else:
            logger.warning("No frames could be sampled from video %s", video_input)

    if images is None and image_input is not None:
        images = [image_input]

    if images is not None:
        # A clip shorter than VIDEO_FRAME_NUMS yields fewer frames, so the
        # placeholder count follows len(images) rather than the constant.
        prefix = f'{image_placeholder}\n' * len(images)
        for conv in conversations:
            if conv["from"] == "human":
                conv["value"] = prefix + f'{conv["value"]}'
                break

    logger.info(conversations)

    _, input_ids, pixel_values = model.preprocess_inputs(conversations, images, max_partition=max_partition)
    attention_mask = torch.ne(input_ids, text_tokenizer.pad_token_id)

    if images is not None and pixel_values is not None:
        pixel_values_list = [pixel_values.to(dtype=visual_tokenizer.dtype, device=visual_tokenizer.device)]
    else:
        pixel_values_list = [None]

    model_inputs = {
        "inputs": input_ids.unsqueeze(0).to(device=model.device),
        "attention_mask": attention_mask.unsqueeze(0).to(device=model.device),
        "pixel_values": pixel_values_list,
    }

    return conversations, model_inputs
|
|
|
def log_conversation(chatbot):
    """Print every question/answer pair between start and end log markers.

    Args:
        chatbot: Chat history as an iterable of ``(question, answer)`` pairs.
    """
    logger.info("[OVIS_CONV_START]")
    # Plain loop: the output is a side effect, so no throwaway list is built.
    for i, (request, answer) in enumerate(chatbot, 1):
        print(f'Q{i}:\n {request}\nA{i}:\n {answer}')
    logger.info("[OVIS_CONV_END]")
|
|
|
def clear_chat():
    """Return reset values for the chat widgets.

    Returns:
        tuple: ``([], None, "", None)``, an empty history, no image, an empty
        prompt and no video, in that order.
    """
    empty_history = []
    empty_prompt = ""
    return empty_history, None, empty_prompt, None
|
|
|
# Height given to the inlined logo and font size of the header line.
font_size = "2.5em"

# Matches an opening <svg ...> tag so sizing/alignment attributes can be
# appended just before its closing '>'.
_svg_open_tag = re.compile(r'(<svg[^>]*)(>)')
_svg_extra_attrs = rf'\1 height="{font_size}" style="vertical-align: middle; display: inline-block;"\2'

with open(f"{cur_dir}/resource/logo.svg", "r", encoding="utf-8") as svg_file:
    svg_content = _svg_open_tag.sub(_svg_extra_attrs, svg_file.read())

# Header markup: logo and model name, followed by the project links.
html = f"""
<p align="center" style="font-size: {font_size}; line-height: 1;">
<span style="display: inline-block; vertical-align: middle;">{svg_content}</span>
<span style="display: inline-block; vertical-align: middle;">{model_name.split('/')[-1]}</span>
</p>
<center><font size=3><b>Ovis</b> has been open-sourced on <a href='https://huggingface.co/{model_name}'>😊 Huggingface</a> and <a href='https://github.com/AIDC-AI/Ovis'>🌟 GitHub</a>. If you find Ovis useful, a like❤️ or a star🌟 would be appreciated.</font></center>
"""
|
|
|
# Delimiter pairs handed to the chatbot for math rendering. Each entry is
# (left, right, display): "display" is False for the inline pair and True for
# the others.
latex_delimiters_set = [
    {"left": left, "right": right, "display": display}
    for left, right, display in (
        ("\\(", "\\)", False),
        ("\\begin{equation}", "\\end{equation}", True),
        ("\\begin{align}", "\\end{align}", True),
        ("\\begin{alignat}", "\\end{alignat}", True),
        ("\\begin{gather}", "\\end{gather}", True),
        ("\\begin{CD}", "\\end{CD}", True),
        ("\\[", "\\]", True),
    )
]
|
|
|
# The prompt box is created before the Blocks layout so the example galleries
# below can list it as an input; it is placed in the page later through
# text_input.render().
text_input = gr.Textbox(label="prompt", placeholder="Enter your text here...", lines=1, container=False)
with gr.Blocks(title=model_name.split('/')[-1], theme=gr.themes.Ocean()) as demo:
    # Header with the logo, model name and project links.
    gr.HTML(html)
    with gr.Row():
        # Left column: input-type selector, media inputs and example galleries.
        with gr.Column(scale=3):
            input_type = gr.Radio(choices=["image + prompt", "video + prompt"], label="Select input type:", value="image + prompt", elem_classes="my_radio")

            # Only the image widgets are visible initially; the video widgets
            # are shown when the radio selection changes.
            image_input = gr.Image(label="image", height=350, type="pil", visible=True)
            video_input = gr.Video(label="video", height=350, format='mp4', visible=False)
            with gr.Column(visible=True) as image_examples_col:
                image_examples = gr.Examples(
                    examples=[
                        [f"{cur_dir}/examples/ovis2_math0.jpg", "Each face of the polyhedron shown is either a triangle or a square. Each square borders 4 triangles, and each triangle borders 3 squares. The polyhedron has 6 squares. How many triangles does it have?\n\nProvide a step-by-step solution to the problem, and conclude with 'the answer is' followed by the final solution."],
                        [f"{cur_dir}/examples/ovis2_math1.jpg", "A large square touches another two squares, as shown in the picture. The numbers inside the smaller squares indicate their areas. What is the area of the largest square?\n\nProvide a step-by-step solution to the problem, and conclude with 'the answer is' followed by the final solution."],
                        [f"{cur_dir}/examples/ovis2_figure0.png", "Explain this model."],
                        [f"{cur_dir}/examples/ovis2_figure1.png", "Organize the notes about GRPO in the figure."],
                        [f"{cur_dir}/examples/ovis2_multi0.jpg", "Posso avere un frappuccino e un caffè americano di taglia M? Quanto costa in totale?"],
                    ],
                    inputs=[image_input, text_input]
                )

            def update_visibility_on_example(video_input, text_input):
                """Make the video component visible and pass the prompt through unchanged.

                Used as the callback of the video example gallery; the
                ``video_input`` argument itself is not read.
                """
                return (gr.update(visible=True), text_input)

            with gr.Column(visible=False) as video_examples_col:
                video_examples = gr.Examples(
                    examples=[
                        [f"{cur_dir}/examples/video_demo_1.mp4", "Describe the video."]
                    ],
                    inputs=[video_input, text_input],
                    fn = update_visibility_on_example,
                    run_on_click = True,
                    outputs=[video_input, text_input]
                )

        # Right column: conversation view, prompt box and action buttons.
        with gr.Column(scale=7):
            chatbot = gr.Chatbot(label="Ovis", layout="panel", height=600, show_copy_button=True, latex_delimiters=latex_delimiters_set)
            text_input.render()
            with gr.Row():
                send_btn = gr.Button("Send", variant="primary")
                clear_btn = gr.Button("Clear", variant="secondary")

    def update_input_and_clear(selected):
        """Switch between image and video mode and reset the chat.

        Returns an 8-tuple: four visibility updates (image input, video input,
        image examples column, video examples column) followed by the four
        values from ``clear_chat()``. The order matches the ``outputs`` list of
        ``input_type.change`` below.
        """
        if selected == "image + prompt":
            visibility_updates = (gr.update(visible=True), gr.update(visible=False),
                                  gr.update(visible=True), gr.update(visible=False))
        else:
            visibility_updates = (gr.update(visible=False), gr.update(visible=True),
                                  gr.update(visible=False), gr.update(visible=True))
        clear_chat_outputs = clear_chat()
        return visibility_updates + clear_chat_outputs

    # image_input and video_input each appear twice in `outputs`: first for the
    # visibility update, then for the cleared value.
    input_type.change(fn=update_input_and_clear, inputs=input_type,
                      outputs=[image_input, video_input, image_examples_col, video_examples_col, chatbot, image_input, text_input, video_input])

    # Both the Send button and pressing Enter in the prompt box first record the
    # user turn (submit_chat) and then stream the reply (ovis_chat).
    send_click_event = send_btn.click(submit_chat, [chatbot, text_input], [chatbot, text_input]).then(ovis_chat,[chatbot, image_input, video_input],chatbot)
    submit_event = text_input.submit(submit_chat, [chatbot, text_input], [chatbot, text_input]).then(ovis_chat,[chatbot, image_input, video_input],chatbot)
    clear_btn.click(clear_chat, outputs=[chatbot, image_input, text_input, video_input])

demo.launch()
|
|