subthreshold / app.py
idFishWithCat's picture
Update app.py
1e9091b verified
import gradio as gr
import ffmpeg
import tempfile
import os
from PIL import Image
import numpy as np
import os
import time
# 设置umask确保新创建的文件有正确的权限
os.umask(0o022)
def subliminal_insert(
video_path,
frame_img_path,
interval_frames,
duration_frames,
output_dir,
alpha=0.5, # 添加透明度参数
progress=None):
"""
将特定帧按间隔植入视频流中
"""
if not os.path.exists(video_path):
raise FileNotFoundError("视频文件不存在")
if not os.path.exists(frame_img_path):
raise FileNotFoundError("图片文件不存在")
temp_output = None
frame_temp = None
try:
# 获取视频基本信息
probe = ffmpeg.probe(video_path)
video_info = next(s for s in probe['streams']
if s['codec_type'] == 'video')
width = int(video_info['width'])
height = int(video_info['height'])
fps = eval(video_info['r_frame_rate'])
print(f"视频信息: {width}x{height} @ {fps}fps")
if progress is not None:
progress(0.1, "正在处理图片...")
# 调整图片大小以匹配视频尺寸
img = Image.open(frame_img_path)
img = img.resize((width, height))
frame_temp = tempfile.NamedTemporaryFile(delete=False,
suffix='.png').name
img.save(frame_temp)
if progress is not None:
progress(0.2, "正在准备处理视频...")
# 使用系统临时目录
gradio_temp_dir = os.path.join(tempfile.gettempdir(), "gradio")
os.makedirs(gradio_temp_dir, exist_ok=True)
temp_output = os.path.join(gradio_temp_dir,
f"output_{int(time.time())}.mp4")
print(f"将创建输出文件: {temp_output}")
try:
if progress is not None:
progress(0.3, "正在处理视频...")
# 构建ffmpeg命令
input_video = ffmpeg.input(video_path)
input_image = ffmpeg.input(frame_temp)
# 构建filter complex
v = ffmpeg.filter(
[input_video, input_image],
'overlay',
enable=f'if(lt(mod(n,{interval_frames}),{duration_frames}),1,0)',
format='yuva444p', # 确保支持alpha通道
alpha=f"if(lt(mod(n,{interval_frames}),{duration_frames}),{alpha},0)" # 动态设置alpha值
)
# 检查输入视频是否有音频流
has_audio = any(s['codec_type'] == 'audio'
for s in probe['streams'])
# 构建输出
if has_audio:
stream = ffmpeg.output(v,
input_video.audio,
temp_output,
acodec='copy',
vcodec='libx264',
pix_fmt='yuv420p', # 确保输出格式兼容
preset='ultrafast',
crf=23,
movflags='+faststart')
else:
stream = ffmpeg.output(v,
temp_output,
vcodec='libx264',
pix_fmt='yuv420p', # 确保输出格式兼容
preset='ultrafast',
crf=23,
movflags='+faststart')
# 打印完整命令
cmd = " ".join(stream.compile())
print("FFmpeg命令:", cmd)
# 运行命令
stdout, stderr = stream.run(capture_stdout=True,
capture_stderr=True,
overwrite_output=True)
if progress is not None:
progress(0.9, "正在完成处理...")
# 检查输出文件
if not os.path.exists(temp_output):
raise Exception(f"输出文件不存在: {temp_output}")
if os.path.getsize(temp_output) == 0:
raise Exception(f"输出文件大小为0: {temp_output}")
# 设置文件权限
try:
os.chmod(temp_output, 0o644)
except Exception as e:
print(f"设置文件权限时出错: {str(e)}")
print(
f"成功创建输出文件: {temp_output}, 大小: {os.path.getsize(temp_output)} bytes"
)
if progress is not None:
progress(1.0, "处理完成")
return temp_output
except ffmpeg.Error as e:
print("FFmpeg stdout:",
e.stdout.decode('utf8') if e.stdout else "No stdout")
print("FFmpeg stderr:",
e.stderr.decode('utf8') if e.stderr else "No stderr")
raise Exception(
f"FFmpeg处理失败: {e.stderr.decode('utf8') if e.stderr else str(e)}"
)
except Exception as e:
print(f"处理视频时出错: {str(e)}")
if isinstance(e, ffmpeg.Error):
if e.stderr:
print("FFmpeg错误输出:", e.stderr.decode('utf8'))
raise
finally:
# 清理临时文件
if frame_temp and os.path.exists(frame_temp):
try:
os.unlink(frame_temp)
except:
pass
def process_video(
video_input,
frame_input,
interval,
duration,
alpha=0.5, # 添加透明度参数
progress=gr.Progress()):
"""
处理视频和图片输入
"""
# 输入验证
if interval < duration:
raise ValueError("间隔必须大于持续时间")
temp_files = [] # 跟踪临时文件
# 创建一个固定的输出目录(在当前工作目录下)
output_dir = os.path.join(os.getcwd(), "outputs")
os.makedirs(output_dir, exist_ok=True)
# 确保目录有正确的权限
os.chmod(output_dir, 0o777)
try:
# 检查输入是否为空
if video_input is None or frame_input is None:
raise ValueError("视频或图片输入为空")
# 处理视频输入
video_temp = tempfile.NamedTemporaryFile(delete=False,
suffix='.mp4').name
temp_files.append(video_temp)
if isinstance(video_input, str):
video_temp = video_input
else:
try:
with open(video_temp, 'wb') as f:
f.write(video_input.read() if hasattr(video_input, 'read'
) else video_input)
except Exception as e:
raise Exception(f"视频写入失败: {str(e)}")
# 处理图片输入
frame_temp = tempfile.NamedTemporaryFile(delete=False,
suffix='.png').name
temp_files.append(frame_temp)
try:
if hasattr(frame_input, 'save'):
frame_input.save(frame_temp)
elif isinstance(frame_input, np.ndarray):
Image.fromarray(frame_input).save(frame_temp)
else:
raise ValueError("不支持的图片格式")
except Exception as e:
raise Exception(f"图片处理失败: {str(e)}")
output_path = subliminal_insert(video_temp, frame_temp, int(interval),
int(duration), output_dir, alpha,
progress)
print(f"处理完成,输出文件: {output_path}")
return output_path
except Exception as e:
print(f"处理失败: {str(e)}")
raise
finally:
# 清理临时文件
for temp_file in temp_files:
if temp_file != video_input and os.path.exists(temp_file):
try:
os.unlink(temp_file)
except Exception:
pass
# 创建Gradio界面
iface = gr.Interface(fn=process_video,
inputs=[
gr.Video(label="输入视频", format="mp4"),
gr.Image(label="要植入的帧", type="pil"),
gr.Slider(minimum=12,
maximum=300,
value=30,
step=1,
label="植入间隔(帧数)",
info="每隔多少帧插入一次图片"),
gr.Slider(minimum=1,
maximum=5,
value=1,
step=1,
label="植入持续时间(帧数)",
info="每次插入的图片持续多少帧"),
gr.Slider(minimum=0.0,
maximum=1.0,
value=0.5,
step=0.1,
label="叠加透明度",
info="图片叠加的透明度(0完全透明,1完全不透明)")
],
outputs=gr.Video(label="输出视频"),
title="视频流插入帧",
description="将特定图片帧按指定间隔植入到视频中",
article="""
使用说明:
1. 上传要处理的视频文件(MP4格式)
2. 上传要植入的图片
3. 调整植入间隔和持续时间
4. 点击提交开始处理
""",
flagging_mode="never",
theme="default")
if __name__ == "__main__":
iface.launch(
debug=True,
show_error=True,
# server_name="0.0.0.0",
# server_port=7861,
# share=False,
allowed_paths=[
"/tmp", "/tmp/gradio",
os.path.join(os.getcwd(), "outputs")
], # 明确允许访问的路径
root_path="", # 确保根路径正确
)