In [1]:
!pip install sacrebleu

Collecting sacrebleu
  Downloading sacrebleu-2.5.1-py3-none-any.whl.metadata (51 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/51.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.8/51.8 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting portalocker (from sacrebleu)
  Downloading portalocker-3.2.0-py3-none-any.whl.metadata (8.7 kB)
Collecting colorama (from sacrebleu)
  Downloading colorama-0.4.6-py2.py3-none-any.whl.metadata (17 kB)
Downloading sacrebleu-2.5.1-py3-none-any.whl (104 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.1/104.1 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorama-0.4.6-py2.py3-none-any.whl (25 kB)
Downloading portalocker-3.2.0-py3-none-any.whl (22 kB)
Installing collected packages: portalocker, colorama, sacrebleu
Successfully installed colorama-0.4.6 portalocker-3.2.0 sacrebleu-2.5.1


In [2]:
!pip install gradio ffmpeg

Collecting ffmpeg
  Downloading ffmpeg-1.4.tar.gz (5.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: ffmpeg
  Building wheel for ffmpeg (setup.py) ... [?25l[?25hdone
  Created wheel for ffmpeg: filename=ffmpeg-1.4-py3-none-any.whl size=6083 sha256=4dccf8185b38521d5a4bb388563dd90ad11db98d010fabcd2fbf6881755f8891
  Stored in directory: /root/.cache/pip/wheels/56/30/c5/576bdd729f3bc062d62a551be7fefd6ed2f761901568171e4e
Successfully built ffmpeg
Installing collected packages: ffmpeg
Successfully installed ffmpeg-1.4


In [5]:
!pip install -r /content/requirements.txt

Collecting nemo_toolkit@ git+https://github.com/NVIDIA/NeMo.git (from nemo_toolkit[asr,tts]@ git+https://github.com/NVIDIA/NeMo.git->-r /content/requirements.txt (line 1))
  Cloning https://github.com/NVIDIA/NeMo.git to /tmp/pip-install-7j0ourtb/nemo-toolkit_f8f688ddcc76456592aa95f279152eb7
  Running command git clone --filter=blob:none --quiet https://github.com/NVIDIA/NeMo.git /tmp/pip-install-7j0ourtb/nemo-toolkit_f8f688ddcc76456592aa95f279152eb7
  Resolved https://github.com/NVIDIA/NeMo.git to commit 99e5dac685f88718f81281c0a1f51ca0ac2cb64d
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting fsspec==2024.12.0 (from nemo_toolkit@ git+https://github.com/NVIDIA/NeMo.git->nemo_toolkit[asr,tts]@ git+https://github.com/NVIDIA/NeMo.git->-r /content/requirements.txt (line 1))
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Collecting onnx>=1.7.0

In [6]:
%%writefile app.py
import gradio as gr
import os
import tempfile
import subprocess
import librosa
import soundfile as sf
import torch
from pathlib import Path
import traceback
from typing import List, Dict, Tuple, Optional
import time

# Install required packages
def install_requirements():
    """Ensure NeMo and MoviePy are importable, installing them with pip if not.

    Each package is probed with a plain ``import``. On ``ImportError`` it is
    installed through ``<running interpreter> -m pip`` so that it lands in the
    environment this script is executing in, rather than in whichever ``pip``
    executable happens to be first on ``PATH``.

    Raises:
        subprocess.CalledProcessError: if a pip invocation exits with a
            non-zero status (``check=True``).
    """
    import sys  # local import: only needed to locate the running interpreter

    pip_install = [sys.executable, "-m", "pip", "install"]

    try:
        import nemo  # noqa: F401 - imported only to test availability
        print("NeMo already installed")
    except ImportError:
        print("Installing NeMo...")
        subprocess.run(
            pip_install + ["nemo_toolkit[asr,tts] @ git+https://github.com/NVIDIA/NeMo.git"],
            check=True,
        )

    try:
        import moviepy  # noqa: F401 - imported only to test availability
        print("MoviePy already installed")
    except ImportError:
        print("Installing MoviePy...")
        subprocess.run(pip_install + ["moviepy"], check=True)

# Try to install requirements
try:
    install_requirements()
    from nemo.collections.speechlm2.models import SALM
    try:
        # MoviePy 1.x exposes VideoFileClip through the `moviepy.editor` namespace.
        import moviepy.editor as mp
    except ImportError:
        # MoviePy 2.x removed `moviepy.editor`; the clip classes are exported from
        # the top-level package instead, so alias that under the same name.
        import moviepy as mp
        if not hasattr(mp, "VideoFileClip"):
            # A 1.x install whose `moviepy.editor` import failed for another reason.
            raise ImportError("moviepy is installed but VideoFileClip could not be imported")
    # Flag consulted by VideoQASummarizer.load_model() before touching SALM.
    DEPENDENCIES_AVAILABLE = True
except Exception as e:
    print(f"Warning: Could not install dependencies: {e}")
    DEPENDENCIES_AVAILABLE = False

class VideoQASummarizer:
    def __init__(self):
        self.model = None
        self.current_transcript = ""
        self.model_loaded = False
        self.device = self._get_device()

    def _get_device(self):
        """Determine the best available device"""
        if torch.cuda.is_available():
            device = torch.device("cuda")
            print(f"CUDA available: {torch.cuda.get_device_name()}")
            print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
            return device
        else:
            print("CUDA not available, using CPU")
            return torch.device("cpu")

    def load_model(self):
        """Load the Canary-Qwen-2.5B model with CUDA support"""
        if not DEPENDENCIES_AVAILABLE:
            return "Error: Required dependencies not available. Please install manually."

        try:
            if self.model is None:
                print(f"Loading Canary-Qwen-2.5B model on {self.device}...")

                # Load model with device specification
                self.model = SALM.from_pretrained('nvidia/canary-qwen-2.5b')

                # Move model to GPU if available
                if self.device.type == "cuda":
                    self.model = self.model.to(self.device)
                    print(f"Model moved to GPU: {torch.cuda.get_device_name()}")

                    # Enable mixed precision for better GPU performance
                    if hasattr(self.model, 'half'):
                        # Use half precision for inference to save memory
                        self.model = self.model.half()
                        print("Enabled half precision for better GPU performance")

                # Set model to evaluation mode for inference
                self.model.eval()

                self.model_loaded = True

                # Display memory usage if using GPU
                if self.device.type == "cuda":
                    memory_allocated = torch.cuda.memory_allocated() / 1024**3
                    memory_reserved = torch.cuda.memory_reserved() / 1024**3
                    return f"Model loaded successfully on GPU!\nMemory allocated: {memory_allocated:.2f} GB\nMemory reserved: {memory_reserved:.2f} GB"
                else:
                    return "Model loaded successfully on CPU!"
            return "Model already loaded."
        except Exception as e:
            error_msg = f"Error loading model: {str(e)}"
            print(error_msg)
            print(traceback.format_exc())
            return error_msg

    def extract_audio_from_video(self, video_path: str) -> str:
        """Extract audio from video file"""
        try:
            # Create temporary audio file
            temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
            temp_audio_path = temp_audio.name
            temp_audio.close()

            # Load video and extract audio
            video = mp.VideoFileClip(video_path)
            audio = video.audio

            # Write audio to temporary file
            audio.write_audiofile(temp_audio_path, verbose=False, logger=None)

            # Clean up
            audio.close()
            video.close()

            return temp_audio_path
        except Exception as e:
            raise Exception(f"Error extracting audio: {str(e)}")

    def split_audio_by_duration(self, audio_path: str, max_duration: int = 30) -> List[str]:
        """Split long audio files into smaller chunks"""
        try:
            # Load audio to check duration
            audio, sr = librosa.load(audio_path, sr=16000)
            total_duration = len(audio) / sr

            if total_duration <= max_duration:
                return [audio_path]

            # Split audio into chunks
            chunk_paths = []
            chunk_samples = max_duration * sr

            for i in range(0, len(audio), chunk_samples):
                chunk = audio[i:i + chunk_samples]

                # Create temporary file for chunk
                temp_chunk = tempfile.NamedTemporaryFile(delete=False, suffix=f'_chunk_{i//chunk_samples}.wav')
                chunk_path = temp_chunk.name
                temp_chunk.close()

                # Save chunk
                sf.write(chunk_path, chunk, sr)
                chunk_paths.append(chunk_path)

            return chunk_paths
        except Exception as e:
            raise Exception(f"Error splitting audio: {str(e)}")

    def preprocess_audio(self, audio_path: str) -> str:
        """Preprocess audio for the model (ensure correct format)"""
        try:
            # Load audio
            audio, sr = librosa.load(audio_path, sr=16000)  # Resample to 16kHz if needed

            # Create new temporary file for processed audio
            temp_processed = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
            temp_processed_path = temp_processed.name
            temp_processed.close()

            # Save processed audio
            sf.write(temp_processed_path, audio, 16000)

            return temp_processed_path
        except Exception as e:
            raise Exception(f"Error preprocessing audio: {str(e)}")

    def transcribe_audio_chunk(self, audio_path: str) -> str:
        """Transcribe a single audio chunk"""
        try:
            # Preprocess audio
            processed_audio_path = self.preprocess_audio(audio_path)

            # Transcribe using ASR mode with increased token limit
            answer_ids = self.model.generate(
                prompts=[
                    [{"role": "user", "content": f"Transcribe the following: {self.model.audio_locator_tag}", "audio": [processed_audio_path]}]
                ],
                max_new_tokens=2048,  # Increased from 512 to handle longer content
                temperature=0.1,      # Lower temperature for more consistent transcription
                do_sample=True,
            )

            transcript = self.model.tokenizer.ids_to_text(answer_ids[0].cpu())

            # Clean up temporary file
            os.unlink(processed_audio_path)

            return transcript.strip()
        except Exception as e:
            raise Exception(f"Error transcribing chunk: {str(e)}")

    def transcribe_audio(self, audio_path: str) -> str:
        """Transcribe audio using Canary-Qwen-2.5B in ASR mode with chunking for long files"""
        try:
            if not self.model_loaded:
                return "Error: Model not loaded. Please load the model first."

            # Check audio duration and split if necessary
            audio, sr = librosa.load(audio_path, sr=16000)
            duration = len(audio) / sr
            print(f"Audio duration: {duration:.2f} seconds")

            if duration > 30:  # Split long audio files
                print("Long audio detected, splitting into chunks...")
                chunk_paths = self.split_audio_by_duration(audio_path, max_duration=30)

                full_transcript = ""
                for i, chunk_path in enumerate(chunk_paths):
                    print(f"Transcribing chunk {i+1}/{len(chunk_paths)}")
                    chunk_transcript = self.transcribe_audio_chunk(chunk_path)

                    # Clean up chunk transcript (remove model artifacts)
                    chunk_transcript = self.clean_transcript(chunk_transcript)

                    if chunk_transcript:
                        full_transcript += chunk_transcript + " "

                    # Clean up chunk file if we created it
                    if chunk_path != audio_path:
                        os.unlink(chunk_path)

                return full_transcript.strip()
            else:
                # Short audio, transcribe directly
                transcript = self.transcribe_audio_chunk(audio_path)
                return self.clean_transcript(transcript)

        except Exception as e:
            error_msg = f"Error during transcription: {str(e)}"
            print(error_msg)
            print(traceback.format_exc())
            return error_msg

    def clean_transcript(self, transcript: str) -> str:
        """Clean up transcript by removing model artifacts and formatting issues"""
        try:
            # Remove common model artifacts
            artifacts_to_remove = [
                "Sure! Here's the transcription without the timestamps, written as a single paragraph:",
                "Here's the transcription:",
                "Transcription:",
                "<|im_start|>",
                "<|im_end|>",
                "<audio>",
                "</audio>",
            ]

            cleaned = transcript
            for artifact in artifacts_to_remove:
                cleaned = cleaned.replace(artifact, "")

            # Remove extra whitespace and normalize
            cleaned = " ".join(cleaned.split())

            # Remove any leading/trailing punctuation issues
            cleaned = cleaned.strip(" .,!?")

            return cleaned
        except Exception as e:
            print(f"Error cleaning transcript: {e}")
            return transcript

    def answer_question(self, question: str, transcript: str) -> str:
        """Answer questions about the transcript using LLM mode"""
        try:
            if not self.model_loaded:
                return "Error: Model not loaded. Please load the model first."

            if not transcript:
                return "Error: No transcript available. Please transcribe a video first."

            # Use LLM mode to answer questions
            prompt = f"Based on the following transcript, please answer this question: {question}\n\nTranscript: {transcript}"

            with self.model.llm.disable_adapter():
                answer_ids = self.model.generate(
                    prompts=[[{"role": "user", "content": prompt}]],
                    max_new_tokens=1024,  # Increased for longer answers
                    temperature=0.3,
                    do_sample=True,
                )

            answer = self.model.tokenizer.ids_to_text(answer_ids[0].cpu())
            return answer.strip()
        except Exception as e:
            error_msg = f"Error answering question: {str(e)}"
            print(error_msg)
            print(traceback.format_exc())
            return error_msg

    def summarize_transcript(self, transcript: str, summary_type: str = "general") -> str:
        """Summarize the transcript using LLM mode"""
        try:
            if not self.model_loaded:
                return "Error: Model not loaded. Please load the model first."

            if not transcript:
                return "Error: No transcript available. Please transcribe a video first."

            # Create different summary prompts based on type
            if summary_type == "bullet_points":
                prompt = f"Please create a bullet-point summary of the key points from this transcript:\n\n{transcript}"
            elif summary_type == "detailed":
                prompt = f"Please provide a detailed summary of this transcript, including main topics and important details:\n\n{transcript}"
            else:  # general
                prompt = f"Please provide a concise summary of this transcript:\n\n{transcript}"

            with self.model.llm.disable_adapter():
                answer_ids = self.model.generate(
                    prompts=[[{"role": "user", "content": prompt}]],
                    max_new_tokens=1536,  # Increased for longer summaries
                    temperature=0.3,
                    do_sample=True,
                )

            summary = self.model.tokenizer.ids_to_text(answer_ids[0].cpu())
            return summary.strip()
        except Exception as e:
            error_msg = f"Error creating summary: {str(e)}"
            print(error_msg)
            print(traceback.format_exc())
            return error_msg

# Module-level instance shared by the Gradio callbacks below. Constructing it
# only selects the compute device; the model itself stays unloaded (None)
# until load_model() is called.
qa_summarizer = VideoQASummarizer()

def load_model_interface():
    """Gradio click handler: load the shared model and return its status text."""
    status_message = qa_summarizer.load_model()
    return status_message

def process_video(video_file, progress=gr.Progress()):
    """Gradio handler: extract a video's audio, transcribe it, and report the result.

    Args:
        video_file: path of the uploaded video, or None when nothing was uploaded.
        progress: Gradio progress tracker, called with a fraction and a description.

    Returns:
        A ``(status_message, transcript)`` tuple. On any failure the transcript
        is the empty string and the status carries the error text. The
        transcript is stored on ``qa_summarizer.current_transcript`` only when
        transcription succeeded. The temporary audio file is always removed.
    """
    if video_file is None:
        return "Please upload a video file.", ""

    # Fail fast: without a loaded model the audio extraction would be wasted work.
    if not qa_summarizer.model_loaded:
        return "Error: Model not loaded. Please load the model first.", ""

    # transcribe_audio() reports failures by returning one of these messages
    # instead of raising, so they have to be recognised explicitly.
    failure_prefixes = ("Error: Model not loaded", "Error during transcription:")

    audio_path = None
    try:
        progress(0.1, desc="Extracting audio from video...")
        audio_path = qa_summarizer.extract_audio_from_video(video_file)

        progress(0.3, desc="Analyzing audio duration...")
        # Duration is only used for the status message shown to the user.
        audio, sr = librosa.load(audio_path, sr=16000)
        duration = len(audio) / sr

        progress(0.4, desc="Starting transcription...")
        transcript = qa_summarizer.transcribe_audio(audio_path)

        if transcript.startswith(failure_prefixes):
            # Surface the failure as a status; never store or display it as a transcript.
            return transcript, ""

        progress(0.9, desc="Finalizing transcript...")
        # Keep the transcript for later question answering / summarisation.
        qa_summarizer.current_transcript = transcript

        progress(1.0, desc="Complete!")
        return f"Video processed successfully! (Duration: {duration:.1f}s)", transcript
    except Exception as e:
        error_msg = f"Error processing video: {str(e)}"
        print(error_msg)
        print(traceback.format_exc())
        return error_msg, ""
    finally:
        # Remove the extracted audio on success and on failure alike.
        if audio_path is not None and os.path.exists(audio_path):
            os.unlink(audio_path)

def answer_question_interface(question, transcript):
    """Gradio handler: answer a question about the displayed or last stored transcript.

    Args:
        question: text from the question box; ``None``, empty or
            whitespace-only values are rejected with a prompt to enter one.
        transcript: text from the transcript box; when empty, the transcript
            stored on ``qa_summarizer.current_transcript`` is used instead.

    Returns:
        The answer (or error message) produced by ``qa_summarizer.answer_question``.
    """
    # Guard against None as well as blank input, so .strip() cannot fail.
    if question is None or not question.strip():
        return "Please enter a question."

    return qa_summarizer.answer_question(question, transcript or qa_summarizer.current_transcript)

def summarize_interface(transcript, summary_type):
    """Gradio handler: summarise the displayed transcript, or the stored one if it is empty."""
    # Fall back to the last stored transcript when the textbox value is empty.
    source_text = transcript if transcript else qa_summarizer.current_transcript
    return qa_summarizer.summarize_transcript(source_text, summary_type)

# Create Gradio interface
def create_interface():
    """Build and return the Gradio Blocks UI for the video Q&A / summarizer app.

    The page is laid out as four steps, each wired to one module-level handler:
    load the model (``load_model_interface``), upload and process a video
    (``process_video``), ask a question (``answer_question_interface``) and
    generate a summary (``summarize_interface``). The transcript textbox is
    passed as an input to the question and summary handlers.

    Returns:
        The ``gr.Blocks`` app; the caller is responsible for launching it.
    """
    with gr.Blocks(title="Video Q&A and Summarizer", theme=gr.themes.Soft()) as app:
        gr.Markdown("""
        # 🎥 Video Question Answering and Summarizer

        Upload a video file to transcribe its audio content, then ask questions or generate summaries using NVIDIA's Canary-Qwen-2.5B model.

        **Features:**
        - Extract and transcribe audio from video files (handles long videos with chunking)
        - Ask questions about the video content
        - Generate different types of summaries
        - Powered by NVIDIA NeMo Canary-Qwen-2.5B
        """)

        # Model loading section
        with gr.Row():
            gr.Markdown("## 🚀 Step 1: Load Model")

        with gr.Row():
            load_btn = gr.Button("Load Canary-Qwen-2.5B Model", variant="primary")
            model_status = gr.Textbox(label="Model Status", interactive=False)

        load_btn.click(load_model_interface, outputs=model_status)

        # Video processing section
        with gr.Row():
            gr.Markdown("## 📹 Step 2: Upload and Process Video")

        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Upload Video File")
                process_btn = gr.Button("Process Video", variant="primary")

            with gr.Column():
                process_status = gr.Textbox(label="Processing Status", interactive=False)
                transcript_output = gr.Textbox(
                    label="Transcript",
                    lines=15,
                    max_lines=25,
                    interactive=False,
                    show_copy_button=True
                )

        process_btn.click(
            process_video,
            inputs=video_input,
            outputs=[process_status, transcript_output],
            show_progress=True
        )

        # Question answering section
        with gr.Row():
            gr.Markdown("## ❓ Step 3: Ask Questions")

        with gr.Row():
            with gr.Column():
                question_input = gr.Textbox(
                    label="Your Question",
                    placeholder="What is this video about?",
                    lines=2
                )
                ask_btn = gr.Button("Ask Question", variant="secondary")

            with gr.Column():
                answer_output = gr.Textbox(
                    label="Answer",
                    lines=6,
                    interactive=False,
                    show_copy_button=True
                )

        ask_btn.click(
            answer_question_interface,
            inputs=[question_input, transcript_output],
            outputs=answer_output
        )

        # Summarization section
        with gr.Row():
            gr.Markdown("## 📝 Step 4: Generate Summary")

        with gr.Row():
            with gr.Column():
                summary_type = gr.Dropdown(
                    choices=["general", "detailed", "bullet_points"],
                    value="general",
                    label="Summary Type"
                )
                summarize_btn = gr.Button("Generate Summary", variant="secondary")

            with gr.Column():
                summary_output = gr.Textbox(
                    label="Summary",
                    lines=10,
                    interactive=False,
                    show_copy_button=True
                )

        summarize_btn.click(
            summarize_interface,
            inputs=[transcript_output, summary_type],
            outputs=summary_output
        )

        # Instructions and tips. The feature bullets below are kept in line
        # with what the code in this file actually does (token budgets,
        # half precision, eval mode, temp-file cleanup).
        with gr.Row():
            gr.Markdown("""
            ## 💡 Tips & Improvements:

            1. **Supported formats**: MP4, AVI, MOV, MKV, and other common video formats
            2. **Audio quality**: Better audio quality leads to more accurate transcriptions
            3. **Long videos**: The app now automatically splits long audio files into chunks for complete transcription
            4. **Processing time**: Longer videos are processed in chunks, which may take more time but ensures completeness
            5. **Questions**: Be specific with your questions for better answers
            6. **Summaries**: Choose the summary type that best fits your needs

            ## 🔧 GPU & Performance Features:
            - **CUDA GPU Support** - Automatically detects and uses GPU if available
            - **Half Precision** - Runs the model in half precision (FP16) on GPU to reduce memory use
            - **Temporary File Cleanup** - Intermediate audio files are deleted after processing
            - **Inference Mode** - The model is switched to evaluation mode after loading
            - **Memory Monitoring** - Shows GPU memory usage when model loads

            ## 🔧 Recent Fixes:
            - **Increased token limits** for complete output (up to 2048 new tokens for transcription, 1024 for answers, 1536 for summaries)
            - **Audio chunking** for videos longer than 30 seconds to prevent cutoffs
            - **Improved transcript cleaning** to remove model artifacts
            - **Better progress tracking** during video processing
            - **Copy buttons** for easy text copying

            ## ⚠️ Requirements:
            - **CUDA GPU** - Strongly recommended for optimal performance (RTX 3090, 4090, A100, etc.)
            - **GPU Memory** - At least 8GB VRAM recommended for the 2.5B model
            - **PyTorch** - Version 2.6+ with CUDA support for FSDP2
            - **Disk Space** - Sufficient space for temporary audio files and model cache
            """)

    return app

# Launch the application
if __name__ == "__main__":
    # Build the UI, then start the server. share=True additionally requests a
    # public Gradio share link, so anyone with the URL can reach the app.
    app = create_interface()
    app.launch(share=True)

Overwriting app.py


In [None]:
!python app.py

NeMo already installed
MoviePy already installed
2025-07-31 19:08:14.606014: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1753988894.861095    4326 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1753988894.929380    4326 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-07-31 19:08:15.484273: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
error: XDG_RUNTIME_DIR not set in the environment.
ALSA lib confmisc