#!/usr/bin/env python3 """ 🎥 Video Content Safety Analysis - MiniGPT4-Video + 巨量引擎规则集成版 基于MiniGPT4-Video的真实视频内容分析 + 巨量引擎299条禁投规则检测 """ import os import gradio as gr import torch import gc import whisper import argparse import yaml import random import numpy as np import torch.backends.cudnn as cudnn from minigpt4.common.eval_utils import init_model from minigpt4.conversation.conversation import CONV_VISION import tempfile import shutil import cv2 import webvtt import moviepy.editor as mp from torchvision import transforms from datetime import timedelta from moviepy.editor import VideoFileClip # 导入巨量引擎禁投规则引擎 from prohibited_rules import ProhibitedRulesEngine # 设置中国镜像 os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com' # ZeroGPU装饰器 try: import spaces GPU_AVAILABLE = True print("✅ ZeroGPU spaces 可用") except ImportError: print("⚠️ ZeroGPU spaces 不可用,使用CPU模式") GPU_AVAILABLE = False # 创建一个空的装饰器 class spaces: @staticmethod def GPU(duration=60): def decorator(func): return func return decorator # 全局变量 model = None vis_processor = None whisper_model = None args = None seed = 42 # 初始化巨量引擎规则引擎 rules_engine = ProhibitedRulesEngine() print("✅ 巨量引擎299条禁投规则引擎初始化完成") # ======================== MiniGPT4-Video 核心函数 ======================== def format_timestamp(seconds): """格式化时间戳为VTT格式""" td = timedelta(seconds=seconds) total_seconds = int(td.total_seconds()) milliseconds = int(td.microseconds / 1000) hours, remainder = divmod(total_seconds, 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02}:{minutes:02}:{seconds:02}.{milliseconds:03}" def extract_video_info(video_path, max_images_length): """提取视频信息""" clip = VideoFileClip(video_path) total_num_frames = int(clip.duration * clip.fps) clip.close() sampling_interval = int(total_num_frames / max_images_length) if sampling_interval == 0: sampling_interval = 1 return sampling_interval, clip.fps def time_to_milliseconds(time_str): """将时间格式转换为毫秒""" h, m, s = map(float, time_str.split(':')) return int((h * 3600 + m * 60 + s) * 1000) def extract_subtitles(subtitle_path): """提取字幕""" if not subtitle_path or not os.path.exists(subtitle_path): return [] subtitles = [] try: for caption in webvtt.read(subtitle_path): start_ms = time_to_milliseconds(caption.start) end_ms = time_to_milliseconds(caption.end) text = caption.text.strip().replace('\n', ' ') subtitles.append((start_ms, end_ms, text)) except: return [] return subtitles def find_subtitle(subtitles, frame_count, fps): """查找对应帧的字幕""" if not subtitles: return None frame_time = (frame_count / fps) * 1000 left, right = 0, len(subtitles) - 1 while left <= right: mid = (left + right) // 2 start, end, subtitle_text = subtitles[mid] if start <= frame_time <= end: return subtitle_text elif frame_time < start: right = mid - 1 else: left = mid + 1 return None def match_frames_and_subtitles(video_path, subtitles, sampling_interval, max_sub_len, fps, max_frames): """匹配视频帧和字幕""" global vis_processor cap = cv2.VideoCapture(video_path) images = [] frame_count = 0 img_placeholder = "" subtitle_text_in_interval = "" history_subtitles = {} number_of_words = 0 transform = transforms.Compose([ transforms.ToPILImage(), ]) while cap.isOpened(): ret, frame = cap.read() if not ret: break if len(subtitles) > 0: frame_subtitle = find_subtitle(subtitles, frame_count, fps) if frame_subtitle and not history_subtitles.get(frame_subtitle, False): subtitle_text_in_interval += frame_subtitle + " " history_subtitles[frame_subtitle] = True if frame_count % sampling_interval == 0: frame = transform(frame[:,:,::-1]) # 转换为RGB frame = vis_processor(frame) images.append(frame) img_placeholder += '' if subtitle_text_in_interval != "" and number_of_words < max_sub_len: img_placeholder += f'{subtitle_text_in_interval}' number_of_words += len(subtitle_text_in_interval.split(' ')) subtitle_text_in_interval = "" frame_count += 1 if len(images) >= max_frames: break cap.release() cv2.destroyAllWindows() if len(images) == 0: return None, None images = torch.stack(images) return images, img_placeholder def extract_audio(video_path, audio_path): """提取音频""" video_clip = mp.VideoFileClip(video_path) audio_clip = video_clip.audio audio_clip.write_audiofile(audio_path, codec="libmp3lame", bitrate="320k", verbose=False, logger=None) video_clip.close() def get_subtitles(video_path): """生成字幕""" global whisper_model if whisper_model is None: return None audio_dir = "workspace/inference_subtitles/mp3" subtitle_dir = "workspace/inference_subtitles" os.makedirs(subtitle_dir, exist_ok=True) os.makedirs(audio_dir, exist_ok=True) video_id = video_path.split('/')[-1].split('.')[0] audio_path = f"{audio_dir}/{video_id}.mp3" subtitle_path = f"{subtitle_dir}/{video_id}.vtt" # 如果字幕已存在,直接返回 if os.path.exists(subtitle_path): return subtitle_path try: extract_audio(video_path, audio_path) result = whisper_model.transcribe(audio_path, language="en") # 创建VTT文件 with open(subtitle_path, "w", encoding="utf-8") as vtt_file: vtt_file.write("WEBVTT\n\n") for segment in result['segments']: start = format_timestamp(segment['start']) end = format_timestamp(segment['end']) text = segment['text'] vtt_file.write(f"{start} --> {end}\n{text}\n\n") return subtitle_path except Exception as e: print(f"字幕生成错误: {e}") return None def prepare_input(video_path, subtitle_path, instruction): """准备输入""" global args # 根据模型设置参数 if args and "mistral" in args.ckpt: max_frames = 90 max_sub_len = 800 else: max_frames = 45 max_sub_len = 400 sampling_interval, fps = extract_video_info(video_path, max_frames) subtitles = extract_subtitles(subtitle_path) frames_features, input_placeholder = match_frames_and_subtitles( video_path, subtitles, sampling_interval, max_sub_len, fps, max_frames ) if input_placeholder: input_placeholder += "\n" + instruction else: input_placeholder = instruction return frames_features, input_placeholder def model_generate(*model_args, **kwargs): """模型生成函数""" global model with model.maybe_autocast(): output = model.llama_model.generate(*model_args, **kwargs) return output def generate_prediction(video_path, instruction, gen_subtitles=True, stream=False): """生成预测结果""" global model, args, seed if gen_subtitles: subtitle_path = get_subtitles(video_path) else: subtitle_path = None prepared_images, prepared_instruction = prepare_input(video_path, subtitle_path, instruction) if prepared_images is None: return "视频无法打开,请检查视频路径" length = len(prepared_images) prepared_images = prepared_images.unsqueeze(0) conv = CONV_VISION.copy() conv.system = "" conv.append_message(conv.roles[0], prepared_instruction) conv.append_message(conv.roles[1], None) prompt = [conv.get_prompt()] # 设置随机种子 setup_seeds(seed) try: answers = model.generate( prepared_images, prompt, max_new_tokens=args.max_new_tokens if args else 512, do_sample=True, lengths=[length], num_beams=1 ) return answers[0] except Exception as e: return f"生成预测时出错: {str(e)}" # ======================== 巨量引擎规则检测函数 ======================== def format_violations_report(violations_result): """格式化违规检测报告""" if not violations_result["has_violations"]: return """ 🛡️ **巨量引擎规则检测结果**: ✅ 无违规内容 - 已检测规则: 299条巨量引擎禁投规则 - 检测维度: 低危(P1) + 中危(P2) + 高危(P3) - 检测结果: 内容符合平台规范 """ report = f""" 🚨 **巨量引擎规则检测结果**: ⚠️ 发现 {violations_result["total_violations"]} 项违规 📊 **违规统计**: - 🔴 高危违规(P3): {violations_result["high_risk"]["count"]} 项 - 🟡 中危违规(P2): {violations_result["medium_risk"]["count"]} 项 - 🟠 低危违规(P1): {violations_result["low_risk"]["count"]} 项 📋 **详细违规列表**: """ # 按风险等级排序显示违规 for violation in sorted(violations_result["all_violations"], key=lambda x: {"P3": 3, "P2": 2, "P1": 1}[x["risk_level"]], reverse=True): risk_icon = {"P3": "🚨", "P2": "⚠️", "P1": "💭"}[violation["risk_level"]] report += f""" {risk_icon} **{violation["risk_level"]} - {violation["category"]}** 规则: {violation["description"]} 匹配词: "{violation["matched_keyword"]}" 规则ID: {violation["rule_id"]} """ return report def get_overall_risk_level(violations_result): """获取综合风险等级""" if not violations_result["has_violations"]: return "✅ P3 (安全) - 内容健康,符合平台规范" if violations_result["high_risk"]["count"] > 0: return f"🚨 P0 (极高危) - 发现 {violations_result['high_risk']['count']} 项高危违规,禁止投放" elif violations_result["medium_risk"]["count"] > 2: return f"⚠️ P1 (高危) - 发现 {violations_result['medium_risk']['count']} 项中危违规,需严格审核" elif violations_result["medium_risk"]["count"] > 0: return f"⚠️ P1 (中危) - 发现 {violations_result['medium_risk']['count']} 项中危违规,需要审核" else: return f"⚡ P2 (低危) - 发现 {violations_result['low_risk']['count']} 项低危违规,建议关注" # ======================== 应用主要函数 ======================== def setup_seeds(seed): """设置随机种子""" random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed(seed) cudnn.benchmark = False cudnn.deterministic = True def optimize_gpu_memory(): """GPU内存优化""" print("🔍 开始GPU内存优化...") # 设置环境变量优化内存分配 os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:256,garbage_collection_threshold:0.6' os.environ['CUDA_LAUNCH_BLOCKING'] = '1' if torch.cuda.is_available(): print(f"🔍 GPU: {torch.cuda.get_device_name(0)}") print(f"💾 总显存: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB") # 强制清理所有GPU缓存 torch.cuda.empty_cache() torch.cuda.ipc_collect() gc.collect() # 设置内存增长策略 torch.backends.cudnn.benchmark = False torch.backends.cudnn.deterministic = True print(f"💾 清理后可用显存: {(torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated(0)) / 1024**3:.1f} GB") def get_arguments(): """获取参数配置""" parser = argparse.ArgumentParser(description="MiniGPT4-Video参数") parser.add_argument("--cfg-path", help="配置文件路径", default="test_configs/minigpt4_optimized_config.yaml") parser.add_argument("--ckpt", type=str, default='checkpoints/video_llama_checkpoint_last.pth', help="模型检查点路径") parser.add_argument("--max_new_tokens", type=int, default=512, help="最大生成token数") parser.add_argument("--lora_r", type=int, default=96, help="LoRA rank") parser.add_argument("--lora_alpha", type=int, default=24, help="LoRA alpha") parser.add_argument("--options", nargs="+", help="覆盖配置选项") return parser.parse_args() def load_minigpt4_model(): """加载MiniGPT4-Video模型""" global model, vis_processor, whisper_model, args, seed if model is not None: return model, vis_processor, whisper_model try: print("🔄 正在加载MiniGPT4-Video模型...") # 获取参数 args = get_arguments() # 加载配置 config_path = args.cfg_path if not os.path.exists(config_path): config_path = "test_configs/llama2_test_config.yaml" # 回退到默认配置 with open(config_path) as file: config = yaml.load(file, Loader=yaml.FullLoader) seed = config['run']['seed'] setup_seeds(seed) # GPU内存优化 optimize_gpu_memory() print("🚀 开始初始化MiniGPT4-Video模型...") model, vis_processor, whisper_gpu_id, minigpt4_gpu_id, answer_module_gpu_id = init_model(args) # 清理缓存 if torch.cuda.is_available(): torch.cuda.empty_cache() print(f"💾 模型加载后显存使用: {torch.cuda.memory_allocated(0) / 1024**3:.1f} GB") print("🚀 开始初始化Whisper模型...") whisper_model = whisper.load_model("base").to(f"cuda:{whisper_gpu_id}" if torch.cuda.is_available() else "cpu") if torch.cuda.is_available(): print(f"💾 全部加载后显存使用: {torch.cuda.memory_allocated(0) / 1024**3:.1f} GB") print("✅ 所有模型加载完成!") return model, vis_processor, whisper_model except Exception as e: print(f"❌ 模型加载失败: {e}") print("🔄 回退到模拟模式...") return None, None, None @spaces.GPU(duration=120) def analyze_video_with_minigpt4(video_file, instruction): """使用MiniGPT4-Video分析视频内容并进行巨量引擎规则检测""" if video_file is None: return "❌ 请上传视频文件", "无法评估" try: # 加载模型 model_loaded, vis_proc, whisper_loaded = load_minigpt4_model() if model_loaded is None: # 模拟模式 return f""" 🎬 **视频内容分析结果 (模拟模式)** 📋 **基本信息**: - 视频文件: {video_file} - 分析指令: {instruction} ⚠️ **注意**: 当前运行在模拟模式,真实模型加载失败 请检查模型文件和配置是否正确 🛡️ **巨量引擎规则检测**: 仅在真实模式下可用 """, "⚠️ 模拟模式" print(f"🔄 开始分析视频: {video_file}") print(f"📝 分析指令: {instruction}") # 复制视频到临时路径(如果需要) temp_video_path = video_file if not os.path.exists(video_file): # 如果是Gradio的临时文件,复制到工作目录 temp_dir = "workspace/tmp" os.makedirs(temp_dir, exist_ok=True) temp_video_path = os.path.join(temp_dir, "analysis_video.mp4") shutil.copy2(video_file, temp_video_path) # 使用MiniGPT4-Video进行真实分析 if not instruction or instruction.strip() == "": instruction = "请详细分析这个视频的内容,包括场景、人物、动作、对话等,并描述所有可见和可听的元素。" # 调用MiniGPT4-Video的生成函数 prediction = generate_prediction( video_path=temp_video_path, instruction=instruction, gen_subtitles=True, # 生成字幕 stream=False ) # 🚨 巨量引擎规则检测 🚨 print("🔍 开始巨量引擎299条规则检测...") violations_result = rules_engine.check_all_content(prediction, instruction) # 格式化完整分析报告 enhanced_result = f""" 🎬 **MiniGPT4-Video 视频内容分析 + 巨量引擎规则检测报告** 📋 **基本信息**: - 视频文件: {os.path.basename(video_file)} - 分析设备: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU模式'} - 分析指令: {instruction} 🔍 **视频内容描述**: {prediction} {format_violations_report(violations_result)} 📊 **技术信息**: - 内容理解: MiniGPT4-Video + Whisper - 规则引擎: 巨量引擎299条禁投规则 - 检测等级: P1(低危) + P2(中危) + P3(高危) - 分析模式: 多模态理解 (视觉+语音+文本) 💡 **说明**: 基于MiniGPT4-Video的深度内容理解,结合巨量引擎完整禁投规则库进行专业违规检测。 """ # 获取综合风险等级 safety_score = get_overall_risk_level(violations_result) return enhanced_result, safety_score except Exception as e: error_msg = f""" ❌ **分析过程中出错** 错误信息: {str(e)} 🔄 **可能的解决方案**: 1. 检查视频文件格式 (建议MP4) 2. 确认模型文件是否正确加载 3. 检查GPU内存是否充足 4. 验证配置文件路径 💡 **提示**: 如果问题持续,请检查模型和依赖项安装 """ return error_msg, "⚠️ 错误" def create_app(): """创建Gradio应用""" interface = gr.Interface( fn=analyze_video_with_minigpt4, inputs=[ gr.Video(label="上传视频文件"), gr.Textbox( label="分析指令", value="请详细分析这个视频的内容,包括场景、人物、动作、对话等,并描述所有可见和可听的元素。", placeholder="输入您希望AI如何分析这个视频...", lines=3 ) ], outputs=[ gr.Textbox(label="MiniGPT4-Video 内容分析 + 巨量引擎规则检测", lines=20), gr.Textbox(label="巨量引擎风险评级") ], title="🎥 智能视频内容安全分析 - MiniGPT4-Video + 巨量引擎", description=""" ## 🎬 基于MiniGPT4-Video + 巨量引擎299条禁投规则的专业视频安全检测系统 ⚡ **ZeroGPU加速** | 🎬 **MiniGPT4-Video** | 🎙️ **Whisper语音** | 🛡️ **巨量引擎299条规则** **🔥 核心功能:** - 🎞️ **深度视频理解**: MiniGPT4-Video多模态分析 - 🎙️ **语音转文字**: Whisper自动生成字幕 - 🛡️ **专业违规检测**: 巨量引擎完整禁投规则库 - 📊 **智能风险评级**: P0-P3四级风险等级 **🎯 检测维度:** - **高危(P3)**: 违法出版物、烟草、医疗等严重违规 - **中危(P2)**: 赌博周边、房地产、金融等中等风险 - **低危(P1)**: 化妆品、汽车、游戏等轻微风险 **📋 规则覆盖:** 涵盖化妆品类、汽车类、游戏类、赌博类、房地产类、工具软件类、教育培训类、 金融类、医疗类、烟草类等全部299条巨量引擎禁投规则 """, examples=[ [None, "分析这个视频是否包含禁投内容"], [None, "检测视频中是否有巨量引擎禁止的产品或服务"], [None, "评估视频内容的投放风险等级"], [None, "详细描述视频内容并进行合规检查"] ], cache_examples=False ) return interface def main(): """主函数""" print("🚀 启动MiniGPT4-Video + 巨量引擎视频安全分析应用") print("🎬 MiniGPT4-Video: 深度视频内容理解") print("🛡️ 巨量引擎: 299条禁投规则检测") if torch.cuda.is_available(): print(f"✅ GPU可用: {torch.cuda.get_device_name(0)}") else: print("⚠️ 使用CPU模式") # 创建必要的目录 os.makedirs("workspace/tmp", exist_ok=True) os.makedirs("workspace/inference_subtitles", exist_ok=True) os.makedirs("workspace/inference_subtitles/mp3", exist_ok=True) print("📁 工作目录准备完成") print("🚀 正在启动Gradio应用...") app = create_app() # 启动应用 app.launch( share=True, server_name="0.0.0.0", server_port=7860, show_error=True ) if __name__ == "__main__": main()