Spaces:
Paused
Paused
File size: 4,553 Bytes
17ff85c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
from fastapi import FastAPI, HTTPException, Response
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from audio_separator.separator import Separator
import ffmpeg
from datetime import datetime
import logging
import os
import uuid
from youtube_transcript_api import YouTubeTranscriptApi
import asyncio
from fastapi.concurrency import run_in_threadpool
from concurrent.futures import ThreadPoolExecutor
app = FastAPI()
tmp_directory = "tmp"
separator = Separator(output_dir=tmp_directory, log_level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
separator.load_model("UVR-MDX-NET-Inst_Main.onnx")
executor = ThreadPoolExecutor(max_workers=8)
class IsolationRequest(BaseModel):
url: str
start_time: float
duration_seconds: float
@app.post("/isolate")
async def isolate_voice(request: IsolationRequest):
media_url = request.url
start_seconds = request.start_time
duration_seconds = request.duration_seconds
try:
extracted_audio_path = f"{tmp_directory}/{uuid.uuid4()}.wav"
# TODO switch to CUDA
await extract_audio(
media_url, start_seconds, duration_seconds, extracted_audio_path
)
(
primary_stem_output_path,
secondary_stem_output_path,
) = await asyncio.get_event_loop().run_in_executor(
executor,
separator.separate,
extracted_audio_path,
)
with open(f"{tmp_directory}/{primary_stem_output_path}", "rb") as f:
isolated_audio_data = f.read()
except Exception as e:
logging.error(f"An error occurred: {str(e)}")
raise HTTPException(
status_code=500, detail="An error occurred during vocal isolation"
)
finally:
try:
os.remove(extracted_audio_path)
os.remove(f"{tmp_directory}/{primary_stem_output_path}")
os.remove(f"{tmp_directory}/{secondary_stem_output_path}")
except OSError as e:
logging.warning(
f"Error occurred while cleaning up temporary files: {str(e)}"
)
return Response(content=isolated_audio_data, media_type="audio/wav")
async def extract_audio(
media_url: str, start_seconds: float, duration_seconds: float, output_path: str
):
start_time = datetime.now()
await asyncio.get_event_loop().run_in_executor(
None, # Uses the default executor
lambda: ffmpeg.input(media_url, ss=start_seconds)
.output(output_path, format="wav", t=duration_seconds)
.global_args("-loglevel", "error", "-hide_banner")
.global_args("-nostats")
.run(),
)
end_time = datetime.now()
logging.info(
f"Audio extraction took {(end_time - start_time).total_seconds()} seconds"
)
def scrape_subtitles(video_id, translate_to, translate_from):
transcript_list = YouTubeTranscriptApi.list_transcripts(
video_id,
)
# see if translation already exists
try:
return transcript_list.find_transcript([translate_to]).fetch()
except:
pass
# find transcription in video language
try:
return (
transcript_list.find_transcript([translate_from])
.translate(translate_to)
.fetch()
)
except:
pass
# search for any other translatable languages
for transcript in transcript_list:
try:
return transcript.translate(translate_to).fetch()
except:
continue
return None
def format_language_code(lang: str) -> str:
mapping = {
"he": "iw",
"zh": "zh-Hans",
"zh-TW": "zh-Hant",
}
return mapping.get(lang, lang.split("-")[0])
class SubtitleRequest(BaseModel):
video_id: str
translate_to: str
translate_from: str
@app.post("/subtitles")
async def get_subtitles(request: SubtitleRequest):
try:
subtitles = await run_in_threadpool(
scrape_subtitles,
request.video_id,
format_language_code(request.translate_to),
format_language_code(request.translate_from),
)
if subtitles is None:
return Response("Not available", 400)
return JSONResponse(subtitles, 200)
except Exception as e:
logging.warn(e)
raise HTTPException(
status_code=500, detail="An error occurred while getting subtitles"
)
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)
|