Spaces:
Build error
Build error
| import torch | |
| import gradio as gr | |
| import yt_dlp as youtube_dl | |
| from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, WhisperTokenizer, pipeline | |
| from transformers.pipelines.audio_utils import ffmpeg_read | |
| import tempfile | |
| import os | |
| import time | |
| import requests | |
| from playwright.sync_api import sync_playwright | |
| from languages import get_language_names | |
| from subtitle import text_output, subtitle_output | |
| import datetime | |
| import psutil | |
| import subprocess | |
| from gpustat import GPUStatCollection | |
| import cpuinfo | |
# `spaces` is an optional dependency: when it cannot be imported the app still
# runs, and gpu_decorator() below becomes a no-op.
try:
    import spaces
    USING_SPACES = True
except ImportError:
    USING_SPACES = False

# Install flash-attn at start-up. The child process inherits the current
# environment (PATH, HOME, ...) and only adds the skip flag; passing a bare
# ``env={...}`` would replace the whole environment instead of extending it.
# NOTE(review): FLASH_ATTENTION_SKIP_CUDA_BUILD presumably makes the package
# skip compiling its CUDA kernels -- confirm against the flash-attn docs.
subprocess.run(
    "pip install flash-attn --no-build-isolation",
    env={**os.environ, "FLASH_ATTENTION_SKIP_CUDA_BUILD": "TRUE"},
    shell=True,
)
# Needed by _return_yt_info(), which launches Chromium through Playwright.
os.system("playwright install")

# Maximum accepted YouTube video length, in seconds (see _download_yt_audio).
YT_LENGTH_LIMIT_S = 360
# NOTE(review): not referenced anywhere in the visible code -- presumably meant
# to be passed to gpu_decorator(duration=...).
SPACES_GPU_DURATION = 90
# Device handed to the model and pipeline: CUDA device 0 when available,
# otherwise the CPU.
device = 0 if torch.cuda.is_available() else "cpu"
def gpu_decorator(duration=60):
    """Build a decorator that applies ``spaces.GPU`` only when it is available.

    Args:
        duration: Value forwarded as the ``duration`` keyword of ``spaces.GPU``.

    Returns:
        A decorator. When ``USING_SPACES`` is true it returns
        ``spaces.GPU(duration=duration)(func)``; otherwise it returns ``func``
        unchanged.
    """

    def actual_decorator(func):
        # The module-level flag is read when the decorator is applied, not
        # when gpu_decorator() itself is called.
        return spaces.GPU(duration=duration)(func) if USING_SPACES else func

    return actual_decorator
def device_info():
    """Run a fixed list of system diagnostic commands.

    The commands write straight to this process's stdout/stderr. Each command
    is attempted independently: a non-zero exit status or a missing executable
    (for example ``nvidia-smi`` on a machine without the NVIDIA tools) is
    reported with ``print`` and the remaining commands still run.

    Returns:
        None
    """
    commands = (
        ["df", "-h"],
        ["lsblk"],
        ["free", "-h"],
        ["lscpu"],
        ["nvidia-smi"],
    )
    for command in commands:
        try:
            subprocess.run(command, check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # CalledProcessError: the command ran but exited non-zero.
            # OSError (e.g. FileNotFoundError): the executable could not be
            # started at all.
            print(f"Command failed: {e}")
def update_gpu_status():
    """Return a Markdown summary of the NVIDIA GPUs reported by gpustat.

    Returns:
        str: ``"No Nvidia Device"`` when CUDA is unavailable. Otherwise one
        status section per GPU returned by ``GPUStatCollection.new_query()``,
        concatenated in iteration order. If the query raises, or reports no
        GPU at all, the result of ``torch_update_gpu_status()`` is returned
        instead.
    """
    if not torch.cuda.is_available():
        return "No Nvidia Device"

    try:
        sections = []
        for gpu in GPUStatCollection.new_query():
            gpu_id = gpu.index
            gpu_name = gpu.name
            gpu_utilization = gpu.utilization
            memory_used = gpu.memory_used
            memory_total = gpu.memory_total
            # Guard against a reported total of 0 to avoid ZeroDivisionError.
            memory_utilization = (memory_used / memory_total) * 100 if memory_total else 0.0
            sections.append(
                f"**GPU Name** {gpu_id}: {gpu_name}\nUtilization: {gpu_utilization}%\n**Memory Used**: {memory_used}MB\n**Memory Total**: {memory_total}MB\n**Memory Utilization**: {memory_utilization:.2f}%\n"
            )
        if sections:
            return "".join(sections)
    except Exception as e:
        # Best effort: any gpustat failure falls through to the torch-based
        # report below, but leave a trace of why.
        print(f"gpustat query failed: {e}")

    return torch_update_gpu_status()
def torch_update_gpu_status():
    """Describe CUDA device 0 using only information available from torch.

    Returns:
        str: ``"No GPU available"`` when CUDA is unavailable; otherwise a
        Markdown string with the device name and its free, total and used
        memory in GB (bytes divided by 1024**3).
    """
    if not torch.cuda.is_available():
        return "No GPU available"

    bytes_per_gb = 1024 * 1024 * 1024
    gpu_info = torch.cuda.get_device_name(0)
    # mem_get_info returns (free_bytes, total_bytes) for the given device.
    free_bytes, total_bytes = torch.cuda.mem_get_info(0)

    total_memory = total_bytes / bytes_per_gb
    free_memory = free_bytes / bytes_per_gb
    used_memory = (total_bytes - free_bytes) / bytes_per_gb
    return f"**GPU Name**: {gpu_info}\n**Free Memory**: {free_memory:.2f}GB\n**Total Memory**: {total_memory:.2f} GB\n**Used Memory**: {used_memory:.2f} GB\n"
def update_cpu_status():
    """Return a Markdown summary of the current CPU and RAM state.

    The report contains the current UTC timestamp, CPU name, CPU usage,
    CPU frequency, logical core count and virtual-memory figures in GB
    (bytes divided by 1024**3).

    Returns:
        str: The formatted status text. When ``psutil.cpu_freq()`` cannot
        determine a frequency (it returns ``None``), the frequency line reads
        "Unavailable" instead of raising.
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # formatted string is the same.
    current_time = datetime.datetime.now(datetime.timezone.utc)
    time_str = current_time.strftime("%Y-%m-%d %H:%M:%S")

    cpu_percent = psutil.cpu_percent()
    cpu_freq = psutil.cpu_freq()
    cpu_count = psutil.cpu_count(logical=True)
    cpu_name = cpuinfo.get_cpu_info().get("brand_raw", "Unknown CPU")
    virtual_memory = psutil.virtual_memory()
    bytes_per_gb = 1024 * 1024 * 1024

    if cpu_freq is None:
        frequency_line = "**CPU Frequency**: Unavailable\n"
    else:
        frequency_line = f"**CPU Frequency**: *Current*: {cpu_freq.current:.2f}MHz, *Max*: {cpu_freq.max:.2f}MHz, *Min*: {cpu_freq.min:.2f}MHz\n"

    lines = [
        f"**{time_str} (UTC+0)**\n\n",
        f"**CPU Name**: {cpu_name}\n",
        f"**CPU Usage**: {cpu_percent}%\n",
        frequency_line,
        f"**CPU Cores**: {cpu_count}\n",
        f"**Virtual Memory**: *Total*: {(virtual_memory.total / bytes_per_gb):.2f}GB, *Available*: {(virtual_memory.available / bytes_per_gb):.2f}GB, *Used*: {(virtual_memory.used / bytes_per_gb):.2f}GB, *Percentage*: {virtual_memory.percent}%\n\n",
    ]
    return "".join(lines)
def update_status():
    """Return the combined system report: CPU section followed by GPU section.

    Returns:
        str: ``update_cpu_status()`` output concatenated with
        ``update_gpu_status()`` output.
    """
    # The GPU report is collected first, but it is placed after the CPU
    # report in the returned text.
    gpu_section = update_gpu_status()
    cpu_section = update_cpu_status()
    return cpu_section + gpu_section
def refresh_status():
    """Return a freshly computed system status report.

    Thin wrapper around ``update_status()``; it is the callable wired to the
    status Markdown component and the refresh button.
    """
    latest_status = update_status()
    return latest_status
def transcribe(inputs, model, language, batch_size, chunk_length_s, stride_length_s, task, timestamp_mode, progress=gr.Progress(track_tqdm=True)):
    """Transcribe (or translate) an uploaded or recorded media file.

    Args:
        inputs: The media input supplied by the Gradio component, passed
            directly to the ASR pipeline. ``None`` means nothing was submitted.
        model: Model identifier passed to the ``from_pretrained`` loaders.
        language: ``"Automatic Detection"`` or a language name. Ignored for
            models whose name ends in ``.en``.
        batch_size: Batch size passed to the pipeline call.
        chunk_length_s: Chunk length in seconds passed to the pipeline.
        stride_length_s: Stride length in seconds passed to the pipeline.
        task: Generation task, ``"transcribe"`` or ``"translate"``. Ignored
            for models whose name ends in ``.en``.
        timestamp_mode: Forwarded as ``return_timestamps``. A falsy value
            produces plain text; anything else produces timestamped chunks.
        progress: Gradio progress tracker (tracks tqdm); not used directly.

    Returns:
        The result of ``text_output(inputs, text)`` when ``timestamp_mode`` is
        falsy, otherwise the result of ``subtitle_output(inputs, chunks)``.

    Raises:
        gr.Error: If no input was submitted, or wrapping any exception raised
            while loading the model or running the pipeline.
    """
    if inputs is None:
        raise gr.Error(
            "No audio file submitted! Please upload or record an audio file before submitting your request.",
            duration=20,
        )

    try:
        # Half precision is only used on CUDA; on the CPU fall back to
        # float32, where half-precision inference is slow or unsupported.
        torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        model_gen = AutoModelForSpeechSeq2Seq.from_pretrained(
            model, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
        )
        model_gen.to(device)
        processor = AutoProcessor.from_pretrained(model)
        tokenizer = WhisperTokenizer.from_pretrained(model)
        # NOTE(review): model_kwargs is presumably ignored when an already
        # instantiated model is passed -- confirm whether flash attention is
        # actually in effect here.
        pipe = pipeline(
            task="automatic-speech-recognition",
            model=model_gen,
            chunk_length_s=chunk_length_s,
            stride_length_s=stride_length_s,
            tokenizer=tokenizer,
            feature_extractor=processor.feature_extractor,
            torch_dtype=torch_dtype,
            model_kwargs={"attn_implementation": "flash_attention_2"},
            device=device,
        )

        # Models ending in ".en" get neither a language nor a task hint.
        generate_kwargs = {}
        if not model.endswith(".en"):
            if language != "Automatic Detection":
                generate_kwargs["language"] = language
            generate_kwargs["task"] = task

        output = pipe(inputs, batch_size=batch_size, generate_kwargs=generate_kwargs, return_timestamps=timestamp_mode)
        print(output)
        print({"inputs": inputs, "model": model, "language": language, "batch_size": batch_size, "chunk_length_s": chunk_length_s, "stride_length_s": stride_length_s, "task": task, "timestamp_mode": timestamp_mode})

        if not timestamp_mode:
            return text_output(inputs, output['text'])
        return subtitle_output(inputs, output['chunks'])
    except Exception as e:
        # Surface every failure in the UI while keeping the original
        # traceback attached for the server logs.
        raise gr.Error(str(e), duration=20) from e
def _download_yt_audio(yt_url, filename):
    """Download the audio track of a YouTube video to ``filename``.

    The video metadata is fetched first and the download is refused when the
    video is longer than ``YT_LENGTH_LIMIT_S`` seconds.

    Args:
        yt_url: URL of the video.
        filename: Output path, used as the yt-dlp ``outtmpl``.

    Raises:
        gr.Error: If the metadata cannot be extracted, the duration is
            unavailable, the video exceeds the length limit, or the
            requested format cannot be downloaded.
    """
    info_loader = youtube_dl.YoutubeDL()
    try:
        info = info_loader.extract_info(yt_url, download=False)
    except youtube_dl.utils.DownloadError as err:
        raise gr.Error(str(err)) from err

    # Prefer the numeric duration (seconds); fall back to parsing the
    # "H:MM:SS" / "M:SS" / "S" display string when it is missing.
    duration = info.get("duration")
    if duration is not None:
        file_length_s = int(duration)
    else:
        file_length = info.get("duration_string")
        if not file_length:
            raise gr.Error("Video duration is unavailable.")
        file_h_m_s = [int(sub_length) for sub_length in file_length.split(":")]
        # Left-pad with zeros so the list is always [hours, minutes, seconds].
        file_h_m_s = [0] * (3 - len(file_h_m_s)) + file_h_m_s
        file_length_s = file_h_m_s[0] * 3600 + file_h_m_s[1] * 60 + file_h_m_s[2]

    if file_length_s > YT_LENGTH_LIMIT_S:
        yt_length_limit_hms = time.strftime("%HH:%MM:%SS", time.gmtime(YT_LENGTH_LIMIT_S))
        file_length_hms = time.strftime("%HH:%MM:%SS", time.gmtime(file_length_s))
        raise gr.Error(f"Maximum YouTube length is {yt_length_limit_hms}, got {file_length_hms} YouTube video.", duration=20)

    ydl_opts = {
        "outtmpl": filename,
        "format": "bestaudio[ext=m4a]/best",
    }
    try:
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            ydl.download([yt_url])
    except youtube_dl.utils.ExtractorError as err:
        # Reuse the metadata fetched above instead of issuing a second
        # network request from inside the error handler (which could itself
        # fail and hide the real error).
        # NOTE(review): yt-dlp presumably reports most download failures as
        # DownloadError rather than ExtractorError -- confirm whether this
        # handler should cover that type as well.
        available_formats = info.get("formats")
        raise gr.Error(f"Requested format not available. Available formats: {available_formats}", duration=20) from err
| def _return_yt_video_id(yt_url): | |
| if "youtube.com/watch?v=" in yt_url: | |
| video_id = yt_url.split("?v=")[1].split("&")[0] | |
| elif "youtu.be/" in yt_url: | |
| video_id = yt_url.split("youtu.be/")[1].split("?")[0] | |
| return video_id | |
def _return_yt_html_embed(yt_url):
    """Build the HTML snippet that embeds the YouTube player for ``yt_url``.

    Args:
        yt_url: URL of the video; its ID is obtained from
            ``_return_yt_video_id``.

    Returns:
        str: A ``<center>``-wrapped ``<iframe>`` pointing at the
        ``youtube.com/embed/<id>`` player.
    """
    # Local import: only this function needs it.
    import html

    video_id = _return_yt_video_id(yt_url)
    # The ID comes from user-supplied text and is placed inside an HTML
    # attribute, so escape it (quotes included). Real video IDs contain no
    # characters that escaping would change.
    safe_video_id = html.escape(str(video_id), quote=True)
    HTML_str = (
        f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{safe_video_id}"> </iframe>'
        " </center>"
    )
    return HTML_str
def _return_yt_thumbnail(yt_url):
    """Download the ``maxresdefault.jpg`` thumbnail of a YouTube video.

    Args:
        yt_url: URL of the video.

    Returns:
        str | None: Path of a temporary ``.jpg`` file holding the thumbnail
        (the file is not deleted automatically), or ``None`` if the download
        failed for any reason.

    Raises:
        ValueError: If no video ID can be extracted from ``yt_url``.
    """
    video_id = _return_yt_video_id(yt_url)
    if not video_id:
        raise ValueError("Invalid YouTube URL: Unable to extract video ID.")

    thumbnail_url = f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg"
    try:
        # Bound the request so an unresponsive server cannot hang the UI.
        response = requests.get(thumbnail_url, timeout=10)
        if response.status_code != 200:
            print(f"Error occurred: Failed to retrieve thumbnail. Status code: {response.status_code}")
            return None
        # Create the temp file only once there is content to store, so a
        # failed download does not leave an empty file behind.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file:
            temp_file.write(response.content)
            thumbnail_path = temp_file.name
    except Exception as e:
        # Best effort: the thumbnail is optional, so report and carry on.
        print(f"Error occurred: {e}")
        return None
    return thumbnail_path
def _return_yt_info(yt_url):
    """Scrape title, description and keywords of a page with headless Chromium.

    Args:
        yt_url: URL to open.

    Returns:
        tuple: Three ``gr.Textbox`` components (title, description,
        keywords). On success they are visible and carry the scraped values;
        a ``<meta>`` tag that is absent yields a ``None`` value for that box
        only. If anything raises, three hidden text boxes are returned.
    """

    def _meta_content(page, name):
        # Return the content of <meta name=...>, or None when the tag is absent.
        element = page.query_selector(f"meta[name='{name}']")
        return element.get_attribute("content") if element is not None else None

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            try:
                page = browser.new_page()
                page.goto(yt_url)
                page.wait_for_load_state("networkidle")
                title = page.title()
                description = _meta_content(page, "description")
                keywords = _meta_content(page, "keywords")
            finally:
                # Close the browser even when navigation or scraping fails.
                browser.close()
        gr_title = gr.Textbox(label="YouTube Title", visible=True, value=title)
        gr_description = gr.Textbox(label="YouTube Description", visible=True, value=description)
        gr_keywords = gr.Textbox(label="YouTube Keywords", visible=True, value=keywords)
        return gr_title, gr_description, gr_keywords
    except Exception as e:
        # Metadata is optional: report the problem and hide the three boxes.
        print(e)
        return gr.Textbox(visible=False), gr.Textbox(visible=False), gr.Textbox(visible=False)
def return_youtube(yt_url):
    """Assemble the five preview components shown for a YouTube URL.

    Args:
        yt_url: URL of the video.

    Returns:
        tuple: ``(html, thumbnail, title, description, keywords)`` where the
        first two are a visible ``gr.HTML`` and ``gr.Image`` and the last
        three come from ``_return_yt_info``.
    """
    embed_markup = _return_yt_html_embed(yt_url)
    thumbnail_file = _return_yt_thumbnail(yt_url)

    video_component = gr.HTML(label="Youtube Video", visible=True, value=embed_markup)
    thumbnail_component = gr.Image(label="Youtube Thumbnail", visible=True, value=thumbnail_file)

    # _return_yt_info yields exactly (title, description, keywords).
    title_box, description_box, keywords_box = _return_yt_info(yt_url)
    return video_component, thumbnail_component, title_box, description_box, keywords_box
def yt_transcribe(yt_url, model, language, batch_size, chunk_length_s, stride_length_s, task, timestamp_mode):
    """Download a YouTube video's audio and transcribe (or translate) it.

    Args:
        yt_url: URL of the video.
        model: Model identifier passed to the ``from_pretrained`` loaders.
        language: ``"Automatic Detection"`` or a language name. Ignored for
            models whose name ends in ``.en``.
        batch_size: Batch size passed to the pipeline call.
        chunk_length_s: Chunk length in seconds passed to the pipeline.
        stride_length_s: Stride length in seconds passed to the pipeline.
        task: Generation task, ``"transcribe"`` or ``"translate"``. Ignored
            for models whose name ends in ``.en``.
        timestamp_mode: Forwarded as ``return_timestamps``. A falsy value
            produces plain text; anything else produces timestamped chunks.

    Returns:
        tuple: ``(subtitle, files, title, html, thumbnail, description,
        keywords)``. If downloading or transcribing fails, a warning is shown
        and the first two items are hidden text boxes.
    """
    gr_html, gr_thumbnail, gr_title, gr_description, gr_keywords = return_youtube(yt_url)
    try:
        with tempfile.TemporaryDirectory() as tmpdirname:
            filepath = os.path.join(tmpdirname, "video.mp4")
            _download_yt_audio(yt_url, filepath)
            with open(filepath, "rb") as f:
                audio_bytes = f.read()

        # The processor must exist before the audio is decoded: its feature
        # extractor supplies the sampling rate. (The previous code read it
        # from `pipe` before `pipe` was assigned, so every call failed with
        # UnboundLocalError.)
        processor = AutoProcessor.from_pretrained(model)
        sampling_rate = processor.feature_extractor.sampling_rate
        inputs = ffmpeg_read(audio_bytes, sampling_rate)
        inputs = {"array": inputs, "sampling_rate": sampling_rate}

        # Half precision is only used on CUDA; fall back to float32 on CPU.
        torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        model_gen = AutoModelForSpeechSeq2Seq.from_pretrained(
            model, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
        )
        model_gen.to(device)
        tokenizer = WhisperTokenizer.from_pretrained(model)
        pipe = pipeline(
            task="automatic-speech-recognition",
            model=model_gen,
            chunk_length_s=chunk_length_s,
            stride_length_s=stride_length_s,
            tokenizer=tokenizer,
            feature_extractor=processor.feature_extractor,
            torch_dtype=torch_dtype,
            model_kwargs={"attn_implementation": "flash_attention_2"},
            device=device,
        )

        # Models ending in ".en" get neither a language nor a task hint.
        generate_kwargs = {}
        if not model.endswith(".en"):
            if language != "Automatic Detection":
                generate_kwargs["language"] = language
            generate_kwargs["task"] = task

        output = pipe(inputs, batch_size=batch_size, generate_kwargs=generate_kwargs, return_timestamps=timestamp_mode)
        print(output)
        print({"inputs": yt_url, "model": model, "language": language, "batch_size": batch_size, "chunk_length_s": chunk_length_s, "stride_length_s": stride_length_s, "task": task, "timestamp_mode": timestamp_mode})

        # NOTE(review): transcribe() hands text_output/subtitle_output its
        # `inputs` unchanged, whereas here they receive the audio dict built
        # above -- confirm the helpers accept both.
        if not timestamp_mode:
            text = output['text']
            subtitle, files = text_output(inputs, text)
        else:
            chunks = output['chunks']
            subtitle, files = subtitle_output(inputs, chunks)
        return subtitle, files, gr_title, gr_html, gr_thumbnail, gr_description, gr_keywords
    except Exception as e:
        # Keep the video preview visible and only hide the transcript outputs.
        error_message = str(e)
        gr.Warning(error_message, duration=20)
        return gr.Textbox(visible=False),gr.Textbox(visible=False), gr_title, gr_html, gr_thumbnail, gr_description, gr_keywords
demo = gr.Blocks()

# Model identifiers offered in the "Model Name" dropdown of every tab.
_MODEL_CHOICES = [
    "openai/whisper-tiny",
    "openai/whisper-base",
    "openai/whisper-small",
    "openai/whisper-medium",
    "openai/whisper-large",
    "openai/whisper-large-v1",
    "openai/whisper-large-v2",
    "distil-whisper/distil-large-v2",
    "openai/whisper-large-v3",
    "openai/whisper-large-v3-turbo",
    "distil-whisper/distil-large-v3",
    "xaviviro/whisper-large-v3-catalan-finetuned-v2",
]


def _transcription_option_inputs():
    """Create the seven option components that follow each tab's media input.

    A new set of component instances is built on every call so that each
    ``gr.Interface`` owns its own widgets. The order matches the positional
    parameters of ``transcribe`` / ``yt_transcribe`` after the first one:
    model, language, batch size, chunk length, stride length, task,
    timestamp mode.
    """
    return [
        gr.Dropdown(
            choices=list(_MODEL_CHOICES),
            value="openai/whisper-large-v3-turbo",
            label="Model Name",
            allow_custom_value=True,
        ),
        gr.Dropdown(
            choices=["Automatic Detection"] + sorted(get_language_names()),
            value="Automatic Detection",
            label="Language",
            interactive=True,
        ),
        gr.Slider(label="Batch Size", minimum=1, maximum=32, value=16, step=1),
        gr.Slider(label="Chunk Length (s)", minimum=1, maximum=60, value=17.5, step=0.1),
        gr.Slider(label="Stride Length (s)", minimum=1, maximum=30, value=1, step=0.1),
        gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"),
        gr.Dropdown(
            choices=[True, False, "word"],
            value=True,
            label="Timestamp Mode",
        ),
    ]


# "Audio" tab: upload or record audio; the component passes a file path.
file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources=['upload', 'microphone'], type="filepath", label="Audio file"),
        *_transcription_option_inputs(),
    ],
    outputs=[gr.Textbox(label="Output"), gr.File(label="Download Files")],
    title="Whisper: Transcribe Audio",
    flagging_mode="auto",
)

# "Video" tab: same handler as the audio tab, fed from a video component.
video_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Video(sources=["upload", "webcam"], label="Video file", show_label=False, show_download_button=False, show_share_button=False, streaming=True),
        *_transcription_option_inputs(),
    ],
    outputs=[gr.Textbox(label="Output"), gr.File(label="Download Files")],
    title="Whisper: Transcribe Video",
    flagging_mode="auto",
)

# "YouTube" tab. NOTE: this assignment rebinds the module-level name
# `yt_transcribe` from the handler function to the Interface; `fn=` is
# evaluated first, so the Interface still wraps the original function.
yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=[
        gr.Textbox(lines=1, placeholder="Paste the URL to a YouTube video here", label="YouTube URL"),
        *_transcription_option_inputs(),
    ],
    outputs=[
        gr.Textbox(label="Output"),
        gr.File(label="Download Files"),
        gr.Textbox(label="Youtube Title"),
        gr.HTML(label="Youtube Video"),
        gr.Image(label="Youtube Thumbnail"),
        gr.Textbox(label="Youtube Description"),
        gr.Textbox(label="Youtube Keywords"),
    ],
    title="Whisper: Transcribe YouTube",
    flagging_mode="auto",
)

with demo:
    gr.TabbedInterface(
        interface_list=[file_transcribe, video_transcribe, yt_transcribe],
        tab_names=["Audio", "Video", "YouTube"]
    )
    with gr.Group():
        # System status panel: the component is given `every=30` and can
        # also be refreshed on demand with the button.
        sys_status_output = gr.Markdown(value=refresh_status, label="System Status", container=True, line_breaks=True, show_copy_button=True, every=30)
        refresh_button = gr.Button("Refresh System Status")
        refresh_button.click(refresh_status, None, sys_status_output)
        sys_status_output.value = refresh_status()

if __name__ == "__main__":
    demo.queue().launch(ssr_mode=False)