File size: 10,363 Bytes
c0f89c5
 
 
 
 
 
 
 
 
 
 
 
 
b480e77
 
 
 
 
 
 
 
c0f89c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e9091b
 
 
 
c0f89c5
 
 
 
 
 
 
 
1e9091b
 
 
 
 
 
 
 
c0f89c5
 
1e9091b
 
 
 
 
 
c0f89c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b480e77
 
 
 
 
 
 
c0f89c5
 
 
 
a5625e8
 
c0f89c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b480e77
 
c0f89c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a5625e8
95e7e3a
3afc86b
c0f89c5
 
 
 
a5625e8
c0f89c5
 
 
4f46b63
 
 
 
 
 
 
c0f89c5
eb39bc8
 
c0f89c5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9122f52
 
a5625e8
c0f89c5
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import gradio as gr
import ffmpeg
import tempfile
import os
from PIL import Image
import numpy as np
import os
import time

# 设置umask确保新创建的文件有正确的权限
os.umask(0o022)


def subliminal_insert(
        video_path,
        frame_img_path,
        interval_frames,
        duration_frames,
        output_dir,
        alpha=0.5,  # 添加透明度参数
        progress=None):
    """
    将特定帧按间隔植入视频流中
    """
    if not os.path.exists(video_path):
        raise FileNotFoundError("视频文件不存在")
    if not os.path.exists(frame_img_path):
        raise FileNotFoundError("图片文件不存在")

    temp_output = None
    frame_temp = None

    try:
        # 获取视频基本信息
        probe = ffmpeg.probe(video_path)
        video_info = next(s for s in probe['streams']
                          if s['codec_type'] == 'video')
        width = int(video_info['width'])
        height = int(video_info['height'])
        fps = eval(video_info['r_frame_rate'])

        print(f"视频信息: {width}x{height} @ {fps}fps")

        if progress is not None:
            progress(0.1, "正在处理图片...")

        # 调整图片大小以匹配视频尺寸
        img = Image.open(frame_img_path)
        img = img.resize((width, height))
        frame_temp = tempfile.NamedTemporaryFile(delete=False,
                                                 suffix='.png').name
        img.save(frame_temp)

        if progress is not None:
            progress(0.2, "正在准备处理视频...")

        # 使用系统临时目录
        gradio_temp_dir = os.path.join(tempfile.gettempdir(), "gradio")
        os.makedirs(gradio_temp_dir, exist_ok=True)
        temp_output = os.path.join(gradio_temp_dir,
                                   f"output_{int(time.time())}.mp4")
        print(f"将创建输出文件: {temp_output}")

        try:
            if progress is not None:
                progress(0.3, "正在处理视频...")

            # 构建ffmpeg命令
            input_video = ffmpeg.input(video_path)
            input_image = ffmpeg.input(frame_temp)

            # 构建filter complex
            v = ffmpeg.filter(
                [input_video, input_image],
                'overlay',
                enable=f'if(lt(mod(n,{interval_frames}),{duration_frames}),1,0)',
                format='yuva444p',  # 确保支持alpha通道
                alpha=f"if(lt(mod(n,{interval_frames}),{duration_frames}),{alpha},0)"  # 动态设置alpha值
            )

            # 检查输入视频是否有音频流
            has_audio = any(s['codec_type'] == 'audio'
                            for s in probe['streams'])

            # 构建输出
            if has_audio:
                stream = ffmpeg.output(v,
                                    input_video.audio,
                                    temp_output,
                                    acodec='copy',
                                    vcodec='libx264',
                                    pix_fmt='yuv420p',  # 确保输出格式兼容
                                    preset='ultrafast',
                                    crf=23,
                                    movflags='+faststart')
            else:
                stream = ffmpeg.output(v,
                                    temp_output,
                                    vcodec='libx264',
                                    pix_fmt='yuv420p',  # 确保输出格式兼容
                                    preset='ultrafast',
                                    crf=23,
                                    movflags='+faststart')

            # 打印完整命令
            cmd = " ".join(stream.compile())
            print("FFmpeg命令:", cmd)

            # 运行命令
            stdout, stderr = stream.run(capture_stdout=True,
                                        capture_stderr=True,
                                        overwrite_output=True)

            if progress is not None:
                progress(0.9, "正在完成处理...")

            # 检查输出文件
            if not os.path.exists(temp_output):
                raise Exception(f"输出文件不存在: {temp_output}")

            if os.path.getsize(temp_output) == 0:
                raise Exception(f"输出文件大小为0: {temp_output}")

            # 设置文件权限
            try:
                os.chmod(temp_output, 0o644)
            except Exception as e:
                print(f"设置文件权限时出错: {str(e)}")

            print(
                f"成功创建输出文件: {temp_output}, 大小: {os.path.getsize(temp_output)} bytes"
            )

            if progress is not None:
                progress(1.0, "处理完成")

            return temp_output

        except ffmpeg.Error as e:
            print("FFmpeg stdout:",
                  e.stdout.decode('utf8') if e.stdout else "No stdout")
            print("FFmpeg stderr:",
                  e.stderr.decode('utf8') if e.stderr else "No stderr")
            raise Exception(
                f"FFmpeg处理失败: {e.stderr.decode('utf8') if e.stderr else str(e)}"
            )

    except Exception as e:
        print(f"处理视频时出错: {str(e)}")
        if isinstance(e, ffmpeg.Error):
            if e.stderr:
                print("FFmpeg错误输出:", e.stderr.decode('utf8'))
        raise

    finally:
        # 清理临时文件
        if frame_temp and os.path.exists(frame_temp):
            try:
                os.unlink(frame_temp)
            except:
                pass


def process_video(
    video_input,
    frame_input,
    interval,
    duration,
    alpha=0.5,  # 添加透明度参数
    progress=gr.Progress()):
    """
    处理视频和图片输入
    """
    # 输入验证
    if interval < duration:
        raise ValueError("间隔必须大于持续时间")

    temp_files = []  # 跟踪临时文件

    # 创建一个固定的输出目录(在当前工作目录下)
    output_dir = os.path.join(os.getcwd(), "outputs")
    os.makedirs(output_dir, exist_ok=True)
    # 确保目录有正确的权限
    os.chmod(output_dir, 0o777)

    try:
        # 检查输入是否为空
        if video_input is None or frame_input is None:
            raise ValueError("视频或图片输入为空")

        # 处理视频输入
        video_temp = tempfile.NamedTemporaryFile(delete=False,
                                                 suffix='.mp4').name
        temp_files.append(video_temp)

        if isinstance(video_input, str):
            video_temp = video_input
        else:
            try:
                with open(video_temp, 'wb') as f:
                    f.write(video_input.read() if hasattr(video_input, 'read'
                                                          ) else video_input)
            except Exception as e:
                raise Exception(f"视频写入失败: {str(e)}")

        # 处理图片输入
        frame_temp = tempfile.NamedTemporaryFile(delete=False,
                                                 suffix='.png').name
        temp_files.append(frame_temp)

        try:
            if hasattr(frame_input, 'save'):
                frame_input.save(frame_temp)
            elif isinstance(frame_input, np.ndarray):
                Image.fromarray(frame_input).save(frame_temp)
            else:
                raise ValueError("不支持的图片格式")
        except Exception as e:
            raise Exception(f"图片处理失败: {str(e)}")

        output_path = subliminal_insert(video_temp, frame_temp, int(interval),
                                        int(duration), output_dir, alpha,
                                        progress)

        print(f"处理完成,输出文件: {output_path}")
        return output_path

    except Exception as e:
        print(f"处理失败: {str(e)}")
        raise

    finally:
        # 清理临时文件
        for temp_file in temp_files:
            if temp_file != video_input and os.path.exists(temp_file):
                try:
                    os.unlink(temp_file)
                except Exception:
                    pass


# 创建Gradio界面
iface = gr.Interface(fn=process_video,
                     inputs=[
                         gr.Video(label="输入视频", format="mp4"),
                         gr.Image(label="要植入的帧", type="pil"),
                         gr.Slider(minimum=12,
                                   maximum=300,
                                   value=30,
                                   step=1,
                                   label="植入间隔(帧数)",
                                   info="每隔多少帧插入一次图片"),
                         gr.Slider(minimum=1,
                                   maximum=5,
                                   value=1,
                                   step=1,
                                   label="植入持续时间(帧数)",
                                   info="每次插入的图片持续多少帧"),
                         gr.Slider(minimum=0.0,
                                   maximum=1.0,
                                   value=0.5,
                                   step=0.1,
                                   label="叠加透明度",
                                   info="图片叠加的透明度(0完全透明,1完全不透明)")
                     ],
                     outputs=gr.Video(label="输出视频"),
                     title="视频流插入帧",
                     description="将特定图片帧按指定间隔植入到视频中",
                     article="""
    使用说明:
    1. 上传要处理的视频文件(MP4格式)
    2. 上传要植入的图片
    3. 调整植入间隔和持续时间
    4. 点击提交开始处理
    """,
                     flagging_mode="never",
                     theme="default")

if __name__ == "__main__":
    iface.launch(
        debug=True,
        show_error=True,
        # server_name="0.0.0.0",
        # server_port=7861,
        # share=False,
        allowed_paths=[
            "/tmp", "/tmp/gradio",
            os.path.join(os.getcwd(), "outputs")
        ],  # 明确允许访问的路径
        root_path="",  # 确保根路径正确
    )