# Provenance: app.py from a Hugging Face Space, uploaded by weiyi01191 (commit c49cd13, "Update app.py", 21.6 kB).
#!/usr/bin/env python3
"""
🎥 Video Content Safety Analysis - MiniGPT4-Video + 巨量引擎规则集成版
基于MiniGPT4-Video的真实视频内容分析 + 巨量引擎299条禁投规则检测
"""
import os
import gradio as gr
import torch
import gc
import whisper
import argparse
import yaml
import random
import numpy as np
import torch.backends.cudnn as cudnn
from minigpt4.common.eval_utils import init_model
from minigpt4.conversation.conversation import CONV_VISION
import tempfile
import shutil
import cv2
import webvtt
import moviepy.editor as mp
from torchvision import transforms
from datetime import timedelta
from moviepy.editor import VideoFileClip
# 导入巨量引擎禁投规则引擎
from prohibited_rules import ProhibitedRulesEngine
# Point Hugging Face Hub traffic at the hf-mirror.com mirror (China mirror).
# NOTE(review): this assignment runs after the third-party imports above; any
# library that reads HF_ENDPOINT at import time may already have captured the
# default endpoint -- confirm whether this must move ahead of those imports.
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
# ZeroGPU decorator: use the real `spaces` package when it is installed,
# otherwise fall back to a no-op stand-in so the rest of the file can keep
# using `@spaces.GPU(...)` unchanged.
try:
    import spaces
    GPU_AVAILABLE = True
    print("✅ ZeroGPU spaces 可用")
except ImportError:
    print("⚠️ ZeroGPU spaces 不可用,使用CPU模式")
    GPU_AVAILABLE = False

    class spaces:
        """No-op replacement for the `spaces` package when it is not installed."""

        @staticmethod
        def GPU(func=None, duration=60, **_options):
            """Return the decorated function unchanged.

            Supports both usages of the decorator:
            ``@spaces.GPU`` (bare) and ``@spaces.GPU(duration=...)`` (called).
            ``duration`` and any other keyword options are accepted and ignored.
            """
            if callable(func):
                # Bare form: the function itself was passed in.
                return func

            def decorator(wrapped):
                return wrapped

            return decorator
# Module-level state. model / vis_processor / whisper_model / args are filled
# in by load_minigpt4_model(); seed is the default used until that function
# replaces it with the value read from the YAML config.
model = None
vis_processor = None
whisper_model = None
args = None
seed = 42
# Build the prohibited-content rules engine once, at import time.
rules_engine = ProhibitedRulesEngine()
print("✅ 巨量引擎299条禁投规则引擎初始化完成")
# ======================== MiniGPT4-Video 核心函数 ========================
def format_timestamp(seconds):
    """Render a duration in seconds as a WebVTT timestamp ``HH:MM:SS.mmm``.

    Args:
        seconds: Offset from the start of the media, in seconds (int or float).

    Returns:
        The zero-padded timestamp string; sub-millisecond precision is dropped.
    """
    delta = timedelta(seconds=seconds)
    whole_seconds = int(delta.total_seconds())
    # timedelta.microseconds is always in [0, 999999], so floor division
    # yields the millisecond component directly.
    millis = delta.microseconds // 1000
    hh, leftover = divmod(whole_seconds, 3600)
    mm, ss = divmod(leftover, 60)
    return f"{hh:02}:{mm:02}:{ss:02}.{millis:03}"
def extract_video_info(video_path, max_images_length):
    """Compute the frame sampling interval for a video.

    Args:
        video_path: Path of the video file to inspect.
        max_images_length: Maximum number of frames the caller wants to sample.

    Returns:
        ``(sampling_interval, fps)`` where ``sampling_interval`` is the number
        of frames between two sampled frames (never less than 1) and ``fps``
        is the frame rate reported by the clip.

    Raises:
        ZeroDivisionError: If ``max_images_length`` is 0.
    """
    clip = VideoFileClip(video_path)
    try:
        # Read everything needed while the clip is still open, and make sure
        # the underlying reader is released even if a property access fails.
        fps = clip.fps
        total_num_frames = int(clip.duration * fps)
    finally:
        clip.close()
    # A video shorter than max_images_length frames would give 0; sample
    # every frame in that case.
    sampling_interval = int(total_num_frames / max_images_length) or 1
    return sampling_interval, fps
def time_to_milliseconds(time_str):
    """Convert a WebVTT-style timestamp to integer milliseconds.

    Args:
        time_str: ``"HH:MM:SS.mmm"`` or the short form ``"MM:SS.mmm"``.

    Returns:
        The timestamp as a whole number of milliseconds.

    Raises:
        ValueError: If the string does not have two or three ``:``-separated
            numeric fields.
    """
    fields = [float(part) for part in time_str.split(':')]
    if len(fields) not in (2, 3):
        raise ValueError(f"unsupported timestamp format: {time_str!r}")
    # Left-pad with zero hours so "MM:SS.mmm" is handled like "00:MM:SS.mmm".
    hours, minutes, secs = [0.0] * (3 - len(fields)) + fields
    # round(), not int(): binary floats make e.g. 1.001 * 1000 evaluate to
    # 1000.9999999999999, which truncation would turn into 1000 ms.
    return round((hours * 3600 + minutes * 60 + secs) * 1000)
def extract_subtitles(subtitle_path):
    """Load a WebVTT file into a list of timed captions.

    Args:
        subtitle_path: Path of the ``.vtt`` file; may be ``None`` or empty.

    Returns:
        A list of ``(start_ms, end_ms, text)`` tuples in file order, or an
        empty list when the path is missing or the file cannot be parsed.
    """
    if not subtitle_path or not os.path.exists(subtitle_path):
        return []
    subtitles = []
    try:
        for caption in webvtt.read(subtitle_path):
            start_ms = time_to_milliseconds(caption.start)
            end_ms = time_to_milliseconds(caption.end)
            # Captions may span several lines; flatten them to one line.
            text = caption.text.strip().replace('\n', ' ')
            subtitles.append((start_ms, end_ms, text))
    except Exception as e:
        # Subtitles are best-effort: report the problem and continue without
        # them, but do not swallow KeyboardInterrupt/SystemExit as a bare
        # ``except:`` would.
        print(f"字幕解析错误: {e}")
        return []
    return subtitles
def find_subtitle(subtitles, frame_count, fps):
    """Look up the caption that is on screen at a given frame.

    Binary-searches ``subtitles``, so the entries are expected to be ordered
    by time.

    Args:
        subtitles: Sequence of ``(start_ms, end_ms, text)`` tuples.
        frame_count: Zero-based index of the frame.
        fps: Frame rate used to convert the frame index to a timestamp.

    Returns:
        The caption text whose interval contains the frame time (bounds
        inclusive), or ``None`` if there is none.
    """
    if not subtitles:
        return None
    timestamp_ms = (frame_count / fps) * 1000
    lo = 0
    hi = len(subtitles) - 1
    while lo <= hi:
        pivot = (lo + hi) // 2
        begin, finish, text = subtitles[pivot]
        if begin <= timestamp_ms <= finish:
            return text
        # Narrow the window to the half that can still contain the timestamp.
        lo, hi = (lo, pivot - 1) if timestamp_ms < begin else (pivot + 1, hi)
    return None
def match_frames_and_subtitles(video_path, subtitles, sampling_interval, max_sub_len, fps, max_frames):
    """Sample frames from a video and interleave them with subtitle text.

    Every ``sampling_interval``-th frame is converted to RGB, passed through
    the global ``vis_processor`` and collected. Subtitle text seen since the
    previous sampled frame is attached after that frame's placeholder until
    roughly ``max_sub_len`` words have been emitted.

    Args:
        video_path: Path of the video to read.
        subtitles: List of ``(start_ms, end_ms, text)`` tuples (may be empty).
        sampling_interval: Keep one frame out of this many.
        max_sub_len: Word budget for subtitle text in the prompt.
        fps: Frame rate used to map frame indices to subtitle times.
        max_frames: Stop after this many frames have been collected.

    Returns:
        ``(images, img_placeholder)`` where ``images`` is the stacked tensor of
        processed frames and ``img_placeholder`` the matching prompt string,
        or ``(None, None)`` when no frame could be read.
    """
    global vis_processor
    to_pil = transforms.ToPILImage()
    images = []
    placeholder_parts = []  # joined once at the end instead of repeated +=
    pending_subtitle_text = ""
    seen_subtitles = set()  # each caption is emitted at most once
    number_of_words = 0
    frame_count = 0

    cap = cv2.VideoCapture(video_path)
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if subtitles:
                frame_subtitle = find_subtitle(subtitles, frame_count, fps)
                if frame_subtitle and frame_subtitle not in seen_subtitles:
                    pending_subtitle_text += frame_subtitle + " "
                    seen_subtitles.add(frame_subtitle)
            if frame_count % sampling_interval == 0:
                # OpenCV decodes to BGR; reverse the channel axis to get RGB.
                frame = to_pil(frame[:, :, ::-1])
                images.append(vis_processor(frame))
                placeholder_parts.append('<Img><ImageHere>')
                if pending_subtitle_text != "" and number_of_words < max_sub_len:
                    placeholder_parts.append(f'<Cap>{pending_subtitle_text}')
                    number_of_words += len(pending_subtitle_text.split(' '))
                    pending_subtitle_text = ""
            frame_count += 1
            if len(images) >= max_frames:
                break
    finally:
        # Release the capture handle even if decoding or preprocessing fails.
        cap.release()
    cv2.destroyAllWindows()

    if len(images) == 0:
        return None, None
    return torch.stack(images), "".join(placeholder_parts)
def extract_audio(video_path, audio_path):
    """Write the audio track of a video to an MP3 file.

    Args:
        video_path: Path of the source video.
        audio_path: Destination path of the MP3 file.

    Raises:
        ValueError: If the video has no audio track.
    """
    video_clip = mp.VideoFileClip(video_path)
    try:
        audio_clip = video_clip.audio
        if audio_clip is None:
            # Silent videos expose ``audio`` as None; fail with a clear
            # message instead of an AttributeError on None.
            raise ValueError(f"视频不包含音轨: {video_path}")
        audio_clip.write_audiofile(audio_path, codec="libmp3lame", bitrate="320k", verbose=False, logger=None)
    finally:
        # Always release the clip's reader, including on failure.
        video_clip.close()
def get_subtitles(video_path):
    """Transcribe a video's audio with Whisper and cache the result as WebVTT.

    Args:
        video_path: Path of the video to transcribe.

    Returns:
        Path of the ``.vtt`` subtitle file, or ``None`` when Whisper is not
        loaded or transcription fails.
    """
    global whisper_model
    if whisper_model is None:
        return None
    import hashlib  # local import: only needed for the cache fingerprint

    audio_dir = "workspace/inference_subtitles/mp3"
    subtitle_dir = "workspace/inference_subtitles"
    os.makedirs(subtitle_dir, exist_ok=True)
    os.makedirs(audio_dir, exist_ok=True)

    try:
        file_stat = os.stat(video_path)
    except OSError as e:
        print(f"字幕生成错误: {e}")
        return None

    # Key the cache on path + size + mtime rather than on the bare file name,
    # so two different uploads that share a name never reuse each other's
    # subtitles.
    fingerprint = hashlib.sha1(
        f"{os.path.abspath(video_path)}|{file_stat.st_size}|{file_stat.st_mtime_ns}".encode("utf-8")
    ).hexdigest()[:12]
    stem = os.path.splitext(os.path.basename(video_path))[0]
    video_id = f"{stem}_{fingerprint}"
    audio_path = f"{audio_dir}/{video_id}.mp3"
    subtitle_path = f"{subtitle_dir}/{video_id}.vtt"

    # Reuse subtitles generated earlier for this exact file.
    if os.path.exists(subtitle_path):
        return subtitle_path

    partial_path = subtitle_path + ".tmp"
    try:
        extract_audio(video_path, audio_path)
        # NOTE(review): the transcription language is fixed to English here;
        # confirm this is intended for the videos this app is used on.
        result = whisper_model.transcribe(audio_path, language="en")
        # Write to a temporary name and rename at the end so an interrupted
        # write can never be picked up later as a valid cached file.
        with open(partial_path, "w", encoding="utf-8") as vtt_file:
            vtt_file.write("WEBVTT\n\n")
            for segment in result['segments']:
                start = format_timestamp(segment['start'])
                end = format_timestamp(segment['end'])
                text = segment['text']
                vtt_file.write(f"{start} --> {end}\n{text}\n\n")
        os.replace(partial_path, subtitle_path)
        return subtitle_path
    except Exception as e:
        print(f"字幕生成错误: {e}")
        return None
def prepare_input(video_path, subtitle_path, instruction):
    """Build the model inputs (frame tensor and prompt text) for one video.

    Args:
        video_path: Path of the video to analyse.
        subtitle_path: Path of a WebVTT file, or ``None`` for no subtitles.
        instruction: The user's instruction, appended after the placeholders.

    Returns:
        ``(frames_features, prompt_text)``; ``frames_features`` is ``None``
        when no frame could be read, in which case the prompt is just the
        instruction.
    """
    global args
    # Checkpoints whose path contains "mistral" get a larger frame and
    # subtitle budget than the default.
    if args and "mistral" in args.ckpt:
        frame_budget, subtitle_budget = 90, 800
    else:
        frame_budget, subtitle_budget = 45, 400

    step, frame_rate = extract_video_info(video_path, frame_budget)
    captions = extract_subtitles(subtitle_path)
    frames_features, placeholder = match_frames_and_subtitles(
        video_path, captions, step, subtitle_budget, frame_rate, frame_budget
    )
    prompt_text = placeholder + "\n" + instruction if placeholder else instruction
    return frames_features, prompt_text
def model_generate(*model_args, **kwargs):
    """Call the underlying LLaMA ``generate`` inside the model's autocast context.

    All positional and keyword arguments are forwarded unchanged.

    Returns:
        Whatever ``model.llama_model.generate`` returns.
    """
    global model
    with model.maybe_autocast():
        return model.llama_model.generate(*model_args, **kwargs)
def generate_prediction(video_path, instruction, gen_subtitles=True, stream=False):
    """Run the video model on one video and return its text answer.

    Args:
        video_path: Path of the video to analyse.
        instruction: Prompt describing what the model should do.
        gen_subtitles: When true, subtitles are generated first and fed to the
            model together with the frames.
        stream: Accepted for interface compatibility; not used in this
            function.

    Returns:
        The first generated answer, or a human-readable error string when the
        video yields no frames or generation raises.
    """
    global model, args, seed
    subtitle_path = get_subtitles(video_path) if gen_subtitles else None
    prepared_images, prepared_instruction = prepare_input(video_path, subtitle_path, instruction)
    if prepared_images is None:
        return "视频无法打开,请检查视频路径"

    frame_total = len(prepared_images)
    # Add a leading batch dimension of size 1.
    batched_images = prepared_images.unsqueeze(0)

    conv = CONV_VISION.copy()
    conv.system = ""
    conv.append_message(conv.roles[0], prepared_instruction)
    conv.append_message(conv.roles[1], None)
    prompts = [conv.get_prompt()]

    # Re-seed before sampling so that repeated runs draw the same samples.
    setup_seeds(seed)
    try:
        answers = model.generate(
            batched_images,
            prompts,
            max_new_tokens=args.max_new_tokens if args else 512,
            do_sample=True,
            lengths=[frame_total],
            num_beams=1,
        )
        return answers[0]
    except Exception as e:
        return f"生成预测时出错: {str(e)}"
# ======================== 巨量引擎规则检测函数 ========================
def format_violations_report(violations_result):
    """Render the rules-engine result as a Markdown-style text report.

    Args:
        violations_result: Mapping with ``has_violations`` and, when that is
            true, ``total_violations``, ``high_risk`` / ``medium_risk`` /
            ``low_risk`` (each with a ``count``) and ``all_violations`` (a list
            of mappings with ``risk_level``, ``category``, ``description``,
            ``matched_keyword`` and ``rule_id``).

    Returns:
        The report text. Violations are listed from P3 down to P1, keeping the
        input order within one level.
    """
    if not violations_result["has_violations"]:
        return """
🛡️ **巨量引擎规则检测结果**: ✅ 无违规内容
- 已检测规则: 299条巨量引擎禁投规则
- 检测维度: 低危(P1) + 中危(P2) + 高危(P3)
- 检测结果: 内容符合平台规范
"""

    severity_rank = {"P3": 3, "P2": 2, "P1": 1}
    severity_icon = {"P3": "🚨", "P2": "⚠️", "P1": "💭"}

    sections = [f"""
🚨 **巨量引擎规则检测结果**: ⚠️ 发现 {violations_result["total_violations"]} 项违规
📊 **违规统计**:
- 🔴 高危违规(P3): {violations_result["high_risk"]["count"]}
- 🟡 中危违规(P2): {violations_result["medium_risk"]["count"]}
- 🟠 低危违规(P1): {violations_result["low_risk"]["count"]}
📋 **详细违规列表**:
"""]

    # Highest risk first; sorting on the negated rank is stable, so entries of
    # equal risk keep their original relative order.
    ordered = sorted(
        violations_result["all_violations"],
        key=lambda item: -severity_rank[item["risk_level"]],
    )
    for item in ordered:
        level = item["risk_level"]
        icon = severity_icon[level]
        sections.append(f"""
{icon} **{level} - {item["category"]}**
规则: {item["description"]}
匹配词: "{item["matched_keyword"]}"
规则ID: {item["rule_id"]}
""")
    return "".join(sections)
def get_overall_risk_level(violations_result):
    """Summarise the rules-engine result as a single overall risk label.

    Args:
        violations_result: Mapping with ``has_violations`` and, when that is
            true, ``high_risk`` / ``medium_risk`` / ``low_risk`` entries that
            each carry a ``count``.

    Returns:
        A one-line label. Any high-risk hit wins; otherwise the medium-risk
        count decides (more than two is rated stricter than one or two);
        otherwise the low-risk count is reported.
    """
    if not violations_result["has_violations"]:
        return "✅ P3 (安全) - 内容健康,符合平台规范"

    high_hits = violations_result["high_risk"]["count"]
    if high_hits > 0:
        return f"🚨 P0 (极高危) - 发现 {high_hits} 项高危违规,禁止投放"

    medium_hits = violations_result["medium_risk"]["count"]
    if medium_hits > 2:
        return f"⚠️ P1 (高危) - 发现 {medium_hits} 项中危违规,需严格审核"
    if medium_hits > 0:
        return f"⚠️ P1 (中危) - 发现 {medium_hits} 项中危违规,需要审核"

    low_hits = violations_result["low_risk"]["count"]
    return f"⚡ P2 (低危) - 发现 {low_hits} 项低危违规,建议关注"
# ======================== 应用主要函数 ========================
def setup_seeds(seed):
    """Seed every random number generator used here and make cuDNN deterministic.

    Args:
        seed: Integer seed applied to ``random``, NumPy and PyTorch (CPU and
            the current CUDA device).
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seed_fn(seed)
    # Disable cuDNN auto-tuning and request deterministic kernels.
    cudnn.benchmark = False
    cudnn.deterministic = True
def optimize_gpu_memory():
    """Configure CUDA allocator settings and free cached GPU memory.

    Always sets the ``PYTORCH_CUDA_ALLOC_CONF`` and ``CUDA_LAUNCH_BLOCKING``
    environment variables. When CUDA is available it additionally prints
    device information, empties the CUDA caches, runs the garbage collector
    and switches cuDNN to deterministic, non-benchmark mode.
    """
    print("🔍 开始GPU内存优化...")
    # Allocator tuning via environment variables.
    os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:256,garbage_collection_threshold:0.6'
    os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

    if not torch.cuda.is_available():
        return

    gib = 1024**3
    print(f"🔍 GPU: {torch.cuda.get_device_name(0)}")
    total_bytes = torch.cuda.get_device_properties(0).total_memory
    print(f"💾 总显存: {total_bytes / gib:.1f} GB")

    # Drop every cached block that is no longer referenced.
    torch.cuda.empty_cache()
    torch.cuda.ipc_collect()
    gc.collect()

    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    # Reported as total capacity minus the bytes PyTorch currently has allocated.
    free_bytes = total_bytes - torch.cuda.memory_allocated(0)
    print(f"💾 清理后可用显存: {free_bytes / gib:.1f} GB")
def get_arguments():
    """Parse the MiniGPT4-Video command-line options.

    Unrecognised command-line arguments are ignored (with a warning) instead
    of terminating the process: this function is called lazily while serving
    a request, where argparse's ``SystemExit`` would bypass the caller's
    ``except Exception`` handler.

    Returns:
        An ``argparse.Namespace`` with ``cfg_path``, ``ckpt``,
        ``max_new_tokens``, ``lora_r``, ``lora_alpha`` and ``options``.
    """
    parser = argparse.ArgumentParser(description="MiniGPT4-Video参数")
    parser.add_argument("--cfg-path", help="配置文件路径",
                        default="test_configs/minigpt4_optimized_config.yaml")
    parser.add_argument("--ckpt", type=str,
                        default='checkpoints/video_llama_checkpoint_last.pth',
                        help="模型检查点路径")
    parser.add_argument("--max_new_tokens", type=int, default=512,
                        help="最大生成token数")
    parser.add_argument("--lora_r", type=int, default=96, help="LoRA rank")
    parser.add_argument("--lora_alpha", type=int, default=24, help="LoRA alpha")
    parser.add_argument("--options", nargs="+", help="覆盖配置选项")
    known_args, unknown_args = parser.parse_known_args()
    if unknown_args:
        print(f"⚠️ 忽略未识别的命令行参数: {unknown_args}")
    return known_args
def load_minigpt4_model():
    """Load the MiniGPT4-Video model and Whisper once and cache them in globals.

    Subsequent calls return the cached objects. The run seed is read from the
    YAML config and applied before the model is initialised.

    Returns:
        ``(model, vis_processor, whisper_model)``. If the video model cannot
        be loaded the result is ``(None, None, None)`` and the caller is
        expected to fall back to simulated mode. If only Whisper fails to
        load, the video model is still returned and ``whisper_model`` is
        ``None`` (subtitle generation is then skipped).
    """
    global model, vis_processor, whisper_model, args, seed
    if model is not None:
        return model, vis_processor, whisper_model
    try:
        print("🔄 正在加载MiniGPT4-Video模型...")
        args = get_arguments()

        # Resolve the config file, falling back to the default test config.
        config_path = args.cfg_path
        if not os.path.exists(config_path):
            config_path = "test_configs/llama2_test_config.yaml"
        with open(config_path, encoding="utf-8") as file:
            # NOTE: yaml.FullLoader is only appropriate because this is a
            # config file shipped with the app, not external input.
            config = yaml.load(file, Loader=yaml.FullLoader)
        seed = config['run']['seed']
        setup_seeds(seed)

        optimize_gpu_memory()
        print("🚀 开始初始化MiniGPT4-Video模型...")
        model, vis_processor, whisper_gpu_id, minigpt4_gpu_id, answer_module_gpu_id = init_model(args)

        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            print(f"💾 模型加载后显存使用: {torch.cuda.memory_allocated(0) / 1024**3:.1f} GB")

        print("🚀 开始初始化Whisper模型...")
        try:
            whisper_device = f"cuda:{whisper_gpu_id}" if torch.cuda.is_available() else "cpu"
            whisper_model = whisper.load_model("base").to(whisper_device)
        except Exception as e:
            # Whisper is optional. Handling its failure here keeps the return
            # value consistent with the cached state: otherwise this call
            # would report a total failure while later calls returned the
            # already-loaded video model.
            whisper_model = None
            print(f"⚠️ Whisper模型加载失败,将跳过字幕生成: {e}")

        if torch.cuda.is_available():
            print(f"💾 全部加载后显存使用: {torch.cuda.memory_allocated(0) / 1024**3:.1f} GB")
        print("✅ 所有模型加载完成!")
        return model, vis_processor, whisper_model
    except Exception as e:
        print(f"❌ 模型加载失败: {e}")
        print("🔄 回退到模拟模式...")
        return None, None, None
@spaces.GPU(duration=120)
def analyze_video_with_minigpt4(video_file, instruction):
    """Analyse an uploaded video and run the prohibited-content rule check.

    Args:
        video_file: Path of the uploaded video, or ``None`` if nothing was
            uploaded.
        instruction: Free-text instruction for the model; a default
            instruction is used when it is empty.

    Returns:
        ``(report_text, risk_label)``. On any failure the report text
        describes the error and the label is an error/simulation marker
        instead of a risk level.
    """
    if video_file is None:
        return "❌ 请上传视频文件", "无法评估"
    try:
        model_loaded, _vis_proc, _whisper_loaded = load_minigpt4_model()
        if model_loaded is None:
            # Simulated mode: the real model could not be loaded.
            return f"""
🎬 **视频内容分析结果 (模拟模式)**
📋 **基本信息**:
- 视频文件: {video_file}
- 分析指令: {instruction}
⚠️ **注意**: 当前运行在模拟模式,真实模型加载失败
请检查模型文件和配置是否正确
🛡️ **巨量引擎规则检测**: 仅在真实模式下可用
""", "⚠️ 模拟模式"
        print(f"🔄 开始分析视频: {video_file}")
        print(f"📝 分析指令: {instruction}")

        # A missing file cannot be recovered by copying it elsewhere, so fail
        # early with a clear message; the handler below turns it into the
        # standard error report.
        if not os.path.exists(video_file):
            raise FileNotFoundError(f"视频文件不存在: {video_file}")

        if not instruction or instruction.strip() == "":
            instruction = "请详细分析这个视频的内容,包括场景、人物、动作、对话等,并描述所有可见和可听的元素。"

        # NOTE(review): generate_prediction() reports its own failures as
        # plain text in the return value, so such text is scanned and shown
        # below like a normal description.
        prediction = generate_prediction(
            video_path=video_file,
            instruction=instruction,
            gen_subtitles=True,
            stream=False
        )

        print("🔍 开始巨量引擎299条规则检测...")
        violations_result = rules_engine.check_all_content(prediction, instruction)

        enhanced_result = f"""
🎬 **MiniGPT4-Video 视频内容分析 + 巨量引擎规则检测报告**
📋 **基本信息**:
- 视频文件: {os.path.basename(video_file)}
- 分析设备: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU模式'}
- 分析指令: {instruction}
🔍 **视频内容描述**:
{prediction}
{format_violations_report(violations_result)}
📊 **技术信息**:
- 内容理解: MiniGPT4-Video + Whisper
- 规则引擎: 巨量引擎299条禁投规则
- 检测等级: P1(低危) + P2(中危) + P3(高危)
- 分析模式: 多模态理解 (视觉+语音+文本)
💡 **说明**:
基于MiniGPT4-Video的深度内容理解,结合巨量引擎完整禁投规则库进行专业违规检测。
"""
        safety_score = get_overall_risk_level(violations_result)
        return enhanced_result, safety_score
    except Exception as e:
        error_msg = f"""
❌ **分析过程中出错**
错误信息: {str(e)}
🔄 **可能的解决方案**:
1. 检查视频文件格式 (建议MP4)
2. 确认模型文件是否正确加载
3. 检查GPU内存是否充足
4. 验证配置文件路径
💡 **提示**: 如果问题持续,请检查模型和依赖项安装
"""
        return error_msg, "⚠️ 错误"
def create_app():
    """Build the Gradio interface for the video safety analyser.

    Returns:
        A ``gr.Interface`` that takes a video and an instruction text and
        shows the analysis report and the overall risk rating.
    """
    video_input = gr.Video(label="上传视频文件")
    instruction_input = gr.Textbox(
        label="分析指令",
        value="请详细分析这个视频的内容,包括场景、人物、动作、对话等,并描述所有可见和可听的元素。",
        placeholder="输入您希望AI如何分析这个视频...",
        lines=3
    )
    report_output = gr.Textbox(label="MiniGPT4-Video 内容分析 + 巨量引擎规则检测", lines=20)
    rating_output = gr.Textbox(label="巨量引擎风险评级")

    page_description = """
## 🎬 基于MiniGPT4-Video + 巨量引擎299条禁投规则的专业视频安全检测系统
⚡ **ZeroGPU加速** | 🎬 **MiniGPT4-Video** | 🎙️ **Whisper语音** | 🛡️ **巨量引擎299条规则**
**🔥 核心功能:**
- 🎞️ **深度视频理解**: MiniGPT4-Video多模态分析
- 🎙️ **语音转文字**: Whisper自动生成字幕
- 🛡️ **专业违规检测**: 巨量引擎完整禁投规则库
- 📊 **智能风险评级**: P0-P3四级风险等级
**🎯 检测维度:**
- **高危(P3)**: 违法出版物、烟草、医疗等严重违规
- **中危(P2)**: 赌博周边、房地产、金融等中等风险
- **低危(P1)**: 化妆品、汽车、游戏等轻微风险
**📋 规则覆盖:**
涵盖化妆品类、汽车类、游戏类、赌博类、房地产类、工具软件类、教育培训类、
金融类、医疗类、烟草类等全部299条巨量引擎禁投规则
"""

    # Example rows only pre-fill the instruction; no example video is provided.
    example_prompts = (
        "分析这个视频是否包含禁投内容",
        "检测视频中是否有巨量引擎禁止的产品或服务",
        "评估视频内容的投放风险等级",
        "详细描述视频内容并进行合规检查",
    )
    example_rows = [[None, prompt] for prompt in example_prompts]

    return gr.Interface(
        fn=analyze_video_with_minigpt4,
        inputs=[video_input, instruction_input],
        outputs=[report_output, rating_output],
        title="🎥 智能视频内容安全分析 - MiniGPT4-Video + 巨量引擎",
        description=page_description,
        examples=example_rows,
        cache_examples=False
    )
def main():
    """Print start-up information, create the work directories and launch the app."""
    for banner in (
        "🚀 启动MiniGPT4-Video + 巨量引擎视频安全分析应用",
        "🎬 MiniGPT4-Video: 深度视频内容理解",
        "🛡️ 巨量引擎: 299条禁投规则检测",
    ):
        print(banner)

    if torch.cuda.is_available():
        print(f"✅ GPU可用: {torch.cuda.get_device_name(0)}")
    else:
        print("⚠️ 使用CPU模式")

    # Working directories used for temporary videos, subtitles and audio.
    for directory in (
        "workspace/tmp",
        "workspace/inference_subtitles",
        "workspace/inference_subtitles/mp3",
    ):
        os.makedirs(directory, exist_ok=True)
    print("📁 工作目录准备完成")

    print("🚀 正在启动Gradio应用...")
    app = create_app()
    launch_options = {
        "share": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    app.launch(**launch_options)


if __name__ == "__main__":
    main()