Spaces:
Paused
Paused
from fastapi import FastAPI, HTTPException, Response | |
from fastapi.responses import JSONResponse | |
from pydantic import BaseModel | |
from audio_separator.separator import Separator | |
import ffmpeg | |
from datetime import datetime | |
import logging | |
import os | |
import uuid | |
from youtube_transcript_api import YouTubeTranscriptApi | |
import asyncio | |
from fastapi.concurrency import run_in_threadpool | |
from concurrent.futures import ThreadPoolExecutor | |
app = FastAPI()

# Scratch directory shared by ffmpeg (extracted clips) and the separator
# (output stems).
tmp_directory = "tmp"
# extract_audio() has ffmpeg write into this directory before the separator
# ever produces output, so create it up front instead of relying on a
# library to do so.
os.makedirs(tmp_directory, exist_ok=True)

separator = Separator(output_dir=tmp_directory, log_level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
# The model is loaded once at import time and reused by every request.
separator.load_model("UVR-MDX-NET-Inst_Main.onnx")

# Dedicated pool on which isolate_voice() runs separator.separate so the
# event loop is not blocked while a separation is in progress.
executor = ThreadPoolExecutor(max_workers=8)
class IsolationRequest(BaseModel):
    """Request payload consumed by ``isolate_voice``."""

    # Media location handed to ffmpeg as its input.
    url: str
    # Offset into the media, passed to ffmpeg as ``ss``.
    start_time: float
    # Length of the clip to extract, passed to ffmpeg as ``t``.
    duration_seconds: float
async def isolate_voice(request: IsolationRequest):
    """Extract a clip from ``request.url`` and return one separated stem as WAV.

    The clip is cut with ``extract_audio``, split by ``separator.separate`` on
    the shared thread pool, and the first of the two returned stems is read
    back and sent as the response body.

    Args:
        request: Media URL plus the start offset and duration of the clip.

    Returns:
        A ``Response`` with media type ``audio/wav`` holding the stem bytes.

    Raises:
        HTTPException: status 500 if extraction, separation or reading the
            result fails.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered on ``app``.
    """
    media_url = request.url
    start_seconds = request.start_time
    duration_seconds = request.duration_seconds

    extracted_audio_path = f"{tmp_directory}/{uuid.uuid4()}.wav"
    # Every file this request may have created. Paths are registered as soon
    # as they are known so the cleanup below never touches an unbound name,
    # whichever step fails.
    temp_paths = [extracted_audio_path]
    try:
        # TODO switch to CUDA
        await extract_audio(
            media_url, start_seconds, duration_seconds, extracted_audio_path
        )
        stem_names = list(
            await asyncio.get_running_loop().run_in_executor(
                executor,
                separator.separate,
                extracted_audio_path,
            )
        )
        # Register before unpacking so an unexpected stem count still gets
        # its files removed.
        temp_paths.extend(f"{tmp_directory}/{name}" for name in stem_names)
        primary_stem_output_path, _secondary_stem_output_path = stem_names
        with open(f"{tmp_directory}/{primary_stem_output_path}", "rb") as f:
            isolated_audio_data = f.read()
    except Exception as e:
        # logging.exception keeps the traceback; the client only sees a
        # generic message.
        logging.exception("An error occurred: %s", e)
        raise HTTPException(
            status_code=500, detail="An error occurred during vocal isolation"
        ) from e
    finally:
        # Best-effort cleanup: one failed removal must not skip the others.
        for path in temp_paths:
            try:
                os.remove(path)
            except OSError as cleanup_error:
                logging.warning(
                    f"Error occurred while cleaning up temporary files: {str(cleanup_error)}"
                )
    return Response(content=isolated_audio_data, media_type="audio/wav")
async def extract_audio(
    media_url: str, start_seconds: float, duration_seconds: float, output_path: str
):
    """Cut a WAV clip out of ``media_url`` with ffmpeg and log how long it took.

    Args:
        media_url: Input handed to ``ffmpeg.input``.
        start_seconds: Seek offset, passed as ``ss``.
        duration_seconds: Clip length, passed as ``t``.
        output_path: Destination file for the WAV output.
    """

    def _run_ffmpeg():
        # Built and executed inside the worker thread, as one pipeline.
        source = ffmpeg.input(media_url, ss=start_seconds)
        target = source.output(output_path, format="wav", t=duration_seconds)
        quiet = target.global_args("-loglevel", "error", "-hide_banner")
        quiet.global_args("-nostats").run()

    began = datetime.now()
    # Passing None selects the event loop's default executor.
    await asyncio.get_event_loop().run_in_executor(None, _run_ffmpeg)
    elapsed = (datetime.now() - began).total_seconds()
    logging.info(f"Audio extraction took {elapsed} seconds")
def scrape_subtitles(video_id: str, translate_to: str, translate_from: str):
    """Fetch a transcript for ``video_id`` in the ``translate_to`` language.

    Three strategies are tried in order, and the first that succeeds wins:

    1. a transcript that already exists in ``translate_to``;
    2. the ``translate_from`` transcript, translated to ``translate_to``;
    3. any other listed transcript, translated to ``translate_to``.

    Args:
        video_id: Identifier passed to ``YouTubeTranscriptApi.list_transcripts``.
        translate_to: Target language code.
        translate_from: Language code of the preferred source transcript.

    Returns:
        The fetched transcript, or ``None`` when no strategy succeeds.

    Raises:
        Whatever ``YouTubeTranscriptApi.list_transcripts`` raises; only the
        individual lookup/translate attempts are best-effort.
    """
    transcript_list = YouTubeTranscriptApi.list_transcripts(
        video_id,
    )

    # Each attempt catches Exception rather than using a bare ``except`` so
    # KeyboardInterrupt/SystemExit still propagate, and logs the reason so a
    # failed lookup can be diagnosed.

    # see if translation already exists
    try:
        return transcript_list.find_transcript([translate_to]).fetch()
    except Exception as e:
        logging.debug(
            "No direct '%s' transcript for %s: %s", translate_to, video_id, e
        )

    # find transcription in video language
    try:
        return (
            transcript_list.find_transcript([translate_from])
            .translate(translate_to)
            .fetch()
        )
    except Exception as e:
        logging.debug(
            "Could not translate '%s' transcript to '%s' for %s: %s",
            translate_from,
            translate_to,
            video_id,
            e,
        )

    # search for any other translatable languages
    for transcript in transcript_list:
        try:
            return transcript.translate(translate_to).fetch()
        except Exception as e:
            logging.debug(
                "Transcript not translatable to '%s' for %s: %s",
                translate_to,
                video_id,
                e,
            )
    return None
def format_language_code(lang: str) -> str:
    """Normalise a language tag to the code used for transcript lookups.

    The full tag is looked up in the alias table first. Otherwise the region
    or script suffix is dropped and the remaining base language is looked up
    as well, so ``"he-IL"`` gets the same alias as ``"he"`` instead of
    falling through unaliased.

    Args:
        lang: Language tag such as ``"en"``, ``"en-US"`` or ``"zh-TW"``.

    Returns:
        The aliased code when one applies, else the base language code.
    """
    mapping = {
        "he": "iw",
        "zh": "zh-Hans",
        "zh-TW": "zh-Hant",
        # Other Traditional Chinese regions must not fall back to the
        # Simplified alias of bare "zh".
        "zh-HK": "zh-Hant",
        "zh-MO": "zh-Hant",
        # Already-normalised script codes pass through unchanged rather than
        # being cut down to "zh".
        "zh-Hans": "zh-Hans",
        "zh-Hant": "zh-Hant",
    }
    if lang in mapping:
        return mapping[lang]
    base = lang.split("-")[0]
    return mapping.get(base, base)
class SubtitleRequest(BaseModel):
    """Request payload consumed by ``get_subtitles``."""

    # Video identifier passed to YouTubeTranscriptApi.list_transcripts.
    video_id: str
    # Target language code; normalised by format_language_code before use.
    translate_to: str
    # Language code of the transcript to translate from when no transcript in
    # the target language exists; also normalised before use.
    translate_from: str
async def get_subtitles(request: SubtitleRequest):
    """Return subtitles for a video in the requested language.

    Both language codes are normalised with ``format_language_code`` and the
    blocking scrape runs in the thread pool.

    Args:
        request: Video id plus target and source language codes.

    Returns:
        A ``JSONResponse`` (200) with the subtitles, or a plain ``Response``
        with status 400 and body ``"Not available"`` when none were found.

    Raises:
        HTTPException: status 500 if scraping or building the response fails.
    """
    try:
        subtitles = await run_in_threadpool(
            scrape_subtitles,
            request.video_id,
            format_language_code(request.translate_to),
            format_language_code(request.translate_from),
        )
        if subtitles is None:
            return Response("Not available", 400)
        return JSONResponse(subtitles, 200)
    except Exception as e:
        # logging.warn is a deprecated alias; exc_info keeps the traceback,
        # which the bare exception message alone often lacks.
        logging.warning(e, exc_info=True)
        raise HTTPException(
            status_code=500, detail="An error occurred while getting subtitles"
        ) from e
# if __name__ == "__main__": | |
# import uvicorn | |
# uvicorn.run(app, host="0.0.0.0", port=8000) | |