import json
import os
import time
from typing import List

import requests
from loguru import logger
from moviepy import (
    AudioFileClip,
    CompositeVideoClip,
    ImageClip,
    TextClip,
    concatenate_videoclips,
)
from moviepy.video.tools import subtitles
from moviepy.video.tools.subtitles import SubtitlesClip
from PIL import ImageFont

from app.schemas.llm import StoryGenerationRequest
from app.schemas.video import VideoGenerateRequest, StoryScene
from app.services.llm import llm_service
from app.services.voice import generate_voice
from app.utils import utils
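
# This module turns an LLM-generated storyboard (a list of scenes, each with
# narration text and an image) into a narrated video: for each scene it
# synthesizes the voice-over and subtitles, slowly pans across the scene image,
# overlays wrapped subtitle text, and finally concatenates all scene clips
# into a single MP4.
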

def wrap_text(text, max_width, font="Arial", fontsize=60):
    """Wrap text so that each rendered line fits within max_width pixels.

    Word-based wrapping is tried first; if a single word is wider than
    max_width (typical for CJK text without spaces), the function falls back
    to character-based wrapping. Returns (wrapped_text, total_height).
    """
    font = ImageFont.truetype(font, fontsize)

    def get_text_size(inner_text):
        inner_text = inner_text.strip()
        left, top, right, bottom = font.getbbox(inner_text)
        return right - left, bottom - top

    width, height = get_text_size(text)
    if width <= max_width:
        return text, height

    # Word-based wrapping.
    processed = True
    _wrapped_lines_ = []
    words = text.split(" ")
    _txt_ = ""
    for word in words:
        _before = _txt_
        _txt_ += f"{word} "
        _width, _height = get_text_size(_txt_)
        if _width <= max_width:
            continue
        if _txt_.strip() == word.strip():
            # A single word is already wider than max_width; fall back to
            # character-based wrapping below.
            processed = False
            break
        _wrapped_lines_.append(_before)
        _txt_ = f"{word} "
    _wrapped_lines_.append(_txt_)
    if processed:
        _wrapped_lines_ = [line.strip() for line in _wrapped_lines_]
        result = "\n".join(_wrapped_lines_).strip()
        height = len(_wrapped_lines_) * height
        return result, height

    # Character-based wrapping (no usable word boundaries).
    _wrapped_lines_ = []
    chars = list(text)
    _txt_ = ""
    for char in chars:
        _txt_ += char
        _width, _height = get_text_size(_txt_)
        if _width <= max_width:
            continue
        _wrapped_lines_.append(_txt_)
        _txt_ = ""
    _wrapped_lines_.append(_txt_)
    result = "\n".join(_wrapped_lines_).strip()
    height = len(_wrapped_lines_) * height
    return result, height
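
# Illustrative use of wrap_text (values are examples only): wrap a caption so
# it fits within 90% of a 1024-pixel-wide frame, using whatever TTF/TTC font
# file is available on your system.
#
#   wrapped, total_height = wrap_text(
#       "A long caption that needs to be split across lines",
#       max_width=1024 * 0.9,
#       font="/path/to/some-font.ttc",  # any font file PIL can load
#       fontsize=60,
#   )
#   # wrapped       -> the caption with "\n" inserted at the wrap points
#   # total_height  -> single-line height multiplied by the number of lines
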

async def create_video_with_scenes(task_dir: str, scenes: List[StoryScene], voice_name: str, voice_rate: float, test_mode: bool = False) -> str:
    """Create a video from a list of scenes.

    Args:
        task_dir (str): Task directory holding the per-scene assets.
        scenes (List[StoryScene]): Scenes to render.
        voice_name (str): Voice name used for text-to-speech.
        voice_rate (float): Speech rate.
        test_mode (bool): If True, reuse the existing image, audio and
            subtitle files in task_dir instead of generating them.
    """
    clips = []
    for i, scene in enumerate(scenes, 1):
        try:
            # Per-scene asset paths: <task_dir>/<n>.png / .mp3 / .srt
            image_file = os.path.join(task_dir, f"{i}.png")
            audio_file = os.path.join(task_dir, f"{i}.mp3")
            subtitle_file = os.path.join(task_dir, f"{i}.srt")

            if test_mode:
                if not (os.path.exists(image_file) and os.path.exists(audio_file) and os.path.exists(subtitle_file)):
                    logger.warning(f"Test mode: required files not found for scene {i}")
                    raise FileNotFoundError("Required files not found")
            else:
                logger.info(f"Processing scene {i}")
                audio_file, subtitle_file = await generate_voice(
                    scene.text,
                    voice_name,
                    voice_rate,
                    audio_file,
                    subtitle_file,
                )

            # The scene lasts as long as its last subtitle.
            subs = subtitles.file_to_subtitles(subtitle_file, encoding="utf-8")
            subtitle_duration = max(tb for ((ta, tb), txt) in subs)

            # Scale the still image up and pan it from right to left over the
            # scene duration to add a little motion.
            image_clip = ImageClip(image_file)
            origin_image_w, origin_image_h = image_clip.size
            image_scale = 1.2
            image_clip = image_clip.resized((origin_image_w * image_scale, origin_image_h * image_scale))
            image_clip = image_clip.with_duration(subtitle_duration)

            width_diff = origin_image_w * (image_scale - 1)

            def pan_position(t):
                # Traverse the horizontal overflow of the enlarged image over
                # the full scene duration.
                return (-width_diff / subtitle_duration * t, "center")

            image_clip = image_clip.with_position(pan_position)

            audio_clip = AudioFileClip(audio_file)
            image_clip = image_clip.with_audio(audio_clip)

            font_path = os.path.join(utils.resource_dir(), "fonts", "STHeitiLight.ttc")
            if not os.path.exists(font_path):
                logger.warning(f"Font file not found: {font_path}")
                raise FileNotFoundError(f"Font file not found: {font_path}")
            logger.info(f"Using font: {font_path}")

            if os.path.exists(subtitle_file):
                logger.info(f"Loading subtitle file: {subtitle_file}")
                try:
                    def make_textclip(text):
                        return TextClip(
                            text=text,
                            font=font_path,
                            font_size=60,
                        )

                    def create_text_clip(subtitle_item):
                        phrase = subtitle_item[1]
                        max_width = origin_image_w * 0.9
                        wrapped_txt, txt_height = wrap_text(
                            phrase, max_width=max_width, font=font_path, fontsize=60
                        )
                        _clip = TextClip(
                            text=wrapped_txt,
                            font=font_path,
                            font_size=60,
                            color="white",
                            stroke_color="black",
                            stroke_width=2,
                        )
                        duration = subtitle_item[0][1] - subtitle_item[0][0]
                        _clip = _clip.with_start(subtitle_item[0][0])
                        _clip = _clip.with_end(subtitle_item[0][1])
                        _clip = _clip.with_duration(duration)
                        # Center horizontally, near the bottom of the frame.
                        _clip = _clip.with_position(("center", origin_image_h * 0.95 - _clip.h - 50))
                        return _clip

                    sub = SubtitlesClip(subtitle_file, encoding="utf-8", make_textclip=make_textclip)

                    text_clips = []
                    for item in sub.subtitles:
                        clip = create_text_clip(subtitle_item=item)
                        text_clips.append(clip)
                    video_clip = CompositeVideoClip([image_clip, *text_clips], (origin_image_w, origin_image_h))
                    clips.append(video_clip)
                    logger.info(f"Added subtitles for scene {i}")
                except Exception as e:
                    logger.error(f"Failed to add subtitles for scene {i}: {e}")
                    clips.append(image_clip)
            else:
                logger.warning(f"Subtitle file not found: {subtitle_file}")
                clips.append(image_clip)
        except Exception as e:
            logger.error(f"Failed to process scene {i}: {e}")
            raise

    if not clips:
        raise ValueError("No valid clips to combine")

    logger.info("Merging all clips")
    final_clip = concatenate_videoclips(clips)
    video_file = os.path.join(task_dir, "video.mp4")
    logger.info(f"Writing video to {video_file}")
    final_clip.write_videofile(video_file, fps=24, codec="libx264", audio_codec="aac")

    return video_file
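
# Illustrative (assumes the asset files already exist): when a task directory
# already contains 1.png / 1.mp3 / 1.srt, 2.png / 2.mp3 / 2.srt, ... for each
# scene, the assembly step above can be re-run on its own without calling the
# TTS service again, e.g.:
#
#   video_path = await create_video_with_scenes(
#       task_dir, scenes,
#       voice_name="zh-CN-XiaoxiaoNeural",  # voice name is only an example
#       voice_rate=1.0, test_mode=True,
#   )
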

async def generate_video(request: VideoGenerateRequest):
    """Generate a video from a video generation request.

    Args:
        request (VideoGenerateRequest): Video generation request.
    """
    try:
        if request.test_mode:
            # Test mode: reuse an existing task directory and its story.json
            # instead of calling the LLM / image services again.
            task_id = request.task_id or str(int(time.time()))
            task_dir = utils.task_dir(task_id)
            if not os.path.exists(task_dir):
                raise ValueError(f"Task directory not found: {task_dir}")

            story_file = os.path.join(task_dir, "story.json")
            if not os.path.exists(story_file):
                raise ValueError(f"Story file not found: {story_file}")

            with open(story_file, "r", encoding="utf-8") as f:
                story_data = json.load(f)
            logger.debug(f"Loaded story data: {story_data}")

            request = VideoGenerateRequest(**story_data)
            request.test_mode = True
            scenes = [StoryScene(**scene) for scene in story_data.get("scenes", [])]
        else:
            # Generate the story text and scene images via the LLM service.
            req = StoryGenerationRequest(
                resolution=request.resolution,
                story_prompt=request.story_prompt,
                language=request.language,
                segments=request.segments,
                text_llm_provider=request.text_llm_provider,
                text_llm_model=request.text_llm_model,
                image_llm_provider=request.image_llm_provider,
                image_llm_model=request.image_llm_model,
            )
            story_list = await llm_service.generate_story_with_images(request=req)
            scenes = [
                StoryScene(text=scene["text"], image_prompt=scene["image_prompt"], url=scene["url"])
                for scene in story_list
            ]

            # Persist the request and generated scenes, and download the scene
            # images into a new task directory.
            story_data = request.model_dump()
            story_data["scenes"] = [scene.model_dump() for scene in scenes]
            task_id = str(int(time.time()))
            task_dir = utils.task_dir(task_id)
            os.makedirs(task_dir, exist_ok=True)
            story_file = os.path.join(task_dir, "story.json")
            for i, scene in enumerate(story_list, 1):
                if scene.get("url"):
                    image_path = os.path.join(task_dir, f"{i}.png")
                    try:
                        response = requests.get(scene["url"])
                        if response.status_code == 200:
                            with open(image_path, "wb") as f:
                                f.write(response.content)
                            logger.info(f"Downloaded image {i} to {image_path}")
                        else:
                            logger.warning(f"Failed to download image {i}: HTTP {response.status_code}")
                    except Exception as e:
                        logger.error(f"Failed to download image {i}: {e}")

            with open(story_file, "w", encoding="utf-8") as f:
                json.dump(story_data, f, ensure_ascii=False, indent=2)

        return await create_video_with_scenes(task_dir, scenes, request.voice_name, request.voice_rate, request.test_mode)
    except Exception as e:
        logger.error(f"Failed to generate video: {e}")
        raise
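
# Illustrative end-to-end call (field values are examples only; adjust to the
# actual VideoGenerateRequest schema and the voices available to you), e.g.
# from a one-off script:
#
#   import asyncio
#   from app.schemas.video import VideoGenerateRequest
#   from app.services.video import generate_video
#
#   request = VideoGenerateRequest(
#       story_prompt="A fox who learns to share",
#       segments=3,
#       voice_name="zh-CN-XiaoxiaoNeural",
#       voice_rate=1.0,
#   )
#   video_path = asyncio.run(generate_video(request))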