Spaces:
Runtime error
Runtime error
import gradio as gr | |
import ffmpeg | |
import tempfile | |
import os | |
from PIL import Image | |
import numpy as np | |
import os | |
import time | |
# 设置umask确保新创建的文件有正确的权限 | |
os.umask(0o022) | |
def subliminal_insert( | |
video_path, | |
frame_img_path, | |
interval_frames, | |
duration_frames, | |
output_dir, | |
alpha=0.5, # 添加透明度参数 | |
progress=None): | |
""" | |
将特定帧按间隔植入视频流中 | |
""" | |
if not os.path.exists(video_path): | |
raise FileNotFoundError("视频文件不存在") | |
if not os.path.exists(frame_img_path): | |
raise FileNotFoundError("图片文件不存在") | |
temp_output = None | |
frame_temp = None | |
try: | |
# 获取视频基本信息 | |
probe = ffmpeg.probe(video_path) | |
video_info = next(s for s in probe['streams'] | |
if s['codec_type'] == 'video') | |
width = int(video_info['width']) | |
height = int(video_info['height']) | |
fps = eval(video_info['r_frame_rate']) | |
print(f"视频信息: {width}x{height} @ {fps}fps") | |
if progress is not None: | |
progress(0.1, "正在处理图片...") | |
# 调整图片大小以匹配视频尺寸 | |
img = Image.open(frame_img_path) | |
img = img.resize((width, height)) | |
frame_temp = tempfile.NamedTemporaryFile(delete=False, | |
suffix='.png').name | |
img.save(frame_temp) | |
if progress is not None: | |
progress(0.2, "正在准备处理视频...") | |
# 使用系统临时目录 | |
gradio_temp_dir = os.path.join(tempfile.gettempdir(), "gradio") | |
os.makedirs(gradio_temp_dir, exist_ok=True) | |
temp_output = os.path.join(gradio_temp_dir, | |
f"output_{int(time.time())}.mp4") | |
print(f"将创建输出文件: {temp_output}") | |
try: | |
if progress is not None: | |
progress(0.3, "正在处理视频...") | |
# 构建ffmpeg命令 | |
input_video = ffmpeg.input(video_path) | |
input_image = ffmpeg.input(frame_temp) | |
# 构建filter complex | |
v = ffmpeg.filter( | |
[input_video, input_image], | |
'overlay', | |
enable=f'if(lt(mod(n,{interval_frames}),{duration_frames}),1,0)', | |
format='yuva444p', # 确保支持alpha通道 | |
alpha=f"if(lt(mod(n,{interval_frames}),{duration_frames}),{alpha},0)" # 动态设置alpha值 | |
) | |
# 检查输入视频是否有音频流 | |
has_audio = any(s['codec_type'] == 'audio' | |
for s in probe['streams']) | |
# 构建输出 | |
if has_audio: | |
stream = ffmpeg.output(v, | |
input_video.audio, | |
temp_output, | |
acodec='copy', | |
vcodec='libx264', | |
pix_fmt='yuv420p', # 确保输出格式兼容 | |
preset='ultrafast', | |
crf=23, | |
movflags='+faststart') | |
else: | |
stream = ffmpeg.output(v, | |
temp_output, | |
vcodec='libx264', | |
pix_fmt='yuv420p', # 确保输出格式兼容 | |
preset='ultrafast', | |
crf=23, | |
movflags='+faststart') | |
# 打印完整命令 | |
cmd = " ".join(stream.compile()) | |
print("FFmpeg命令:", cmd) | |
# 运行命令 | |
stdout, stderr = stream.run(capture_stdout=True, | |
capture_stderr=True, | |
overwrite_output=True) | |
if progress is not None: | |
progress(0.9, "正在完成处理...") | |
# 检查输出文件 | |
if not os.path.exists(temp_output): | |
raise Exception(f"输出文件不存在: {temp_output}") | |
if os.path.getsize(temp_output) == 0: | |
raise Exception(f"输出文件大小为0: {temp_output}") | |
# 设置文件权限 | |
try: | |
os.chmod(temp_output, 0o644) | |
except Exception as e: | |
print(f"设置文件权限时出错: {str(e)}") | |
print( | |
f"成功创建输出文件: {temp_output}, 大小: {os.path.getsize(temp_output)} bytes" | |
) | |
if progress is not None: | |
progress(1.0, "处理完成") | |
return temp_output | |
except ffmpeg.Error as e: | |
print("FFmpeg stdout:", | |
e.stdout.decode('utf8') if e.stdout else "No stdout") | |
print("FFmpeg stderr:", | |
e.stderr.decode('utf8') if e.stderr else "No stderr") | |
raise Exception( | |
f"FFmpeg处理失败: {e.stderr.decode('utf8') if e.stderr else str(e)}" | |
) | |
except Exception as e: | |
print(f"处理视频时出错: {str(e)}") | |
if isinstance(e, ffmpeg.Error): | |
if e.stderr: | |
print("FFmpeg错误输出:", e.stderr.decode('utf8')) | |
raise | |
finally: | |
# 清理临时文件 | |
if frame_temp and os.path.exists(frame_temp): | |
try: | |
os.unlink(frame_temp) | |
except: | |
pass | |
def process_video( | |
video_input, | |
frame_input, | |
interval, | |
duration, | |
alpha=0.5, # 添加透明度参数 | |
progress=gr.Progress()): | |
""" | |
处理视频和图片输入 | |
""" | |
# 输入验证 | |
if interval < duration: | |
raise ValueError("间隔必须大于持续时间") | |
temp_files = [] # 跟踪临时文件 | |
# 创建一个固定的输出目录(在当前工作目录下) | |
output_dir = os.path.join(os.getcwd(), "outputs") | |
os.makedirs(output_dir, exist_ok=True) | |
# 确保目录有正确的权限 | |
os.chmod(output_dir, 0o777) | |
try: | |
# 检查输入是否为空 | |
if video_input is None or frame_input is None: | |
raise ValueError("视频或图片输入为空") | |
# 处理视频输入 | |
video_temp = tempfile.NamedTemporaryFile(delete=False, | |
suffix='.mp4').name | |
temp_files.append(video_temp) | |
if isinstance(video_input, str): | |
video_temp = video_input | |
else: | |
try: | |
with open(video_temp, 'wb') as f: | |
f.write(video_input.read() if hasattr(video_input, 'read' | |
) else video_input) | |
except Exception as e: | |
raise Exception(f"视频写入失败: {str(e)}") | |
# 处理图片输入 | |
frame_temp = tempfile.NamedTemporaryFile(delete=False, | |
suffix='.png').name | |
temp_files.append(frame_temp) | |
try: | |
if hasattr(frame_input, 'save'): | |
frame_input.save(frame_temp) | |
elif isinstance(frame_input, np.ndarray): | |
Image.fromarray(frame_input).save(frame_temp) | |
else: | |
raise ValueError("不支持的图片格式") | |
except Exception as e: | |
raise Exception(f"图片处理失败: {str(e)}") | |
output_path = subliminal_insert(video_temp, frame_temp, int(interval), | |
int(duration), output_dir, alpha, | |
progress) | |
print(f"处理完成,输出文件: {output_path}") | |
return output_path | |
except Exception as e: | |
print(f"处理失败: {str(e)}") | |
raise | |
finally: | |
# 清理临时文件 | |
for temp_file in temp_files: | |
if temp_file != video_input and os.path.exists(temp_file): | |
try: | |
os.unlink(temp_file) | |
except Exception: | |
pass | |
# 创建Gradio界面 | |
iface = gr.Interface(fn=process_video, | |
inputs=[ | |
gr.Video(label="输入视频", format="mp4"), | |
gr.Image(label="要植入的帧", type="pil"), | |
gr.Slider(minimum=12, | |
maximum=300, | |
value=30, | |
step=1, | |
label="植入间隔(帧数)", | |
info="每隔多少帧插入一次图片"), | |
gr.Slider(minimum=1, | |
maximum=5, | |
value=1, | |
step=1, | |
label="植入持续时间(帧数)", | |
info="每次插入的图片持续多少帧"), | |
gr.Slider(minimum=0.0, | |
maximum=1.0, | |
value=0.5, | |
step=0.1, | |
label="叠加透明度", | |
info="图片叠加的透明度(0完全透明,1完全不透明)") | |
], | |
outputs=gr.Video(label="输出视频"), | |
title="视频流插入帧", | |
description="将特定图片帧按指定间隔植入到视频中", | |
article=""" | |
使用说明: | |
1. 上传要处理的视频文件(MP4格式) | |
2. 上传要植入的图片 | |
3. 调整植入间隔和持续时间 | |
4. 点击提交开始处理 | |
""", | |
flagging_mode="never", | |
theme="default") | |
if __name__ == "__main__": | |
iface.launch( | |
debug=True, | |
show_error=True, | |
# server_name="0.0.0.0", | |
# server_port=7861, | |
# share=False, | |
allowed_paths=[ | |
"/tmp", "/tmp/gradio", | |
os.path.join(os.getcwd(), "outputs") | |
], # 明确允许访问的路径 | |
root_path="", # 确保根路径正确 | |
) | |