import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from PIL import Image
import numpy as np
import os
import tempfile
import spaces
import gradio as gr
import subprocess
import sys
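
# Install the FlashAttention 2 kernel at runtime from a prebuilt wheel (cu123 / torch 2.4 / cp310),
# which presumably matches this Space's environment and avoids compiling the extension on startup.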
def install_flash_attn_wheel():
    flash_attn_wheel_url = "https://github.com/Dao-AILab/flash-attention/releases/download/v2.6.3/flash_attn-2.6.3+cu123torch2.4cxx11abiFALSE-cp310-cp310-linux_x86_64.whl"
    try:
        # Call pip to install the wheel file
        subprocess.check_call([sys.executable, "-m", "pip", "install", flash_attn_wheel_url])
        print("Wheel installed successfully!")
    except subprocess.CalledProcessError as e:
        print(f"Failed to install the flash attention wheel. Error: {e}")
install_flash_attn_wheel()
import cv2
try:
    from mmengine.visualization import Visualizer
except ImportError:
    Visualizer = None
    print("Warning: mmengine is not installed, visualization is disabled.")
# Load the model and tokenizer
model_path = "ByteDance/Sa2VA-4B"
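
# Load Sa2VA-4B with its custom modeling code from the Hub (trust_remote_code=True) and place it
# on the first GPU; the predict_forward API used below is exposed by that remote code.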
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    torch_dtype="auto",
    device_map="cuda:0",
    trust_remote_code=True,
).eval().cuda()
tokenizer = AutoTokenizer.from_pretrained(
    model_path,
    trust_remote_code=True,
)
from third_parts import VideoReader
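
# Sample every `video_interval`-th frame from the input video, convert BGR -> RGB PIL images,
# and also save each sampled frame as a JPEG so segmentation overlays can be drawn per frame later.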
def read_video(video_path, video_interval):
    vid_frames = VideoReader(video_path)[::video_interval]

    temp_dir = tempfile.mkdtemp()
    os.makedirs(temp_dir, exist_ok=True)
    image_paths = []  # List to store paths of saved images

    for frame_idx in range(len(vid_frames)):
        frame_image = vid_frames[frame_idx]
        frame_image = frame_image[..., ::-1]  # BGR (opencv system) to RGB (numpy system)
        frame_image = Image.fromarray(frame_image)
        vid_frames[frame_idx] = frame_image

        # Save the frame as a .jpg file in the temporary folder
        image_path = os.path.join(temp_dir, f"frame_{frame_idx:04d}.jpg")
        frame_image.save(image_path, format="JPEG")

        # Append the image path to the list
        image_paths.append(image_path)

    return vid_frames, image_paths
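
# Overlay a binary segmentation mask on one image with mmengine's Visualizer and save the result.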
def visualize(pred_mask, image_path, work_dir):
    visualizer = Visualizer()
    img = cv2.imread(image_path)
    visualizer.set_image(img)
    visualizer.draw_binary_masks(pred_mask, colors='g', alphas=0.4)
    visual_result = visualizer.get_image()

    output_path = os.path.join(work_dir, os.path.basename(image_path))
    cv2.imwrite(output_path, visual_result)
    return output_path
# Import for the translation feature
from deep_translator import GoogleTranslator
# Translation helper function
def translate_to_korean(text):
    try:
        translator = GoogleTranslator(source='en', target='ko')
        return translator.translate(text)
    except Exception as e:
        print(f"Translation error: {e}")
        return text
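
# Single-image pipeline: runs on a GPU worker via @spaces.GPU, queries Sa2VA with the "<image>" prompt,
# optionally translates the answer to Korean, and visualizes the predicted mask when the response
# contains a [SEG] token.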
@spaces.GPU
def image_vision(image_input_path, prompt):
    # Check whether the prompt contains Korean (Hangul syllables U+AC00..U+D7A3)
    is_korean = any('\uac00' <= char <= '\ud7a3' for char in prompt)
    image_path = image_input_path
    text_prompts = f"<image>{prompt}"
    image = Image.open(image_path).convert('RGB')
    input_dict = {
        'image': image,
        'text': text_prompts,
        'past_text': '',
        'mask_prompts': None,
        'tokenizer': tokenizer,
    }
    return_dict = model.predict_forward(**input_dict)
    print(return_dict)
    answer = return_dict["prediction"]
    # If the prompt was Korean, translate the response into Korean
    if is_korean:
        # Preserve [SEG] tokens and translate only the surrounding text
        if '[SEG]' in answer:
            parts = answer.split('[SEG]')
            translated_parts = [translate_to_korean(part.strip()) for part in parts]
            answer = '[SEG]'.join(translated_parts)
        else:
            answer = translate_to_korean(answer)
    seg_image = return_dict["prediction_masks"]

    if '[SEG]' in answer and Visualizer is not None:
        pred_masks = seg_image[0]
        temp_dir = tempfile.mkdtemp()
        pred_mask = pred_masks
        os.makedirs(temp_dir, exist_ok=True)
        seg_result = visualize(pred_mask, image_input_path, temp_dir)
        return answer, seg_result
    else:
        return answer, None
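
# Video pipeline: a longer GPU slot (duration=80) is requested since the whole clip is processed.
# Frames are subsampled, Sa2VA is queried once on the clip, and if a [SEG] token is returned the
# per-frame masks are overlaid and re-encoded into an mp4 at the reduced frame rate.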
@spaces.GPU(duration=80)
def video_vision(video_input_path, prompt, video_interval):
    # Check whether the prompt contains Korean (Hangul syllables U+AC00..U+D7A3)
    is_korean = any('\uac00' <= char <= '\ud7a3' for char in prompt)
    cap = cv2.VideoCapture(video_input_path)
    original_fps = cap.get(cv2.CAP_PROP_FPS)
    frame_skip_factor = video_interval
    new_fps = original_fps / frame_skip_factor

    vid_frames, image_paths = read_video(video_input_path, video_interval)

    question = f"<image>{prompt}"
    result = model.predict_forward(
        video=vid_frames,
        text=question,
        tokenizer=tokenizer,
    )
    prediction = result['prediction']
    print(prediction)
    # If the prompt was Korean, translate the response into Korean
    if is_korean:
        if '[SEG]' in prediction:
            parts = prediction.split('[SEG]')
            translated_parts = [translate_to_korean(part.strip()) for part in parts]
            prediction = '[SEG]'.join(translated_parts)
        else:
            prediction = translate_to_korean(prediction)
    if '[SEG]' in prediction and Visualizer is not None:
        _seg_idx = 0
        pred_masks = result['prediction_masks'][_seg_idx]
        seg_frames = []
        for frame_idx in range(len(vid_frames)):
            pred_mask = pred_masks[frame_idx]
            temp_dir = tempfile.mkdtemp()
            os.makedirs(temp_dir, exist_ok=True)
            seg_frame = visualize(pred_mask, image_paths[frame_idx], temp_dir)
            seg_frames.append(seg_frame)

        output_video = "output_video.mp4"

        frame = cv2.imread(seg_frames[0])
        height, width, layers = frame.shape

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        video = cv2.VideoWriter(output_video, fourcc, new_fps, (width, height))

        for img_path in seg_frames:
            frame = cv2.imread(img_path)
            video.write(frame)

        video.release()
        print(f"Video created successfully at {output_video}")

        return prediction, output_video
    else:
        return prediction, None
# Gradio UI
with gr.Blocks(analytics_enabled=False) as demo:
    with gr.Column():
        gr.Markdown("# Sa2VA: Marrying SAM2 with LLaVA for Dense Grounded Understanding of Images and Videos")
        gr.HTML("""
        <div style="display:flex;column-gap:4px;">
            <a href="https://github.com/magic-research/Sa2VA">
                <img src='https://img.shields.io/badge/GitHub-Repo-blue'>
            </a>
            <a href="https://arxiv.org/abs/2501.04001">
                <img src='https://img.shields.io/badge/ArXiv-Paper-red'>
            </a>
            <a href="https://huggingface.co/spaces/fffiloni/Sa2VA-simple-demo?duplicate=true">
                <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-sm.svg" alt="Duplicate this Space">
            </a>
            <a href="https://huggingface.co/fffiloni">
                <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/follow-me-on-HF-sm-dark.svg" alt="Follow me on HF">
            </a>
        </div>
        """)
with gr.Tab("Single Image"):
with gr.Row():
with gr.Column():
image_input = gr.Image(label="Image IN", type="filepath")
with gr.Row():
instruction = gr.Textbox(label="Instruction", scale=4)
submit_image_btn = gr.Button("Submit", scale=1)
with gr.Column():
output_res = gr.Textbox(label="Response")
output_image = gr.Image(label="Segmentation", type="numpy")
submit_image_btn.click(
fn = image_vision,
inputs = [image_input, instruction],
outputs = [output_res, output_image]
)
with gr.Tab("Video"):
with gr.Row():
with gr.Column():
video_input = gr.Video(label="Video IN")
frame_interval = gr.Slider(label="Frame interval", step=1, minimum=1, maximum=12, value=6)
with gr.Row():
vid_instruction = gr.Textbox(label="Instruction", scale=4)
submit_video_btn = gr.Button("Submit", scale=1)
with gr.Column():
vid_output_res = gr.Textbox(label="Response")
output_video = gr.Video(label="Segmentation")
submit_video_btn.click(
fn = video_vision,
inputs = [video_input, vid_instruction, frame_interval],
outputs = [vid_output_res, output_video]
)
demo.queue().launch(show_api=False, show_error=True)