import gradio as gr
import torch
import math
import os
from transformers import AutoTokenizer, AutoModel
from huggingface_hub import snapshot_download
from decord import VideoReader, cpu
from PIL import Image
from torchvision.transforms import Compose, Resize, ToTensor, Normalize

# === Visual preprocessing ===
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

transform = Compose([
    Resize((448, 448)),
    ToTensor(),
    Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
])

# === Model download / caching ===
PERSISTENT_DIR = "/data/internvl3_model"  # persistent storage path
MODEL_NAME = "OpenGVLab/InternVL3-14B"

# On first run: download the model and cache it under /data.
# Note: snapshot_download lives in huggingface_hub (not transformers)
# and takes no trust_remote_code argument.
if not os.path.exists(PERSISTENT_DIR):
    print("Downloading model to persistent storage...")
    snapshot_download(repo_id=MODEL_NAME, local_dir=PERSISTENT_DIR)

# Load the tokenizer from the local copy.
tokenizer = AutoTokenizer.from_pretrained(PERSISTENT_DIR, trust_remote_code=True)


def split_model(model_path):
    """Spread the LLM layers across all visible GPUs. GPU 0 is treated as
    half a GPU since it also hosts the vision tower, embeddings, head,
    and the final layer."""
    from transformers import AutoConfig

    device_map = {}
    world_size = torch.cuda.device_count()
    config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
    num_layers = config.llm_config.num_hidden_layers
    num_layers_per_gpu = math.ceil(num_layers / (world_size - 0.5))
    num_layers_per_gpu = [num_layers_per_gpu] * world_size
    num_layers_per_gpu[0] = math.ceil(num_layers_per_gpu[0] * 0.5)
    layer_cnt = 0
    for i, num_layer in enumerate(num_layers_per_gpu):
        for _ in range(num_layer):
            device_map[f'language_model.model.layers.{layer_cnt}'] = i
            layer_cnt += 1
    device_map['vision_model'] = 0
    device_map['mlp1'] = 0
    device_map['language_model.model.tok_embeddings'] = 0
    device_map['language_model.model.embed_tokens'] = 0
    device_map['language_model.output'] = 0
    device_map['language_model.model.norm'] = 0
    device_map['language_model.model.rotary_emb'] = 0
    device_map['language_model.lm_head'] = 0
    device_map[f'language_model.model.layers.{num_layers - 1}'] = 0
    return device_map


device_map = split_model(PERSISTENT_DIR)
model = AutoModel.from_pretrained(
    PERSISTENT_DIR,
    torch_dtype=torch.bfloat16,
    low_cpu_mem_usage=True,
    use_flash_attn=True,
    trust_remote_code=True,
    device_map=device_map
).eval()


# === Video frame sampling ===
def extract_frames(video_path, num_frames=8):
    """Uniformly sample num_frames frames and return a (N, 3, 448, 448) tensor."""
    vr = VideoReader(video_path, ctx=cpu(0))
    total_frames = len(vr)
    frame_indices = torch.linspace(0, total_frames - 1, num_frames).long().tolist()
    images = []
    for idx in frame_indices:
        img = Image.fromarray(vr[idx].asnumpy()).convert("RGB")
        images.append(transform(img))
    return torch.stack(images)


# === Inference function ===
def evaluate_ar(video):
    # Recent Gradio versions pass the uploaded video as a filepath string;
    # older ones pass a tempfile object, hence the fallback to .name.
    video_path = video if isinstance(video, str) else video.name
    frames = extract_frames(video_path).to(torch.bfloat16).cuda()
    num_patches = [1] * frames.shape[0]
    # InternVL's chat() expects one <image> placeholder per entry in
    # num_patches_list, so prefix the question with a marker per frame.
    video_prefix = ''.join(f'Frame{i + 1}: <image>\n' for i in range(len(num_patches)))
    prompt = video_prefix + "Evaluate the quality of AR occlusion and rendering in the uploaded video."  # replace with a task-specific prompt as needed
    output, _ = model.chat(
        tokenizer,
        frames,
        prompt,
        generation_config=dict(max_new_tokens=512),
        num_patches_list=num_patches,
        history=None,
        return_history=True
    )
    return output


# === Gradio interface ===
demo = gr.Interface(
    fn=evaluate_ar,
    inputs=gr.Video(label="Upload your AR video"),
    outputs="text",
    title="InternVL3 AR Evaluation (Single-turn)",
    description="Upload a video clip. The model will analyze AR occlusion and rendering quality."
)

if __name__ == "__main__":
    demo.launch()
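
# A minimal local smoke test, kept as comments so it never interferes with the
# Gradio app; run these lines in a REPL after the model loads. The clip path
# "sample_ar_clip.mp4" is a hypothetical placeholder, not a file shipped with
# this script:
#
#   frames = extract_frames("sample_ar_clip.mp4", num_frames=4)
#   print(frames.shape)   # expected: torch.Size([4, 3, 448, 448])
#   print(evaluate_ar("sample_ar_clip.mp4"))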