ShortiFoley / app.py
Bils's picture
Update app.py
d278100 verified
raw
history blame
19.8 kB
# app.py — ShortiFoley (Video -> Foley)
# Created by bilsimaging.com
import os
# Prefer safetensors globally (fixes CLAP .bin crash on ZeroGPU)
os.environ.setdefault("HF_PREFER_SAFETENSORS", "1")
import sys
import json
import base64
import random
import tempfile
import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict
import numpy as np
import torch
import torchaudio
import gradio as gr
from loguru import logger
from huggingface_hub import snapshot_download
import spaces # HF Spaces ZeroGPU & MCP integration
# -------------------------
# Constants & configuration
# -------------------------
# Directory containing this file; every default path below is derived from it.
ROOT = Path(__file__).parent.resolve()
# Local checkout of the upstream HunyuanVideo-Foley code.
REPO_DIR = ROOT / "HunyuanVideo-Foley"
# Each location below can be overridden through the named environment variable.
WEIGHTS_DIR = Path(os.environ.get("HIFI_FOLEY_MODEL_PATH", str(ROOT / "weights")))
CONFIG_PATH = Path(os.environ.get("HIFI_FOLEY_CONFIG", str(REPO_DIR / "configs" / "hunyuanvideo-foley-xxl.yaml")))
OUTPUTS_DIR = Path(os.environ.get("OUTPUTS_DIR", str(ROOT / "outputs")))
# Created at import time so later writers never have to check for it.
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

# User-visible branding strings (stored as real Unicode, not mis-decoded bytes).
SPACE_TITLE = "🎵 ShortiFoley — HunyuanVideo-Foley"
SPACE_TAGLINE = "Text/Video → Audio Foley · Created by bilsimaging.com"
WATERMARK_NOTE = "Made with ❤️ by bilsimaging.com"

# ZeroGPU limit (<=120)
GPU_DURATION = int(os.environ.get("GPU_DURATION_SECS", "110"))

# Globals: lazily populated by auto_load_models(); None until the model is loaded.
_model_dict = None
_cfg = None
_device: Optional[torch.device] = None
# ------------
# Small helpers
# ------------
def _setup_device(pref: str = "auto", gpu_id: int = 0) -> torch.device:
    """Resolve the torch device to run on and log the choice.

    With ``pref == "auto"`` the first available backend wins, in the order
    CUDA (``cuda:<gpu_id>``), MPS, CPU. Any other ``pref`` is handed to
    ``torch.device`` unchanged.
    """
    if pref != "auto":
        chosen = torch.device(pref)
    elif torch.cuda.is_available():
        chosen = torch.device(f"cuda:{gpu_id}")
    elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
        chosen = torch.device("mps")
    else:
        chosen = torch.device("cpu")
    logger.info(f"Using {chosen}")
    return chosen
def _ensure_repo() -> None:
    """Shallow-clone Tencent repo with LFS smudge disabled (avoid LFS quota checkout).

    Does nothing when ``REPO_DIR`` already exists. The clone is best-effort:
    a failure is logged rather than raised, so the application can still
    start and report the problem when the repo modules are imported later.
    """
    if REPO_DIR.exists():
        return

    import subprocess

    # An argv list (no shell) keeps a REPO_DIR containing spaces or shell
    # metacharacters from being split or interpreted.
    cmd = [
        "git",
        "-c", "filter.lfs.smudge=",
        "-c", "filter.lfs.required=false",
        "clone", "--depth", "1",
        "https://github.com/Tencent-Hunyuan/HunyuanVideo-Foley.git",
        str(REPO_DIR),
    ]
    env = {**os.environ, "GIT_LFS_SKIP_SMUDGE": "1"}
    logger.info(f">> GIT_LFS_SKIP_SMUDGE=1 {' '.join(cmd)}")
    try:
        result = subprocess.run(cmd, env=env, check=False)
    except OSError as e:
        # Typically: git is not installed / not on PATH.
        logger.error(f"Could not run git: {e}")
        return
    if result.returncode != 0:
        logger.error(f"git clone failed with exit code {result.returncode}")
def _download_weights_if_needed() -> None:
    """Snapshot only needed files from HF weights/model hub.

    Files matching ``allow_patterns`` are fetched from the
    ``tencent/HunyuanVideo-Foley`` repo into ``WEIGHTS_DIR`` (created if
    missing).
    """
    WEIGHTS_DIR.mkdir(parents=True, exist_ok=True)
    # NOTE: ``resume_download`` is intentionally not passed. It is deprecated
    # in huggingface_hub (downloads resume automatically when possible) and
    # only triggers a warning on current releases.
    snapshot_download(
        repo_id="tencent/HunyuanVideo-Foley",
        local_dir=str(WEIGHTS_DIR),
        allow_patterns=[
            "hunyuanvideo_foley.pth",
            "synchformer_state_dict.pth",
            "vae_128d_48k.pth",
            "assets/*",
            "config.yaml",
        ],
    )
def prepare_once() -> None:
    """Run the startup preparation steps in order: code checkout, then weights."""
    for step in (_ensure_repo, _download_weights_if_needed):
        step()
# -----------------------
# Model load & inference
# -----------------------
def auto_load_models() -> str:
    """
    Load HunyuanVideo-Foley + encoders on the chosen device.
    Ensures safetensors is preferred to avoid ZeroGPU issues with .bin checkpoints.

    Idempotent: when the model and config globals are already populated the
    function returns immediately. On success the module globals
    ``_model_dict``, ``_cfg`` and ``_device`` are set.

    Returns:
        A human-readable status string. Failures (including a failed import
        of the repo's loader) are reported through the returned string
        rather than raised.
    """
    global _model_dict, _cfg, _device
    # NOTE: the status strings below are user-facing and may be matched by
    # callers; keep them stable.
    if _model_dict is not None and _cfg is not None:
        return "βœ… Model already loaded."

    # Make absolutely sure safetensors is preferred
    os.environ["HF_PREFER_SAFETENSORS"] = "1"

    # Register the repo checkout once; repeated calls must not grow sys.path.
    repo_path = str(REPO_DIR)
    if repo_path not in sys.path:
        sys.path.append(repo_path)

    try:
        from hunyuanvideo_foley.utils.model_utils import load_model
    except Exception as e:
        # A missing/broken checkout is reported like any other load failure.
        logger.error(str(e))
        return f"❌ Failed to load model: {e}"

    _device = _setup_device("auto", 0)
    logger.info("Loading HunyuanVideo-Foley model...")
    logger.info(f"MODEL_PATH: {WEIGHTS_DIR}")
    logger.info(f"CONFIG_PATH: {CONFIG_PATH}")
    try:
        _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
        return "βœ… Model loaded."
    except OSError as e:
        # OSError gets exactly one retry with the safetensors preference re-asserted.
        logger.error(str(e))
        logger.info("Retrying after enforcing safetensors preference...")
        os.environ["HF_PREFER_SAFETENSORS"] = "1"
        try:
            _model_dict, _cfg = load_model(str(WEIGHTS_DIR), str(CONFIG_PATH), _device)
            return "βœ… Model loaded (after safetensors retry)."
        except Exception as e2:
            logger.error(str(e2))
            return f"❌ Failed to load model: {e2}"
    except Exception as e:
        logger.error(str(e))
        return f"❌ Failed to load model: {e}"
def _merge_audio_video(audio_path: str, video_path: str, out_path: str) -> None:
    """Preferred: project's util; fallback to ffmpeg.

    Args:
        audio_path: Path of the audio file to mux in.
        video_path: Path of the source video.
        out_path: Path of the merged output file.

    Raises:
        subprocess.CalledProcessError: if the ffmpeg fallback exits non-zero.
    """
    # Register the repo checkout once; this runs per output and must not
    # keep appending duplicates to sys.path.
    repo_path = str(REPO_DIR)
    if repo_path not in sys.path:
        sys.path.append(repo_path)
    try:
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video
        merge_audio_video(audio_path, video_path, out_path)
    except Exception as e:
        logger.warning(f"merge_audio_video failed, falling back to ffmpeg: {e}")
        import subprocess
        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            # Explicit stream selection: picture from the source clip, sound
            # from the generated track. Without -map, ffmpeg's automatic
            # selection may keep the clip's own audio stream instead.
            "-map", "0:v:0",
            "-map", "1:a:0",
            "-c:v", "copy",
            "-c:a", "aac",
            "-shortest",
            out_path,
        ]
        subprocess.run(cmd, check=True)
def _save_outputs(video_src: str, audio_tensor: torch.Tensor, sr: int, idx: int,
                  prompt: str) -> str:
    """Mux generated audio into the source video and write a JSON sidecar.

    The audio is written to a temporary WAV, merged with ``video_src`` into
    ``OUTPUTS_DIR/<id>.mp4``, and described by ``OUTPUTS_DIR/<id>.json``
    (which carries the soft watermark note). The temporary WAV is removed
    once the merge has finished.

    Args:
        video_src: Path of the source video.
        audio_tensor: Generated audio; a 1-D tensor is treated as one channel.
        sr: Sample rate passed to ``torchaudio.save``.
        idx: Variant index, used in file names.
        prompt: Text prompt recorded in the sidecar (``None``/empty -> ``""``).

    Returns:
        Path of the merged MP4 as a string.
    """
    # torchaudio expects [C, N]
    if audio_tensor.ndim == 1:
        audio_tensor = audio_tensor.unsqueeze(0)

    # One timezone-aware timestamp feeds both the file id and the metadata.
    now = datetime.datetime.now(datetime.timezone.utc)
    base = f"shortifoley_{now.strftime('%Y%m%d_%H%M%S_%f')}_{idx}"
    out_mp4 = OUTPUTS_DIR / f"{base}.mp4"

    # The WAV is only an intermediate; the context manager deletes it (and
    # its directory) even if saving or merging raises.
    with tempfile.TemporaryDirectory() as tmpdir:
        wav_path = Path(tmpdir) / f"gen_{idx}.wav"
        torchaudio.save(str(wav_path), audio_tensor.cpu(), sr)
        _merge_audio_video(str(wav_path), video_src, str(out_mp4))

    # Sidecar JSON
    meta = {
        "id": base,
        # Same "<naive ISO>Z" layout as before, derived from the aware timestamp.
        "created_utc": now.replace(tzinfo=None).isoformat() + "Z",
        "source_video": Path(video_src).name,
        "output_video": Path(out_mp4).name,
        "prompt": prompt or "",
        "watermark_note": WATERMARK_NOTE,
        "tool": "ShortiFoley (HunyuanVideo-Foley)",
    }
    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # must be pinned instead of depending on the platform locale.
    (OUTPUTS_DIR / f"{base}.json").write_text(
        json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    return str(out_mp4)
def _list_gallery(limit: int = 100) -> List[str]:
    """Return paths of the newest ``*.mp4`` files in ``OUTPUTS_DIR``.

    Args:
        limit: Maximum number of paths to return. Zero or a negative value
            yields an empty list.

    Returns:
        File paths as strings, most recently modified first.
    """
    if limit <= 0:
        return []
    newest_first = sorted(
        OUTPUTS_DIR.glob("*.mp4"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    return [str(p) for p in newest_first[:limit]]
# ================
# Inference kernel
# ================
@spaces.GPU(duration=GPU_DURATION)
@torch.inference_mode()
def infer_single_video(
    video_file: str,
    text_prompt: str,
    guidance_scale: float = 4.5,
    num_inference_steps: int = 50,
    sample_nums: int = 1,
) -> Tuple[List[str], str]:
    """
    Generate Foley audio for an uploaded video (1-6 variants).

    Args:
        video_file: Path of the input video; an empty value is rejected.
        text_prompt: Optional description of the desired sound.
        guidance_scale: Forwarded to ``denoise_process`` as a float.
        num_inference_steps: Forwarded to ``denoise_process`` as an int.
        sample_nums: Requested number of variants, clamped to 1..6.

    Returns: (list of output video paths, status message)
    """
    # Validate the cheap precondition first so an empty request never
    # triggers a model load.
    if not video_file:
        return [], "❌ Please provide a video."

    # Lazy-load if needed. Success is judged from the loader's state rather
    # than from the wording of its status message.
    if _model_dict is None or _cfg is None:
        msg = auto_load_models()
        if _model_dict is None or _cfg is None:
            return [], str(msg)

    # Register the repo checkout once; this runs per request and must not
    # keep appending duplicates to sys.path.
    repo_path = str(REPO_DIR)
    if repo_path not in sys.path:
        sys.path.append(repo_path)
    from hunyuanvideo_foley.utils.feature_utils import feature_process
    from hunyuanvideo_foley.utils.model_utils import denoise_process

    # preprocess
    visual_feats, text_feats, audio_len_s = feature_process(
        video_file, (text_prompt or "").strip(), _model_dict, _cfg
    )

    # generate batch
    n = int(max(1, min(6, sample_nums)))
    audio, sr = denoise_process(
        visual_feats,
        text_feats,
        audio_len_s,
        _model_dict,
        _cfg,
        guidance_scale=float(guidance_scale),
        num_inference_steps=int(num_inference_steps),
        batch_size=n,
    )

    # save results (variant indices are 1-based in file names)
    outs = [
        _save_outputs(video_file, audio[i], sr, i + 1, text_prompt or "")
        for i in range(n)
    ]
    return outs, f"✅ Generated {len(outs)} result(s). Saved to {OUTPUTS_DIR}/"
# -------------
# Gradio UI (with MCP+API inside the same app)
# -------------
def _about_html() -> str:
    """Return the static HTML shown in the About tab.

    The only interpolated value is ``WATERMARK_NOTE``. Typographic characters
    (apostrophe, en dashes, curly quotes) are stored as real Unicode.
    """
    return f"""
<div style="line-height:1.6">
<h2>About ShortiFoley</h2>
<p><b>ShortiFoley</b> turns short videos (and an optional text hint) into realistic Foley sound.
Powered by Tencent’s HunyuanVideo-Foley (SigLIP2 + CLAP), with autosave and an MCP server for automation (e.g., n8n).</p>
<p><b>Created by <a href="https://bilsimaging.com" target="_blank" rel="noopener">bilsimaging.com</a></b></p>
<h3>Quick Steps</h3>
<ol>
<li>Upload a clip (ideally &lt; 120s).</li>
<li>Optionally describe the sound (English).</li>
<li>Pick variants (1–6), adjust CFG and steps.</li>
<li>Hit <b>Generate</b>. Results show on the right and save into the Gallery.</li>
</ol>
<h3>Tips for Best Quality</h3>
<ul>
<li>Use tight clips (5–30s) around the action.</li>
<li>Include material & action cues: “metal clang”, “glass shatter”, “rubber on wet tile”.</li>
<li>Describe ambience: “roomy”, “echoey”, “distant crowd”.</li>
<li>Generate 2–4 variants and pick the most natural.</li>
</ul>
<h3>MCP & API</h3>
<p>This Space exposes an <b>MCP server</b> and simple REST endpoints (see “API & MCP” tab).
Perfect for pipelines and tools like <b>n8n</b>.</p>
<h3>Watermark</h3>
<p>Each output writes a JSON sidecar including: <i>{WATERMARK_NOTE}</i>. Ask if you want a visible overlay.</p>
</div>
"""
def create_ui() -> gr.Blocks:
    """Build the Gradio app: Run / Gallery / API & MCP / About tabs plus API and MCP hooks.

    Everything is registered inside a single ``gr.Blocks`` context:
    the UI event handlers, two ``gr.api`` endpoints
    (``api_generate_from_url`` and ``load_model_tool``), one MCP resource
    (``shortifoley://status``) and one MCP prompt (``foley_prompt``).
    A page-load hook calls ``auto_load_models`` and shows its status.

    Returns:
        The assembled, not yet launched, ``gr.Blocks`` instance.
    """
    css = """
.main-header{ text-align:center; padding:1.2rem; border-radius:18px; background:linear-gradient(135deg,#6366f1,#8b5cf6); color:white; box-shadow:0 12px 40px rgba(99,102,241,.35); margin-bottom:16px;}
.main-header h1{ margin:0; font-size:2.0rem; font-weight:800;}
.main-header p{ margin:.25rem 0 0; opacity:.95; font-weight:500;}
.card{ background:white; border:1px solid #e7e9ef; border-radius:16px; padding:14px; box-shadow:0 10px 28px rgba(0,0,0,.06);}
.generate-btn button{ font-weight:800; border-radius:12px; padding:10px 18px;}
.minor-btn button{ border-radius:10px;}
.muted{ color:#64748b; }
"""
    with gr.Blocks(title="ShortiFoley — HunyuanVideo-Foley", css=css) as demo:
        gr.HTML(f"<div class='main-header'><h1>{SPACE_TITLE}</h1><p>{SPACE_TAGLINE}</p></div>")

        with gr.Tabs():
            with gr.Tab("Run"):
                with gr.Row():
                    # LEFT: input
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 📹 Input")
                        video_input = gr.Video(label="Upload Video", height=300)
                        text_input = gr.Textbox(
                            label="🎯 Audio Description (optional, English)",
                            placeholder="e.g., Rubber soles on wet tile; distant chatter; occasional splashes.",
                            lines=3
                        )
                        with gr.Row():
                            guidance_scale = gr.Slider(1.0, 10.0, value=4.5, step=0.1, label="CFG")
                            steps = gr.Slider(10, 100, value=50, step=5, label="Steps")
                            samples = gr.Slider(1, 6, value=1, step=1, label="Variants")
                        with gr.Row():
                            load_btn = gr.Button("⚙️ Load model", variant="secondary", elem_classes=["minor-btn"])
                            generate = gr.Button("🎵 Generate", variant="primary", elem_classes=["generate-btn"])
                        status = gr.Textbox(label="Status", interactive=False)

                    # RIGHT: results
                    with gr.Column(scale=1, elem_classes=["card"]):
                        gr.Markdown("### 🎥 Result(s)")
                        v1 = gr.Video(label="Sample 1", height=260, visible=True)
                        with gr.Row():
                            v2 = gr.Video(label="Sample 2", height=160, visible=False)
                            v3 = gr.Video(label="Sample 3", height=160, visible=False)
                        with gr.Row():
                            v4 = gr.Video(label="Sample 4", height=160, visible=False)
                            v5 = gr.Video(label="Sample 5", height=160, visible=False)
                            v6 = gr.Video(label="Sample 6", height=160, visible=False)
                        gr.Markdown("<span class='muted'>Autosaved to the Gallery tab.</span>")

                # Generate handler
                def _process_and_update(video_file, text_prompt, cfg, nsteps, nsamples):
                    """Run inference and map the results onto the six video slots + status."""
                    outs, msg = infer_single_video(video_file, text_prompt, cfg, nsteps, nsamples)
                    # Slots beyond the number of results are hidden and cleared.
                    vis = [
                        gr.update(visible=True, value=outs[i]) if i < len(outs)
                        else gr.update(visible=False, value=None)
                        for i in range(6)
                    ]
                    return (*vis, msg)

                # Keep the event handle: the gallery refresh is chained onto it
                # further down, once the gallery component exists.
                generate_event = generate.click(
                    fn=_process_and_update,
                    inputs=[video_input, text_input, guidance_scale, steps, samples],
                    outputs=[v1, v2, v3, v4, v5, v6, status],
                    api_name="/infer",
                    api_description="Generate Foley audio for an uploaded video. Returns up to 6 video+audio files."
                )

                load_btn.click(
                    fn=auto_load_models,
                    inputs=[],
                    outputs=[status],
                    api_name="/load_model",
                    api_description="Load/initialize the ShortiFoley model and encoders."
                )

                # Toggle visibility based on variants
                def _toggle_vis(n):
                    """Show the first ``n`` result slots; slot 1 is always visible."""
                    n = int(n)
                    return [gr.update(visible=True)] + [
                        gr.update(visible=n >= k) for k in range(2, 7)
                    ]

                samples.change(_toggle_vis, inputs=[samples], outputs=[v1, v2, v3, v4, v5, v6])

            with gr.Tab("📁 Gallery"):
                gr.Markdown("Latest generated videos (autosaved to `outputs/`).")
                gallery = gr.Gallery(
                    value=_list_gallery(),
                    columns=3,
                    preview=True,
                    label="Saved Results"
                )
                refresh = gr.Button("🔄 Refresh Gallery")
                refresh.click(lambda: gr.update(value=_list_gallery()), outputs=[gallery])

            with gr.Tab("API & MCP"):
                gr.Markdown("""
### REST examples
**POST** `/api_generate_from_url`
```json
{
"video_url_or_b64": "https://yourhost/sample.mp4",
"text_prompt": "metallic clink; hollow room reverb",
"guidance_scale": 4.5,
"num_inference_steps": 50,
"sample_nums": 2
}
```
**POST** `/load_model_tool`
Loads the model proactively (useful before batch runs).
### MCP resources & prompt
- `shortifoley://status` → quick health info
- `foley_prompt` → reusable guidance for describing the sound
Works great with n8n: call `load_model_tool` once, then `api_generate_from_url` for each clip.
""")

            with gr.Tab("ℹ️ About"):
                gr.HTML(_about_html())

        # Keep gallery fresh after generation. Chained with .then() so the
        # refresh runs once the generate handler has finished; a second,
        # independent click listener would run alongside generation and list
        # the gallery before the new files exist.
        generate_event.then(lambda: gr.update(value=_list_gallery()), outputs=[gallery])

        # ---- REST + MCP endpoints (inside Blocks) ----
        def _download_to_tmp(url: str) -> str:
            """Stream ``url`` into a temporary ``.mp4`` file and return its path."""
            try:
                import requests
            except ImportError as e:
                raise RuntimeError(
                    "Missing dependency 'requests'. Add it to requirements.txt to use URL inputs."
                ) from e
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
            try:
                # Stream in chunks so a large video is never held fully in memory.
                with tmp, requests.get(url, timeout=30, stream=True) as r:
                    r.raise_for_status()
                    for chunk in r.iter_content(chunk_size=1 << 20):
                        tmp.write(chunk)
            except Exception:
                # Do not leave a partial download behind (best-effort cleanup).
                try:
                    os.unlink(tmp.name)
                except OSError:
                    pass
                raise
            return tmp.name

        def _maybe_from_base64(data_url_or_b64: str) -> str:
            """Decode raw base64 or a ``data:`` URL into a temporary ``.mp4`` file."""
            b64 = data_url_or_b64
            if data_url_or_b64.startswith("data:"):
                # Keep only the payload after the first comma of the data URL.
                b64 = data_url_or_b64.split(",", 1)[-1]
            raw = base64.b64decode(b64)
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp:
                tmp.write(raw)
            return tmp.name

        def _normalize_video_input(video_url_or_b64: str) -> str:
            """Turn an http(s) URL or base64 payload into a local file path.

            Raises:
                ValueError: if the input is empty or whitespace only.
            """
            v = (video_url_or_b64 or "").strip()
            if not v:
                # An empty payload would otherwise decode to a zero-byte "video".
                raise ValueError("video_url_or_b64 must be a URL or base64-encoded video data.")
            if v.startswith(("http://", "https://")):
                return _download_to_tmp(v)
            return _maybe_from_base64(v)

        # NOTE(review): the return annotation says List[str] for every value,
        # but "message" is a plain str. Left unchanged in case the generated
        # API schema is derived from it; confirm before tightening.
        @gr.api
        def api_generate_from_url(
            video_url_or_b64: str,
            text_prompt: str = "",
            guidance_scale: float = 4.5,
            num_inference_steps: int = 50,
            sample_nums: int = 1,
        ) -> Dict[str, List[str]]:
            """Generate Foley audio for a video given as an http(s) URL or base64 data.

            Returns a dict with "videos" (output file paths) and "message" (status text).
            """
            # Success is judged from the loader's state, not its message text.
            if _model_dict is None or _cfg is None:
                msg = auto_load_models()
                if _model_dict is None or _cfg is None:
                    raise RuntimeError(msg)
            local = _normalize_video_input(video_url_or_b64)
            try:
                outs, msg = infer_single_video(
                    local, text_prompt, guidance_scale, num_inference_steps, sample_nums
                )
            finally:
                # The downloaded/decoded input is only an intermediate; the
                # results are separate files. Removal is best-effort.
                try:
                    os.unlink(local)
                except OSError:
                    pass
            return {"videos": outs, "message": msg}

        @gr.api
        def load_model_tool() -> str:
            """Ensure model is loaded on server (convenient for MCP/REST)."""
            return auto_load_models()

        @gr.mcp.resource("shortifoley://status")
        def shortifoley_status() -> str:
            """Return a simple readiness string for MCP clients."""
            ready = _model_dict is not None and _cfg is not None
            # Anything other than a known cuda/mps device is reported as cpu,
            # including the case where no device has been selected yet.
            if _device and _device.type in ("cuda", "mps"):
                dev = _device.type
            else:
                dev = "cpu"
            return f"ShortiFoley status: {'ready' if ready else 'loading'} | device={dev} | outputs={OUTPUTS_DIR}"

        @gr.mcp.prompt()
        def foley_prompt(name: str = "default") -> str:
            """Reusable guidance for describing sound ambience."""
            return (
                "Describe the expected environmental sound precisely. Mention material, rhythm, intensity, and ambience.\n"
                "Example: 'Soft leather footfalls on wet pavement with distant traffic hiss; occasional splashes.'"
            )

        # Auto-load model when UI first renders
        demo.load(
            fn=auto_load_models,
            inputs=None,
            outputs=[status]
        )

    return demo
def set_seeds(s: int = 1):
    """Seed Python's ``random``, NumPy's global RNG and PyTorch with ``s``."""
    for seed_rng in (random.seed, np.random.seed, torch.manual_seed):
        seed_rng(s)
# -------------
# App bootstrap
# -------------
if __name__ == "__main__":
    def _print_sink(message) -> None:
        """Loguru sink: forward each formatted record to stdout unchanged."""
        print(message, end="")

    # Replace loguru's default handler with the print-based sink at INFO level.
    logger.remove()
    logger.add(_print_sink, level="INFO")

    set_seeds(1)
    logger.info("===== Application Startup =====\n")

    # Fetch the code checkout and the weights before anything imports from them.
    prepare_once()

    # Probe imports (early surfacing): a failure is only logged here; the
    # same imports happen again inside the functions that need them.
    sys.path.append(str(REPO_DIR))
    try:
        from hunyuanvideo_foley.utils.model_utils import load_model, denoise_process  # noqa: F401
        from hunyuanvideo_foley.utils.feature_utils import feature_process  # noqa: F401
        from hunyuanvideo_foley.utils.media_utils import merge_audio_video  # noqa: F401
    except Exception as probe_error:
        logger.warning(f"Repo imports not ready yet: {probe_error}")

    # Enable MCP server so tools/resources/prompts are discoverable
    launch_options = dict(
        server_name="0.0.0.0",
        share=False,
        show_error=True,
        mcp_server=True,  # MCP on (great for n8n)
    )
    ui = create_ui()
    ui.launch(**launch_options)