subthreshold / app.py
idFishWithCat's picture
Update app.py
4f46b63 verified
raw
history blame
10.2 kB
import gradio as gr
import ffmpeg
import tempfile
import os
from PIL import Image
import numpy as np
import os
import time
# 设置umask确保新创建的文件有正确的权限
os.umask(0o022)
def subliminal_insert(video_path,
frame_img_path,
interval_frames,
duration_frames,
output_dir,
alpha=0.5, # 添加透明度参数
progress=None):
"""
将特定帧按间隔植入视频流中
"""
if not os.path.exists(video_path):
raise FileNotFoundError("视频文件不存在")
if not os.path.exists(frame_img_path):
raise FileNotFoundError("图片文件不存在")
temp_output = None
frame_temp = None
try:
# 获取视频基本信息
probe = ffmpeg.probe(video_path)
video_info = next(s for s in probe['streams']
if s['codec_type'] == 'video')
width = int(video_info['width'])
height = int(video_info['height'])
fps = eval(video_info['r_frame_rate'])
print(f"视频信息: {width}x{height} @ {fps}fps")
if progress is not None:
progress(0.1, "正在处理图片...")
# 调整图片大小以匹配视频尺寸
img = Image.open(frame_img_path)
img = img.resize((width, height))
frame_temp = tempfile.NamedTemporaryFile(delete=False,
suffix='.png').name
img.save(frame_temp)
if progress is not None:
progress(0.2, "正在准备处理视频...")
# 使用系统临时目录
gradio_temp_dir = os.path.join(tempfile.gettempdir(), "gradio")
os.makedirs(gradio_temp_dir, exist_ok=True)
temp_output = os.path.join(gradio_temp_dir,
f"output_{int(time.time())}.mp4")
print(f"将创建输出文件: {temp_output}")
try:
if progress is not None:
progress(0.3, "正在处理视频...")
# 构建ffmpeg命令
input_video = ffmpeg.input(video_path)
input_image = ffmpeg.input(frame_temp)
# 构建filter complex
v = ffmpeg.filter(
[input_video, input_image],
'overlay',
enable=f'if(lt(mod(n,{interval_frames}),{duration_frames}),1,0)',
alpha=alpha
)
# 检查输入视频是否有音频流
has_audio = any(s['codec_type'] == 'audio'
for s in probe['streams'])
# 构建输出
if has_audio:
stream = ffmpeg.output(v,
input_video.audio,
temp_output,
acodec='copy',
vcodec='libx264',
preset='ultrafast',
crf=23,
movflags='+faststart')
else:
stream = ffmpeg.output(v,
temp_output,
vcodec='libx264',
preset='ultrafast',
crf=23,
movflags='+faststart')
# 打印完整命令
cmd = " ".join(stream.compile())
print("FFmpeg命令:", cmd)
# 运行命令
stdout, stderr = stream.run(capture_stdout=True,
capture_stderr=True,
overwrite_output=True)
if progress is not None:
progress(0.9, "正在完成处理...")
# 检查输出文件
if not os.path.exists(temp_output):
raise Exception(f"输出文件不存在: {temp_output}")
if os.path.getsize(temp_output) == 0:
raise Exception(f"输出文件大小为0: {temp_output}")
# 设置文件权限
try:
os.chmod(temp_output, 0o644)
except Exception as e:
print(f"设置文件权限时出错: {str(e)}")
print(
f"成功创建输出文件: {temp_output}, 大小: {os.path.getsize(temp_output)} bytes"
)
if progress is not None:
progress(1.0, "处理完成")
return temp_output
except ffmpeg.Error as e:
print("FFmpeg stdout:",
e.stdout.decode('utf8') if e.stdout else "No stdout")
print("FFmpeg stderr:",
e.stderr.decode('utf8') if e.stderr else "No stderr")
raise Exception(
f"FFmpeg处理失败: {e.stderr.decode('utf8') if e.stderr else str(e)}"
)
except Exception as e:
print(f"处理视频时出错: {str(e)}")
if isinstance(e, ffmpeg.Error):
if e.stderr:
print("FFmpeg错误输出:", e.stderr.decode('utf8'))
raise
finally:
# 清理临时文件
if frame_temp and os.path.exists(frame_temp):
try:
os.unlink(frame_temp)
except:
pass
def process_video(video_input,
frame_input,
interval,
duration,
alpha=0.5, # 添加透明度参数
progress=gr.Progress()):
"""
处理视频和图片输入
"""
# 输入验证
if interval < duration:
raise ValueError("间隔必须大于持续时间")
temp_files = [] # 跟踪临时文件
# 创建一个固定的输出目录(在当前工作目录下)
output_dir = os.path.join(os.getcwd(), "outputs")
os.makedirs(output_dir, exist_ok=True)
# 确保目录有正确的权限
os.chmod(output_dir, 0o777)
try:
# 检查输入是否为空
if video_input is None or frame_input is None:
raise ValueError("视频或图片输入为空")
# 处理视频输入
video_temp = tempfile.NamedTemporaryFile(delete=False,
suffix='.mp4').name
temp_files.append(video_temp)
if isinstance(video_input, str):
video_temp = video_input
else:
try:
with open(video_temp, 'wb') as f:
f.write(video_input.read() if hasattr(video_input, 'read'
) else video_input)
except Exception as e:
raise Exception(f"视频写入失败: {str(e)}")
# 处理图片输入
frame_temp = tempfile.NamedTemporaryFile(delete=False,
suffix='.png').name
temp_files.append(frame_temp)
try:
if hasattr(frame_input, 'save'):
frame_input.save(frame_temp)
elif isinstance(frame_input, np.ndarray):
Image.fromarray(frame_input).save(frame_temp)
else:
raise ValueError("不支持的图片格式")
except Exception as e:
raise Exception(f"图片处理失败: {str(e)}")
output_path = subliminal_insert(video_temp, frame_temp, int(interval),
int(duration), output_dir, alpha, progress)
print(f"处理完成,输出文件: {output_path}")
return output_path
except Exception as e:
print(f"处理失败: {str(e)}")
raise
finally:
# 清理临时文件
for temp_file in temp_files:
if temp_file != video_input and os.path.exists(temp_file):
try:
os.unlink(temp_file)
except Exception:
pass
# 创建Gradio界面
iface = gr.Interface(fn=process_video,
inputs=[
gr.Video(label="输入视频", format="mp4"),
gr.Image(label="要植入的帧", type="pil"),
gr.Slider(minimum=12,
maximum=300,
value=30,
step=1,
label="植入间隔(帧数)",
info="每隔多少帧插入一次图片"),
gr.Slider(minimum=1,
maximum=5,
value=1,
step=1,
label="植入持续时间(帧数)",
info="每次插入的图片持续多少帧"),
gr.Slider(minimum=0.0,
maximum=1.0,
value=0.5,
step=0.1,
label="叠加透明度",
info="图片叠加的透明度(0完全透明,1完全不透明)")
],
outputs=gr.Video(label="输出视频"),
title="视频流插入帧",
description="将特定图片帧按指定间隔植入到视频中",
article="""
使用说明:
1. 上传要处理的视频文件(MP4格式)
2. 上传要植入的图片
3. 调整植入间隔和持续时间
4. 点击提交开始处理
""",
flagging_mode="never",
theme="default")
if __name__ == "__main__":
iface.launch(
debug=True,
show_error=True,
# server_name="0.0.0.0",
# server_port=7861,
# share=False,
allowed_paths=[
"/tmp", "/tmp/gradio",
os.path.join(os.getcwd(), "outputs")
], # 明确允许访问的路径
root_path="", # 确保根路径正确
)