Spaces:
Runtime error
Runtime error
File size: 10,363 Bytes
c0f89c5 b480e77 c0f89c5 1e9091b c0f89c5 1e9091b c0f89c5 1e9091b c0f89c5 b480e77 c0f89c5 a5625e8 c0f89c5 b480e77 c0f89c5 a5625e8 95e7e3a 3afc86b c0f89c5 a5625e8 c0f89c5 4f46b63 c0f89c5 eb39bc8 c0f89c5 9122f52 a5625e8 c0f89c5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
import gradio as gr
import ffmpeg
import tempfile
import os
from PIL import Image
import numpy as np
import os
import time
# 设置umask确保新创建的文件有正确的权限
os.umask(0o022)
def subliminal_insert(
video_path,
frame_img_path,
interval_frames,
duration_frames,
output_dir,
alpha=0.5, # 添加透明度参数
progress=None):
"""
将特定帧按间隔植入视频流中
"""
if not os.path.exists(video_path):
raise FileNotFoundError("视频文件不存在")
if not os.path.exists(frame_img_path):
raise FileNotFoundError("图片文件不存在")
temp_output = None
frame_temp = None
try:
# 获取视频基本信息
probe = ffmpeg.probe(video_path)
video_info = next(s for s in probe['streams']
if s['codec_type'] == 'video')
width = int(video_info['width'])
height = int(video_info['height'])
fps = eval(video_info['r_frame_rate'])
print(f"视频信息: {width}x{height} @ {fps}fps")
if progress is not None:
progress(0.1, "正在处理图片...")
# 调整图片大小以匹配视频尺寸
img = Image.open(frame_img_path)
img = img.resize((width, height))
frame_temp = tempfile.NamedTemporaryFile(delete=False,
suffix='.png').name
img.save(frame_temp)
if progress is not None:
progress(0.2, "正在准备处理视频...")
# 使用系统临时目录
gradio_temp_dir = os.path.join(tempfile.gettempdir(), "gradio")
os.makedirs(gradio_temp_dir, exist_ok=True)
temp_output = os.path.join(gradio_temp_dir,
f"output_{int(time.time())}.mp4")
print(f"将创建输出文件: {temp_output}")
try:
if progress is not None:
progress(0.3, "正在处理视频...")
# 构建ffmpeg命令
input_video = ffmpeg.input(video_path)
input_image = ffmpeg.input(frame_temp)
# 构建filter complex
v = ffmpeg.filter(
[input_video, input_image],
'overlay',
enable=f'if(lt(mod(n,{interval_frames}),{duration_frames}),1,0)',
format='yuva444p', # 确保支持alpha通道
alpha=f"if(lt(mod(n,{interval_frames}),{duration_frames}),{alpha},0)" # 动态设置alpha值
)
# 检查输入视频是否有音频流
has_audio = any(s['codec_type'] == 'audio'
for s in probe['streams'])
# 构建输出
if has_audio:
stream = ffmpeg.output(v,
input_video.audio,
temp_output,
acodec='copy',
vcodec='libx264',
pix_fmt='yuv420p', # 确保输出格式兼容
preset='ultrafast',
crf=23,
movflags='+faststart')
else:
stream = ffmpeg.output(v,
temp_output,
vcodec='libx264',
pix_fmt='yuv420p', # 确保输出格式兼容
preset='ultrafast',
crf=23,
movflags='+faststart')
# 打印完整命令
cmd = " ".join(stream.compile())
print("FFmpeg命令:", cmd)
# 运行命令
stdout, stderr = stream.run(capture_stdout=True,
capture_stderr=True,
overwrite_output=True)
if progress is not None:
progress(0.9, "正在完成处理...")
# 检查输出文件
if not os.path.exists(temp_output):
raise Exception(f"输出文件不存在: {temp_output}")
if os.path.getsize(temp_output) == 0:
raise Exception(f"输出文件大小为0: {temp_output}")
# 设置文件权限
try:
os.chmod(temp_output, 0o644)
except Exception as e:
print(f"设置文件权限时出错: {str(e)}")
print(
f"成功创建输出文件: {temp_output}, 大小: {os.path.getsize(temp_output)} bytes"
)
if progress is not None:
progress(1.0, "处理完成")
return temp_output
except ffmpeg.Error as e:
print("FFmpeg stdout:",
e.stdout.decode('utf8') if e.stdout else "No stdout")
print("FFmpeg stderr:",
e.stderr.decode('utf8') if e.stderr else "No stderr")
raise Exception(
f"FFmpeg处理失败: {e.stderr.decode('utf8') if e.stderr else str(e)}"
)
except Exception as e:
print(f"处理视频时出错: {str(e)}")
if isinstance(e, ffmpeg.Error):
if e.stderr:
print("FFmpeg错误输出:", e.stderr.decode('utf8'))
raise
finally:
# 清理临时文件
if frame_temp and os.path.exists(frame_temp):
try:
os.unlink(frame_temp)
except:
pass
def process_video(
video_input,
frame_input,
interval,
duration,
alpha=0.5, # 添加透明度参数
progress=gr.Progress()):
"""
处理视频和图片输入
"""
# 输入验证
if interval < duration:
raise ValueError("间隔必须大于持续时间")
temp_files = [] # 跟踪临时文件
# 创建一个固定的输出目录(在当前工作目录下)
output_dir = os.path.join(os.getcwd(), "outputs")
os.makedirs(output_dir, exist_ok=True)
# 确保目录有正确的权限
os.chmod(output_dir, 0o777)
try:
# 检查输入是否为空
if video_input is None or frame_input is None:
raise ValueError("视频或图片输入为空")
# 处理视频输入
video_temp = tempfile.NamedTemporaryFile(delete=False,
suffix='.mp4').name
temp_files.append(video_temp)
if isinstance(video_input, str):
video_temp = video_input
else:
try:
with open(video_temp, 'wb') as f:
f.write(video_input.read() if hasattr(video_input, 'read'
) else video_input)
except Exception as e:
raise Exception(f"视频写入失败: {str(e)}")
# 处理图片输入
frame_temp = tempfile.NamedTemporaryFile(delete=False,
suffix='.png').name
temp_files.append(frame_temp)
try:
if hasattr(frame_input, 'save'):
frame_input.save(frame_temp)
elif isinstance(frame_input, np.ndarray):
Image.fromarray(frame_input).save(frame_temp)
else:
raise ValueError("不支持的图片格式")
except Exception as e:
raise Exception(f"图片处理失败: {str(e)}")
output_path = subliminal_insert(video_temp, frame_temp, int(interval),
int(duration), output_dir, alpha,
progress)
print(f"处理完成,输出文件: {output_path}")
return output_path
except Exception as e:
print(f"处理失败: {str(e)}")
raise
finally:
# 清理临时文件
for temp_file in temp_files:
if temp_file != video_input and os.path.exists(temp_file):
try:
os.unlink(temp_file)
except Exception:
pass
# 创建Gradio界面
iface = gr.Interface(fn=process_video,
inputs=[
gr.Video(label="输入视频", format="mp4"),
gr.Image(label="要植入的帧", type="pil"),
gr.Slider(minimum=12,
maximum=300,
value=30,
step=1,
label="植入间隔(帧数)",
info="每隔多少帧插入一次图片"),
gr.Slider(minimum=1,
maximum=5,
value=1,
step=1,
label="植入持续时间(帧数)",
info="每次插入的图片持续多少帧"),
gr.Slider(minimum=0.0,
maximum=1.0,
value=0.5,
step=0.1,
label="叠加透明度",
info="图片叠加的透明度(0完全透明,1完全不透明)")
],
outputs=gr.Video(label="输出视频"),
title="视频流插入帧",
description="将特定图片帧按指定间隔植入到视频中",
article="""
使用说明:
1. 上传要处理的视频文件(MP4格式)
2. 上传要植入的图片
3. 调整植入间隔和持续时间
4. 点击提交开始处理
""",
flagging_mode="never",
theme="default")
if __name__ == "__main__":
iface.launch(
debug=True,
show_error=True,
# server_name="0.0.0.0",
# server_port=7861,
# share=False,
allowed_paths=[
"/tmp", "/tmp/gradio",
os.path.join(os.getcwd(), "outputs")
], # 明确允许访问的路径
root_path="", # 确保根路径正确
)
|