import streamlit as st
import moviepy.editor as mp
import speech_recognition as sr
from pydub import AudioSegment
import tempfile
import os
import io
from transformers import pipeline
import matplotlib.pyplot as plt
import gc
import warnings

warnings.filterwarnings("ignore")

# Configure Streamlit for large file uploads
st.set_page_config(
    page_title="Video/Audio Transcription with Emotion Detection",
    page_icon="🎬",
    layout="wide",
)


# Set maximum upload size (this needs to be set before any file upload widgets)
# Note: You'll also need to configure this in your Streamlit config file or environment
@st.cache_data
def get_config():
    """Return the intended upload-size configuration (informational).

    NOTE(review): this dict alone does not change Streamlit's enforced
    limit — `server.maxUploadSize` must also be set in
    .streamlit/config.toml or via environment.
    """
    return {"maxUploadSize": 1024}  # 1GB in MB


def _temp_path(suffix):
    """Create a closed temporary file and return its path.

    Replaces the deprecated, race-prone ``tempfile.mktemp()``: ``mkstemp``
    actually creates the file (no name-reuse race), and closing the
    descriptor lets other libraries write to the path on any platform.
    """
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    return path


# Function to convert video to audio with progress tracking
def video_to_audio(video_file, progress_callback=None):
    """Extract the audio track of *video_file* to a temporary MP3 file.

    Parameters
    ----------
    video_file : str
        Path to a video file on disk.
    progress_callback : callable, optional
        Called with an int percentage (50 before the write, 100 after).

    Returns
    -------
    str or None
        Path to the MP3 file, or None on failure (error shown via st.error).
    """
    try:
        video = mp.VideoFileClip(video_file)
        audio = video.audio
        temp_audio_path = _temp_path(".mp3")

        if progress_callback:
            progress_callback(50)  # 50% progress

        # verbose/logger silenced so moviepy does not spam the console
        audio.write_audiofile(temp_audio_path, verbose=False, logger=None)

        # Explicitly release moviepy resources; large clips hold a lot of RAM
        audio.close()
        video.close()
        del video, audio
        gc.collect()

        if progress_callback:
            progress_callback(100)  # 100% progress

        return temp_audio_path
    except Exception as e:
        st.error(f"Error converting video to audio: {str(e)}")
        return None


# Function to convert MP3 audio to WAV
def convert_mp3_to_wav(mp3_file):
    """Convert an MP3 file to WAV and return the new temp-file path.

    Returns None (after showing st.error) if decoding or export fails.
    """
    try:
        audio = AudioSegment.from_mp3(mp3_file)
        temp_wav_path = _temp_path(".wav")
        audio.export(temp_wav_path, format="wav")

        # Free the decoded audio buffer before returning
        del audio
        gc.collect()

        return temp_wav_path
    except Exception as e:
        st.error(f"Error converting MP3 to WAV: {str(e)}")
        return None
# Function to transcribe audio to text with chunking for large files
def transcribe_audio(audio_file, chunk_duration=60):
    """Transcribe a WAV file to text, chunking long audio for the API.

    Parameters
    ----------
    audio_file : str
        Path to a WAV file on disk.
    chunk_duration : int, optional
        Maximum chunk length in seconds sent per request to the Google
        Speech Recognition service.

    Returns
    -------
    str
        The full transcription, or a human-readable error message.
    """
    try:
        recognizer = sr.Recognizer()

        audio_segment = AudioSegment.from_wav(audio_file)
        duration = len(audio_segment) / 1000  # pydub lengths are milliseconds

        transcriptions = []

        if duration > chunk_duration:
            num_chunks = int(duration / chunk_duration) + 1
            # One progress bar updated in place. Calling st.progress(...)
            # inside the loop would create a NEW bar widget per iteration,
            # stacking bars in the UI.
            chunk_bar = st.progress(0.0, text="Transcribing... 0%")
            for i in range(num_chunks):
                start_time = i * chunk_duration * 1000  # Convert to milliseconds
                end_time = min((i + 1) * chunk_duration * 1000, len(audio_segment))
                chunk = audio_segment[start_time:end_time]

                # mkstemp instead of the deprecated, race-prone mktemp
                fd, chunk_path = tempfile.mkstemp(suffix=".wav")
                os.close(fd)
                chunk.export(chunk_path, format="wav")

                try:
                    with sr.AudioFile(chunk_path) as source:
                        audio_data = recognizer.record(source)
                        text = recognizer.recognize_google(audio_data)
                        transcriptions.append(text)
                except (sr.UnknownValueError, sr.RequestError):
                    transcriptions.append(f"[Chunk {i+1}: Audio could not be transcribed]")
                finally:
                    # Remove the chunk file even if recognition raised
                    os.remove(chunk_path)

                progress = int(((i + 1) / num_chunks) * 100)
                chunk_bar.progress(progress / 100, text=f"Transcribing... {progress}%")
        else:
            # For shorter audio, transcribe directly
            with sr.AudioFile(audio_file) as source:
                audio_data = recognizer.record(source)
                text = recognizer.recognize_google(audio_data)
                transcriptions.append(text)

        full_transcription = " ".join(transcriptions)

        # Release the decoded audio before returning
        del audio_segment
        gc.collect()

        return full_transcription
    except sr.UnknownValueError:
        return "Audio could not be understood."
    except sr.RequestError as e:
        return f"Could not request results from Google Speech Recognition service: {str(e)}"
    except Exception as e:
        return f"Error during transcription: {str(e)}"


# Function to perform emotion detection using Hugging Face transformers
@st.cache_resource
def load_emotion_model():
    """Load emotion detection model (cached across reruns)."""
    return pipeline(
        "text-classification",
        model="j-hartmann/emotion-english-distilroberta-base",
        return_all_scores=True,
    )


def detect_emotion(text):
    """Return a {emotion_label: score} mapping for *text*.

    Long texts are split into 500-character chunks (the model has a token
    limit) and per-emotion scores are averaged across chunks with a true
    mean.  On failure returns {"error": "Could not analyze emotions"}
    after surfacing the error via st.error.
    """
    try:
        emotion_pipeline = load_emotion_model()

        # Split text into chunks if it's too long (model has token limits)
        max_length = 500
        if len(text) > max_length:
            chunks = [text[i:i + max_length] for i in range(0, len(text), max_length)]

            # Sum scores per label, then divide by the chunk count.
            # (A running "(old + new) / 2" would weight later chunks
            # exponentially more than earlier ones.)
            totals = {}
            for chunk in chunks:
                result = emotion_pipeline(chunk)
                for emotion in result[0]:
                    totals[emotion['label']] = totals.get(emotion['label'], 0.0) + emotion['score']
            return {label: total / len(chunks) for label, total in totals.items()}
        else:
            result = emotion_pipeline(text)
            emotions = {emotion['label']: emotion['score'] for emotion in result[0]}
            return emotions
    except Exception as e:
        st.error(f"Error in emotion detection: {str(e)}")
        return {"error": "Could not analyze emotions"}


# Function to visualize emotions
def plot_emotions(emotions):
    """Build a bar chart of emotion scores, sorted descending.

    Returns the matplotlib Figure, or None when *emotions* carries the
    sentinel "error" key produced by detect_emotion.
    """
    if "error" in emotions:
        return None

    fig, ax = plt.subplots(figsize=(10, 6))
    emotions_sorted = dict(sorted(emotions.items(), key=lambda x: x[1], reverse=True))

    colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7', '#DDA0DD', '#98D8C8']
    bars = ax.bar(
        emotions_sorted.keys(),
        emotions_sorted.values(),
        color=colors[:len(emotions_sorted)],
    )

    ax.set_xlabel('Emotions')
    ax.set_ylabel('Confidence Score')
    ax.set_title('Emotion Detection Results')
    ax.set_ylim(0, 1)  # scores are probabilities

    # Add value labels on bars
    for bar in bars:
        height = bar.get_height()
        ax.text(
            bar.get_x() + bar.get_width() / 2.,
            height + 0.01,
            f'{height:.3f}',
            ha='center',
            va='bottom',
        )

    plt.xticks(rotation=45)
    plt.tight_layout()
    return fig


# ---------------------------------------------------------------------------
# Streamlit app layout
# ---------------------------------------------------------------------------
st.title("🎬 Video and Audio Transcription with Emotion Detection")
st.write("Upload video files up to 1GB or audio files for transcription and emotion analysis.")

# Display file size information
st.info("📁 **File Size Limits**: Video files up to 1GB, Audio files up to 500MB")

# Add instructions for large file uploads
with st.expander("📋 Instructions for Large Files"):
    st.write("""
    **For optimal performance with large files:**
    1. Ensure stable internet connection
    2. Be patient - large files take time to process
    3. Don't close the browser tab during processing
    4. For very large files, consider splitting them beforehand

    **Supported formats:**
    - **Video**: MP4, MOV, AVI
    - **Audio**: WAV, MP3
    """)

# Create tabs to separate video and audio uploads
tab1, tab2 = st.tabs(["📹 Video Upload", "🎵 Audio Upload"])

with tab1:
    st.header("Video File Processing")

    # File uploader for video with increased size limit
    uploaded_video = st.file_uploader(
        "Upload Video File",
        type=["mp4", "mov", "avi"],
        help="Maximum file size: 1GB",
    )

    if uploaded_video is not None:
        # Display file information
        file_size_mb = uploaded_video.size / (1024 * 1024)
        st.info(f"📊 **File Info**: {uploaded_video.name} ({file_size_mb:.1f} MB)")

        # Show video preview for smaller files only; embedding huge videos is slow
        if file_size_mb < 100:
            st.video(uploaded_video)

        # Persist the upload to disk so moviepy can open it by path
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_video:
            tmp_video.write(uploaded_video.read())
            tmp_video_path = tmp_video.name

        # Add an "Analyze Video" button
        if st.button("🔄 Analyze Video", type="primary"):
            progress_bar = st.progress(0)
            status_text = st.empty()

            try:
                with st.spinner("Processing video... This may take several minutes for large files."):
                    status_text.text("Step 1/4: Converting video to audio...")
                    progress_bar.progress(10)

                    # int() keeps the value a valid st.progress argument —
                    # a float above 1.0 raises StreamlitAPIException
                    audio_file = video_to_audio(
                        tmp_video_path,
                        lambda p: progress_bar.progress(int(10 + p * 0.3)),
                    )

                    if audio_file is None:
                        st.error("Failed to extract audio from video.")
                        st.stop()

                    status_text.text("Step 2/4: Converting audio format...")
                    progress_bar.progress(50)

                    # Convert the extracted MP3 audio to WAV
                    wav_audio_file = convert_mp3_to_wav(audio_file)

                    if wav_audio_file is None:
                        st.error("Failed to convert audio format.")
                        st.stop()

                    status_text.text("Step 3/4: Transcribing audio to text...")
                    progress_bar.progress(60)

                    # Transcribe audio to text
                    transcription = transcribe_audio(wav_audio_file)

                    status_text.text("Step 4/4: Analyzing emotions...")
                    progress_bar.progress(90)

                    # Emotion detection
                    emotions = detect_emotion(transcription)

                    progress_bar.progress(100)
                    status_text.text("✅ Processing complete!")

                # Display results
                st.success("Analysis completed successfully!")

                # Show the transcription
                st.subheader("📝 Transcription")
                st.text_area("", transcription, height=300, key="video_transcription")

                # Show emotions
                st.subheader("😊 Emotion Analysis")
                col1, col2 = st.columns([1, 1])

                with col1:
                    st.write("**Detected Emotions:**")
                    for emotion, score in emotions.items():
                        st.write(f"- **{emotion.title()}**: {score:.3f}")

                with col2:
                    fig = plot_emotions(emotions)
                    if fig:
                        st.pyplot(fig)

                # Store results in session state so download widgets survive reruns
                st.session_state.video_transcription = transcription
                st.session_state.video_emotions = emotions

                # Store the audio file as a BytesIO object in memory
                with open(wav_audio_file, "rb") as f:
                    audio_data = f.read()
                st.session_state.video_wav_audio_file = io.BytesIO(audio_data)

                # Cleanup temporary files
                os.remove(tmp_video_path)
                os.remove(audio_file)
                os.remove(wav_audio_file)

            except Exception as e:
                st.error(f"An error occurred during processing: {str(e)}")
                # Best-effort cleanup; swallow only filesystem errors —
                # a bare except would also hide SystemExit/KeyboardInterrupt
                try:
                    os.remove(tmp_video_path)
                    if 'audio_file' in locals() and audio_file:
                        os.remove(audio_file)
                    if 'wav_audio_file' in locals() and wav_audio_file:
                        os.remove(wav_audio_file)
                except OSError:
                    pass

    # Check if results are stored in session state
    if 'video_transcription' in st.session_state and 'video_wav_audio_file' in st.session_state:
        st.subheader("📥 Download Results")

        col1, col2, col3 = st.columns(3)

        with col1:
            # Provide the audio file to the user for playback
            st.audio(st.session_state.video_wav_audio_file, format='audio/wav')

        with col2:
            # Downloadable transcription file
            st.download_button(
                label="📄 Download Transcription",
                data=st.session_state.video_transcription,
                file_name="video_transcription.txt",
                mime="text/plain",
            )

        with col3:
            # Downloadable audio file
            st.download_button(
                label="🎵 Download Audio",
                data=st.session_state.video_wav_audio_file,
                file_name="extracted_audio.wav",
                mime="audio/wav",
            )

with tab2:
    st.header("Audio File Processing")

    # File uploader for audio
    uploaded_audio = st.file_uploader(
        "Upload Audio File",
        type=["wav", "mp3"],
        help="Maximum file size: 500MB",
    )

    if uploaded_audio is not None:
        # Display file information
        file_size_mb = uploaded_audio.size / (1024 * 1024)
        st.info(f"📊 **File Info**: {uploaded_audio.name} ({file_size_mb:.1f} MB)")

        # Show audio player
        st.audio(uploaded_audio)

        # Save the uploaded audio file temporarily
        with tempfile.NamedTemporaryFile(delete=False) as tmp_audio:
            tmp_audio.write(uploaded_audio.read())
            tmp_audio_path = tmp_audio.name

        # Add an "Analyze Audio" button
        if st.button("🔄 Analyze Audio", type="primary"):
            progress_bar = st.progress(0)
            status_text = st.empty()

            try:
                with st.spinner("Processing audio... Please wait."):
                    status_text.text("Step 1/3: Converting audio format...")
                    progress_bar.progress(20)

                    # Convert audio to WAV if it's in MP3 format
                    if uploaded_audio.type == "audio/mpeg":
                        wav_audio_file = convert_mp3_to_wav(tmp_audio_path)
                    else:
                        wav_audio_file = tmp_audio_path

                    if wav_audio_file is None:
                        st.error("Failed to process audio file.")
                        st.stop()

                    status_text.text("Step 2/3: Transcribing audio to text...")
                    progress_bar.progress(40)

                    # Transcribe audio to text
                    transcription = transcribe_audio(wav_audio_file)

                    status_text.text("Step 3/3: Analyzing emotions...")
                    progress_bar.progress(80)

                    # Emotion detection
                    emotions = detect_emotion(transcription)

                    progress_bar.progress(100)
                    status_text.text("✅ Processing complete!")

                # Display results
                st.success("Analysis completed successfully!")

                # Show the transcription
                st.subheader("📝 Transcription")
                st.text_area("", transcription, height=300, key="audio_transcription")

                # Show emotions
                st.subheader("😊 Emotion Analysis")
                col1, col2 = st.columns([1, 1])

                with col1:
                    st.write("**Detected Emotions:**")
                    for emotion, score in emotions.items():
                        st.write(f"- **{emotion.title()}**: {score:.3f}")

                with col2:
                    fig = plot_emotions(emotions)
                    if fig:
                        st.pyplot(fig)

                # Store results in session state
                st.session_state.audio_transcription = transcription
                st.session_state.audio_emotions = emotions

                # Store the audio file as a BytesIO object in memory
                with open(wav_audio_file, "rb") as f:
                    audio_data = f.read()
                st.session_state.audio_wav_audio_file = io.BytesIO(audio_data)

                # Cleanup temporary audio file (and the converted WAV, if distinct)
                os.remove(tmp_audio_path)
                if wav_audio_file != tmp_audio_path:
                    os.remove(wav_audio_file)

            except Exception as e:
                st.error(f"An error occurred during processing: {str(e)}")
                # Best-effort cleanup; swallow only filesystem errors
                try:
                    os.remove(tmp_audio_path)
                    if ('wav_audio_file' in locals() and wav_audio_file
                            and wav_audio_file != tmp_audio_path):
                        os.remove(wav_audio_file)
                except OSError:
                    pass

    # Check if results are stored in session state
    if 'audio_transcription' in st.session_state and 'audio_wav_audio_file' in st.session_state:
        st.subheader("📥 Download Results")

        col1, col2 = st.columns(2)

        with col1:
            # Downloadable transcription file
            st.download_button(
                label="📄 Download Transcription",
                data=st.session_state.audio_transcription,
                file_name="audio_transcription.txt",
                mime="text/plain",
            )

        with col2:
            # Downloadable audio file
            st.download_button(
                label="🎵 Download Processed Audio",
                data=st.session_state.audio_wav_audio_file,
                file_name="processed_audio.wav",
                mime="audio/wav",
            )

# Footer
st.markdown("---")
st.markdown("Built with ❤️ using Streamlit, MoviePy, and HuggingFace Transformers")