shukdevdatta123's picture
Update app.py
ee377d8 verified
raw
history blame
20 kB
import streamlit as st
import moviepy.editor as mp
import speech_recognition as sr
from pydub import AudioSegment
import tempfile
import os
import io
from transformers import pipeline
import matplotlib.pyplot as plt
import gc
import warnings
warnings.filterwarnings("ignore")
# Page-level presentation settings: browser-tab title, tab icon and the wide
# layout. Note that this call does not set any upload-size limit.
st.set_page_config(
    layout="wide",
    page_icon="🎬",
    page_title="Video/Audio Transcription with Emotion Detection",
)
# Intended maximum upload size. This helper only returns a value; nothing in
# the visible code passes it to Streamlit, so the effective limit still has to
# be configured in the Streamlit config file or environment.
@st.cache_data
def get_config():
    """Return the intended upload-size setting as a dict (value in megabytes)."""
    upload_limit_mb = 1024  # 1GB in MB
    return {"maxUploadSize": upload_limit_mb}
# Function to convert video to audio with progress tracking
def video_to_audio(video_file, progress_callback=None):
    """Extract the audio track of a video into a temporary MP3 file.

    Args:
        video_file: Path of the video file to open with MoviePy.
        progress_callback: Optional callable. It is invoked with 50 just
            before the audio is written and with 100 once writing finished.

    Returns:
        The path of the temporary MP3 file (the caller is responsible for
        deleting it), or None when conversion failed. Failures are reported
        to the user through ``st.error``.
    """
    video = None
    temp_audio_path = None
    try:
        video = mp.VideoFileClip(video_file)
        audio = video.audio
        if audio is None:
            # Without this check the failure would surface as an opaque
            # AttributeError on None further down.
            raise ValueError("the video does not contain an audio track")

        # mkstemp creates the file atomically; tempfile.mktemp is deprecated
        # because another process can claim the name before it is used.
        fd, temp_audio_path = tempfile.mkstemp(suffix=".mp3")
        os.close(fd)

        if progress_callback:
            progress_callback(50)  # 50% progress
        audio.write_audiofile(temp_audio_path, verbose=False, logger=None)
        if progress_callback:
            progress_callback(100)  # 100% progress
        return temp_audio_path
    except Exception as e:
        # Do not leave a partially written file behind on failure.
        if temp_audio_path is not None and os.path.exists(temp_audio_path):
            try:
                os.remove(temp_audio_path)
            except OSError:
                pass
        st.error(f"Error converting video to audio: {str(e)}")
        return None
    finally:
        # Release the clip's readers on success and on failure alike.
        if video is not None:
            if video.audio is not None:
                video.audio.close()
            video.close()
        gc.collect()
# Function to convert MP3 audio to WAV
def convert_mp3_to_wav(mp3_file):
    """Convert an MP3 file into a temporary WAV file.

    Args:
        mp3_file: Path (or file-like object accepted by pydub) of the MP3.

    Returns:
        The path of the temporary WAV file (the caller is responsible for
        deleting it), or None when conversion failed. Failures are reported
        to the user through ``st.error``.
    """
    temp_wav_path = None
    try:
        audio = AudioSegment.from_mp3(mp3_file)

        # mkstemp creates the file atomically; tempfile.mktemp is deprecated
        # because another process can claim the name before it is used.
        fd, temp_wav_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)

        audio.export(temp_wav_path, format="wav")

        # Drop the decoded audio promptly; it can be large.
        del audio
        gc.collect()
        return temp_wav_path
    except Exception as e:
        # Do not leave a partially written file behind on failure.
        if temp_wav_path is not None and os.path.exists(temp_wav_path):
            try:
                os.remove(temp_wav_path)
            except OSError:
                pass
        st.error(f"Error converting MP3 to WAV: {str(e)}")
        return None
# Function to transcribe audio to text with chunking for large files
def transcribe_audio(audio_file, chunk_duration=60):
    """Transcribe a WAV file to text, splitting long audio into chunks.

    Audio longer than ``chunk_duration`` seconds is cut into consecutive
    chunks that are transcribed one by one; a chunk that cannot be
    transcribed contributes a bracketed placeholder instead of aborting the
    whole run. Shorter audio is transcribed in a single request.

    Args:
        audio_file: Path of the WAV file.
        chunk_duration: Chunk length in seconds (must be positive).

    Returns:
        The transcription as a single string. On failure a human-readable
        error message is returned instead; this function does not raise.
    """
    try:
        recognizer = sr.Recognizer()

        audio_segment = AudioSegment.from_wav(audio_file)
        total_ms = len(audio_segment)  # pydub lengths are in milliseconds
        chunk_ms = int(chunk_duration * 1000)
        if chunk_ms <= 0:
            raise ValueError("chunk_duration must be positive")

        transcriptions = []
        if total_ms > chunk_ms:
            # Ceiling division: an exact multiple of the chunk length must
            # not produce an extra, empty trailing chunk.
            num_chunks = -(-total_ms // chunk_ms)
            # One bar updated in place rather than a new bar per chunk.
            progress_bar = st.progress(0.0, text="Transcribing... 0%")

            for i in range(num_chunks):
                start_time = i * chunk_ms
                end_time = min(start_time + chunk_ms, total_ms)
                chunk = audio_segment[start_time:end_time]

                fd, chunk_path = tempfile.mkstemp(suffix=".wav")
                os.close(fd)
                try:
                    chunk.export(chunk_path, format="wav")
                    with sr.AudioFile(chunk_path) as source:
                        audio_data = recognizer.record(source)
                    transcriptions.append(recognizer.recognize_google(audio_data))
                except (sr.UnknownValueError, sr.RequestError):
                    transcriptions.append(f"[Chunk {i+1}: Audio could not be transcribed]")
                finally:
                    # Always remove the chunk file, even on unexpected errors.
                    if os.path.exists(chunk_path):
                        os.remove(chunk_path)

                progress = int(((i + 1) / num_chunks) * 100)
                progress_bar.progress(progress / 100, text=f"Transcribing... {progress}%")
        else:
            # For shorter audio, transcribe directly
            with sr.AudioFile(audio_file) as source:
                audio_data = recognizer.record(source)
            transcriptions.append(recognizer.recognize_google(audio_data))

        full_transcription = " ".join(transcriptions)

        # Drop the decoded audio promptly; it can be large.
        del audio_segment
        gc.collect()
        return full_transcription
    except sr.UnknownValueError:
        return "Audio could not be understood."
    except sr.RequestError as e:
        return f"Could not request results from Google Speech Recognition service: {str(e)}"
    except Exception as e:
        return f"Error during transcription: {str(e)}"
# Emotion classifier (Hugging Face transformers), created once and cached
@st.cache_resource
def load_emotion_model():
    """Build the text-classification pipeline used for emotion detection.

    The pipeline is configured to return a score for every label, not only
    the best one. The ``st.cache_resource`` decorator caches the returned
    object so it is not rebuilt on each call.
    """
    classifier = pipeline(
        task="text-classification",
        model="j-hartmann/emotion-english-distilroberta-base",
        return_all_scores=True,
    )
    return classifier
def detect_emotion(text):
    """Score the emotions expressed in ``text``.

    Text longer than 500 characters is split into 500-character pieces; each
    piece is classified separately and the per-label scores are averaged
    with equal weight across all pieces.

    Args:
        text: The text to analyse.

    Returns:
        A dict mapping emotion label to score. If anything fails, the error
        is shown through ``st.error`` and
        ``{"error": "Could not analyze emotions"}`` is returned.
    """
    try:
        emotion_pipeline = load_emotion_model()

        # Split text into chunks if it's too long (model has token limits)
        max_length = 500
        chunks = [text[i:i + max_length] for i in range(0, len(text), max_length)]
        if not chunks:
            # Empty text still goes through the model once, as before.
            chunks = [text]

        # Sum per label, then divide by the chunk count. Repeatedly halving
        # a running value (the previous approach) is not a mean: it lets the
        # last chunk count for half of the final score.
        totals = {}
        for chunk in chunks:
            result = emotion_pipeline(chunk)
            for entry in result[0]:
                label = entry['label']
                totals[label] = totals.get(label, 0.0) + entry['score']

        return {label: total / len(chunks) for label, total in totals.items()}
    except Exception as e:
        st.error(f"Error in emotion detection: {str(e)}")
        return {"error": "Could not analyze emotions"}
# Function to visualize emotions
def plot_emotions(emotions):
    """Create a bar chart of emotion scores, highest score first.

    Args:
        emotions: Dict mapping emotion label to a numeric score, or the
            error dict (containing an ``"error"`` key) produced when
            emotion detection failed.

    Returns:
        A matplotlib ``Figure``, or None when there is nothing to plot
        (error dict or empty input).
    """
    if not emotions or "error" in emotions:
        return None

    # Build the figure without pyplot: pyplot keeps every figure in a global
    # registry (a leak in a long-running server) and its "current figure"
    # state is shared between concurrently running sessions.
    from matplotlib.figure import Figure

    ranked = sorted(emotions.items(), key=lambda item: item[1], reverse=True)
    labels = [label for label, _ in ranked]
    scores = [score for _, score in ranked]

    fig = Figure(figsize=(10, 6))
    ax = fig.subplots()

    colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7', '#DDA0DD', '#98D8C8']
    bars = ax.bar(labels, scores, color=colors[:len(labels)])

    ax.set_xlabel('Emotions')
    ax.set_ylabel('Confidence Score')
    ax.set_title('Emotion Detection Results')
    ax.set_ylim(0, 1)

    # Add value labels on bars
    for bar in bars:
        height = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2., height + 0.01,
                f'{height:.3f}', ha='center', va='bottom')

    ax.tick_params(axis='x', labelrotation=45)
    fig.tight_layout()
    return fig
# Streamlit app layout: title, size limits, usage notes and the two tabs.
# The emoji below replace mis-encoded byte sequences in the string literals.
st.title("🎬 Video and Audio Transcription with Emotion Detection")
st.write("Upload video files up to 1GB or audio files for transcription and emotion analysis.")

# Display file size information
st.info("📁 **File Size Limits**: Video files up to 1GB, Audio files up to 500MB")

# Add instructions for large file uploads. Blank lines separate the Markdown
# paragraphs from the lists so the second heading is not absorbed into the
# last list item.
with st.expander("📋 Instructions for Large Files"):
    st.write("""
**For optimal performance with large files:**

1. Ensure stable internet connection
2. Be patient - large files take time to process
3. Don't close the browser tab during processing
4. For very large files, consider splitting them beforehand

**Supported formats:**

- **Video**: MP4, MOV, AVI
- **Audio**: WAV, MP3
""")

# Create tabs to separate video and audio uploads
tab1, tab2 = st.tabs(["📹 Video Upload", "🎵 Audio Upload"])
# Video tab: upload a video, extract its audio, transcribe it, run emotion
# detection, show the results and offer them for download.
with tab1:
    st.header("Video File Processing")

    uploaded_video = st.file_uploader(
        "Upload Video File",
        type=["mp4", "mov", "avi"],
        help="Maximum file size: 1GB",
    )

    if uploaded_video is not None:
        # Display file information
        file_size_mb = uploaded_video.size / (1024 * 1024)
        st.info(f"📊 **File Info**: {uploaded_video.name} ({file_size_mb:.1f} MB)")

        # Only show a preview for files under 100MB
        if file_size_mb < 100:
            st.video(uploaded_video)

        if st.button("🔄 Analyze Video", type="primary"):
            progress_bar = st.progress(0)
            status_text = st.empty()

            # Paths of temporary files; each is removed in `finally`.
            tmp_video_path = None
            audio_file = None
            wav_audio_file = None
            try:
                with st.spinner("Processing video... This may take several minutes for large files."):
                    status_text.text("Step 1/4: Converting video to audio...")
                    progress_bar.progress(10)

                    # Write the upload to disk only when analysis is actually
                    # requested; doing it on every script rerun leaked a copy
                    # of the video each time. Keep the real extension.
                    video_suffix = os.path.splitext(uploaded_video.name)[1] or ".mp4"
                    with tempfile.NamedTemporaryFile(delete=False, suffix=video_suffix) as tmp_video:
                        tmp_video_path = tmp_video.name
                        tmp_video.write(uploaded_video.getvalue())

                    # st.progress accepts ints in 0..100 or floats in 0.0..1.0,
                    # so the scaled value must be an int (25.0 is rejected).
                    audio_file = video_to_audio(
                        tmp_video_path,
                        lambda p: progress_bar.progress(int(10 + p * 0.3)),
                    )
                    if audio_file is None:
                        st.error("Failed to extract audio from video.")
                        st.stop()

                    status_text.text("Step 2/4: Converting audio format...")
                    progress_bar.progress(50)
                    wav_audio_file = convert_mp3_to_wav(audio_file)
                    if wav_audio_file is None:
                        st.error("Failed to convert audio format.")
                        st.stop()

                    status_text.text("Step 3/4: Transcribing audio to text...")
                    progress_bar.progress(60)
                    transcription = transcribe_audio(wav_audio_file)

                    status_text.text("Step 4/4: Analyzing emotions...")
                    progress_bar.progress(90)
                    emotions = detect_emotion(transcription)

                    progress_bar.progress(100)
                    status_text.text("✅ Processing complete!")

                st.success("Analysis completed successfully!")

                # The widget key must differ from the session-state key that
                # is assigned below: Streamlit refuses to modify the state of
                # a key after a widget with that key has been created.
                st.subheader("📝 Transcription")
                st.text_area(
                    "Transcription",
                    transcription,
                    height=300,
                    key="video_transcription_display",
                    label_visibility="collapsed",
                )

                st.subheader("😊 Emotion Analysis")
                col1, col2 = st.columns([1, 1])
                with col1:
                    st.write("**Detected Emotions:**")
                    if "error" in emotions:
                        # The error dict holds a message, not numeric scores.
                        st.write(emotions["error"])
                    else:
                        for emotion, score in emotions.items():
                            st.write(f"- **{emotion.title()}**: {score:.3f}")
                with col2:
                    fig = plot_emotions(emotions)
                    if fig:
                        st.pyplot(fig)
                        # Release the figure if pyplot is tracking it.
                        plt.close(fig)

                # Keep the results so the download section survives reruns.
                st.session_state.video_transcription = transcription
                st.session_state.video_emotions = emotions
                with open(wav_audio_file, "rb") as f:
                    audio_data = f.read()
                st.session_state.video_wav_audio_file = io.BytesIO(audio_data)
            except Exception as e:
                st.error(f"An error occurred during processing: {str(e)}")
            finally:
                # Runs on success, on errors and when st.stop() ends the run.
                for tmp_path in (tmp_video_path, audio_file, wav_audio_file):
                    if tmp_path and os.path.exists(tmp_path):
                        try:
                            os.remove(tmp_path)
                        except OSError:
                            pass

        # Offer downloads whenever results are stored in session state
        if 'video_transcription' in st.session_state and 'video_wav_audio_file' in st.session_state:
            st.subheader("📥 Download Results")
            col1, col2, col3 = st.columns(3)
            with col1:
                # Provide the audio file to the user for playback
                st.audio(st.session_state.video_wav_audio_file, format='audio/wav')
            with col2:
                st.download_button(
                    label="📄 Download Transcription",
                    data=st.session_state.video_transcription,
                    file_name="video_transcription.txt",
                    mime="text/plain",
                )
            with col3:
                st.download_button(
                    label="🎵 Download Audio",
                    data=st.session_state.video_wav_audio_file,
                    file_name="extracted_audio.wav",
                    mime="audio/wav",
                )
# Audio tab: upload an audio file, convert it to WAV when needed, transcribe
# it, run emotion detection, show the results and offer them for download.
with tab2:
    st.header("Audio File Processing")

    uploaded_audio = st.file_uploader(
        "Upload Audio File",
        type=["wav", "mp3"],
        help="Maximum file size: 500MB",
    )

    if uploaded_audio is not None:
        # Display file information
        file_size_mb = uploaded_audio.size / (1024 * 1024)
        st.info(f"📊 **File Info**: {uploaded_audio.name} ({file_size_mb:.1f} MB)")

        # Show audio player
        st.audio(uploaded_audio)

        if st.button("🔄 Analyze Audio", type="primary"):
            progress_bar = st.progress(0)
            status_text = st.empty()

            # Paths of temporary files; each is removed in `finally`.
            tmp_audio_path = None
            wav_audio_file = None
            try:
                with st.spinner("Processing audio... Please wait."):
                    status_text.text("Step 1/3: Converting audio format...")
                    progress_bar.progress(20)

                    # Write the upload to disk only when analysis is actually
                    # requested; doing it on every script rerun leaked a copy
                    # of the audio each time. Keep the real extension.
                    audio_suffix = os.path.splitext(uploaded_audio.name)[1].lower()
                    with tempfile.NamedTemporaryFile(delete=False, suffix=audio_suffix) as tmp_audio:
                        tmp_audio_path = tmp_audio.name
                        tmp_audio.write(uploaded_audio.getvalue())

                    # The reported MIME type for MP3 is not guaranteed to be
                    # "audio/mpeg", so the file extension is checked as well.
                    if uploaded_audio.type == "audio/mpeg" or audio_suffix == ".mp3":
                        wav_audio_file = convert_mp3_to_wav(tmp_audio_path)
                    else:
                        wav_audio_file = tmp_audio_path
                    if wav_audio_file is None:
                        st.error("Failed to process audio file.")
                        st.stop()

                    status_text.text("Step 2/3: Transcribing audio to text...")
                    progress_bar.progress(40)
                    transcription = transcribe_audio(wav_audio_file)

                    status_text.text("Step 3/3: Analyzing emotions...")
                    progress_bar.progress(80)
                    emotions = detect_emotion(transcription)

                    progress_bar.progress(100)
                    status_text.text("✅ Processing complete!")

                st.success("Analysis completed successfully!")

                # The widget key must differ from the session-state key that
                # is assigned below: Streamlit refuses to modify the state of
                # a key after a widget with that key has been created.
                st.subheader("📝 Transcription")
                st.text_area(
                    "Transcription",
                    transcription,
                    height=300,
                    key="audio_transcription_display",
                    label_visibility="collapsed",
                )

                st.subheader("😊 Emotion Analysis")
                col1, col2 = st.columns([1, 1])
                with col1:
                    st.write("**Detected Emotions:**")
                    if "error" in emotions:
                        # The error dict holds a message, not numeric scores.
                        st.write(emotions["error"])
                    else:
                        for emotion, score in emotions.items():
                            st.write(f"- **{emotion.title()}**: {score:.3f}")
                with col2:
                    fig = plot_emotions(emotions)
                    if fig:
                        st.pyplot(fig)
                        # Release the figure if pyplot is tracking it.
                        plt.close(fig)

                # Keep the results so the download section survives reruns.
                st.session_state.audio_transcription = transcription
                st.session_state.audio_emotions = emotions
                with open(wav_audio_file, "rb") as f:
                    audio_data = f.read()
                st.session_state.audio_wav_audio_file = io.BytesIO(audio_data)
            except Exception as e:
                st.error(f"An error occurred during processing: {str(e)}")
            finally:
                # Runs on success, on errors and when st.stop() ends the run.
                # A set is used because both names may refer to the same file.
                for tmp_path in {tmp_audio_path, wav_audio_file}:
                    if tmp_path and os.path.exists(tmp_path):
                        try:
                            os.remove(tmp_path)
                        except OSError:
                            pass

        # Offer downloads whenever results are stored in session state
        if 'audio_transcription' in st.session_state and 'audio_wav_audio_file' in st.session_state:
            st.subheader("📥 Download Results")
            col1, col2 = st.columns(2)
            with col1:
                st.download_button(
                    label="📄 Download Transcription",
                    data=st.session_state.audio_transcription,
                    file_name="audio_transcription.txt",
                    mime="text/plain",
                )
            with col2:
                st.download_button(
                    label="🎵 Download Processed Audio",
                    data=st.session_state.audio_wav_audio_file,
                    file_name="processed_audio.wav",
                    mime="audio/wav",
                )
# Footer (the heart emoji replaces a mis-encoded byte sequence)
st.markdown("---")
st.markdown("Built with ❤️ using Streamlit, MoviePy, and HuggingFace Transformers")