|
import streamlit as st |
|
import moviepy.editor as mp |
|
import speech_recognition as sr |
|
from pydub import AudioSegment |
|
import tempfile |
|
import os |
|
import io |
|
from transformers import pipeline |
|
import matplotlib.pyplot as plt |
|
import gc |
|
import warnings |
|
warnings.filterwarnings("ignore") |
|
|
|
|
|
# Configure the Streamlit page; must be the first st.* call in the script.
st.set_page_config(
    page_title="Video/Audio Transcription with Emotion Detection",
    page_icon="π¬",  # NOTE(review): mojibake — presumably an emoji (e.g. 🎬); confirm source encoding
    layout="wide"
)
|
|
|
|
|
|
|
@st.cache_data
def get_config():
    """Return the static app configuration (memoized by Streamlit)."""
    config = {"maxUploadSize": 1024}
    return config
|
|
|
|
|
def video_to_audio(video_file, progress_callback=None):
    """Extract the audio track of a video into a temporary MP3 file.

    Args:
        video_file: Path to the source video file on disk.
        progress_callback: Optional callable receiving an int percentage (0-100).

    Returns:
        Path to the temporary MP3 file, or None on failure (error is shown
        to the user via st.error).
    """
    video = None
    audio = None
    try:
        video = mp.VideoFileClip(video_file)
        audio = video.audio

        # BUGFIX: videos without an audio track yield audio == None; the old
        # code crashed with AttributeError on write_audiofile.
        if audio is None:
            st.error("The uploaded video has no audio track.")
            return None

        # mkstemp replaces the insecure/deprecated tempfile.mktemp: the file
        # is created atomically, closing the name-reuse race window.
        fd, temp_audio_path = tempfile.mkstemp(suffix=".mp3")
        os.close(fd)

        if progress_callback:
            progress_callback(50)

        audio.write_audiofile(temp_audio_path, verbose=False, logger=None)

        if progress_callback:
            progress_callback(100)

        return temp_audio_path
    except Exception as e:
        st.error(f"Error converting video to audio: {str(e)}")
        return None
    finally:
        # Always release moviepy readers, even when conversion fails.
        if audio is not None:
            audio.close()
        if video is not None:
            video.close()
        gc.collect()
|
|
|
|
|
def convert_mp3_to_wav(mp3_file):
    """Convert an MP3 file to a temporary WAV file.

    Args:
        mp3_file: Path to the MP3 file on disk.

    Returns:
        Path to the temporary WAV file, or None on failure (error is shown
        to the user via st.error).
    """
    try:
        audio = AudioSegment.from_mp3(mp3_file)

        # mkstemp replaces the insecure/deprecated tempfile.mktemp
        # (atomic creation, no name-reuse race).
        fd, temp_wav_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)

        audio.export(temp_wav_path, format="wav")

        # Drop the decoded audio promptly — large files can be hundreds of MB.
        del audio
        gc.collect()

        return temp_wav_path
    except Exception as e:
        st.error(f"Error converting MP3 to WAV: {str(e)}")
        return None
|
|
|
|
|
def transcribe_audio(audio_file, chunk_duration=60):
    """Transcribe a WAV file to text, chunking long audio for the Google API.

    Args:
        audio_file: Path to a WAV file on disk.
        chunk_duration: Maximum seconds of audio per recognition request.

    Returns:
        The full transcription string, or a human-readable error message.
    """
    try:
        recognizer = sr.Recognizer()

        audio_segment = AudioSegment.from_wav(audio_file)
        duration = len(audio_segment) / 1000  # pydub lengths are milliseconds

        transcriptions = []

        if duration > chunk_duration:
            chunk_ms = chunk_duration * 1000
            # BUGFIX: ceiling division. The old `int(duration / chunk_duration) + 1`
            # produced an extra empty chunk whenever duration was an exact multiple.
            num_chunks = -(-len(audio_segment) // chunk_ms)

            # One progress widget updated in place — the old code created a
            # brand-new st.progress bar on every iteration.
            chunk_progress = st.progress(0.0, text="Transcribing... 0%")

            for i in range(num_chunks):
                start_time = i * chunk_ms
                end_time = min((i + 1) * chunk_ms, len(audio_segment))
                chunk = audio_segment[start_time:end_time]

                # mkstemp replaces the insecure/deprecated tempfile.mktemp.
                fd, chunk_path = tempfile.mkstemp(suffix=".wav")
                os.close(fd)
                try:
                    chunk.export(chunk_path, format="wav")
                    with sr.AudioFile(chunk_path) as source:
                        audio_data = recognizer.record(source)
                        text = recognizer.recognize_google(audio_data)
                        transcriptions.append(text)
                except (sr.UnknownValueError, sr.RequestError):
                    transcriptions.append(f"[Chunk {i+1}: Audio could not be transcribed]")
                finally:
                    # BUGFIX: remove the chunk file even when recognition fails;
                    # the old code leaked it on the except path.
                    if os.path.exists(chunk_path):
                        os.remove(chunk_path)

                progress = int(((i + 1) / num_chunks) * 100)
                chunk_progress.progress(progress / 100, text=f"Transcribing... {progress}%")

        else:
            # Short audio: one recognition request for the whole file.
            with sr.AudioFile(audio_file) as source:
                audio_data = recognizer.record(source)
                text = recognizer.recognize_google(audio_data)
                transcriptions.append(text)

        full_transcription = " ".join(transcriptions)

        # Release the decoded audio promptly — large files can be huge in memory.
        del audio_segment
        gc.collect()

        return full_transcription

    except sr.UnknownValueError:
        return "Audio could not be understood."
    except sr.RequestError as e:
        return f"Could not request results from Google Speech Recognition service: {str(e)}"
    except Exception as e:
        return f"Error during transcription: {str(e)}"
|
|
|
|
|
@st.cache_resource
def load_emotion_model():
    """Load emotion detection model (cached)"""
    # Returns a HuggingFace text-classification pipeline; with
    # return_all_scores=True the output is nested as [[{label, score}, ...]],
    # which callers index as result[0].
    # NOTE(review): `return_all_scores=True` is deprecated in recent
    # transformers releases in favour of `top_k=None`, which changes the
    # output nesting — confirm the pinned transformers version before migrating.
    return pipeline("text-classification",
                    model="j-hartmann/emotion-english-distilroberta-base",
                    return_all_scores=True)
|
|
|
def detect_emotion(text):
    """Detect emotions in text, chunking long inputs for the model.

    Args:
        text: The text to analyze.

    Returns:
        Dict mapping emotion label -> confidence score, or
        {"error": "..."} when analysis fails.
    """
    try:
        emotion_pipeline = load_emotion_model()

        # The model cannot take arbitrarily long inputs, so split long text
        # into fixed-size chunks and average the per-chunk scores.
        max_length = 500
        if len(text) > max_length:
            chunks = [text[i:i+max_length] for i in range(0, len(text), max_length)]
            totals = {}

            for chunk in chunks:
                result = emotion_pipeline(chunk)
                for emotion in result[0]:
                    totals[emotion['label']] = totals.get(emotion['label'], 0.0) + emotion['score']

            # BUGFIX: true arithmetic mean over all chunks. The previous
            # running average ((old + new) / 2) exponentially over-weighted
            # later chunks instead of weighting every chunk equally.
            return {label: total / len(chunks) for label, total in totals.items()}
        else:
            result = emotion_pipeline(text)
            return {emotion['label']: emotion['score'] for emotion in result[0]}

    except Exception as e:
        st.error(f"Error in emotion detection: {str(e)}")
        return {"error": "Could not analyze emotions"}
|
|
|
|
|
def plot_emotions(emotions):
    """Create a bar chart of emotions"""
    if "error" in emotions:
        return None

    # Order labels by score, highest first, so the chart reads left-to-right.
    ordered = sorted(emotions.items(), key=lambda item: item[1], reverse=True)
    labels = [label for label, _ in ordered]
    scores = [score for _, score in ordered]

    palette = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7', '#DDA0DD', '#98D8C8']

    fig, ax = plt.subplots(figsize=(10, 6))
    bars = ax.bar(labels, scores, color=palette[:len(ordered)])

    ax.set_xlabel('Emotions')
    ax.set_ylabel('Confidence Score')
    ax.set_title('Emotion Detection Results')
    ax.set_ylim(0, 1)

    # Annotate each bar with its numeric score just above the bar top.
    for bar in bars:
        height = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2., height + 0.01,
                f'{height:.3f}', ha='center', va='bottom')

    plt.xticks(rotation=45)
    plt.tight_layout()
    return fig
|
|
|
|
|
# ---- Page header and usage guidance --------------------------------------
# NOTE(review): the "π..." sequences throughout are mojibake of emoji
# characters — confirm the original file's encoding before re-saving.
st.title("π¬ Video and Audio Transcription with Emotion Detection")
st.write("Upload video files up to 1GB or audio files for transcription and emotion analysis.")

st.info("π **File Size Limits**: Video files up to 1GB, Audio files up to 500MB")

# Collapsible help for users uploading large files.
with st.expander("π Instructions for Large Files"):
    st.write("""
    **For optimal performance with large files:**
    1. Ensure stable internet connection
    2. Be patient - large files take time to process
    3. Don't close the browser tab during processing
    4. For very large files, consider splitting them beforehand

    **Supported formats:**
    - **Video**: MP4, MOV, AVI
    - **Audio**: WAV, MP3
    """)

# Two independent workflows: one tab for video uploads, one for audio.
tab1, tab2 = st.tabs(["πΉ Video Upload", "π΅ Audio Upload"])
|
|
|
with tab1:
    st.header("Video File Processing")

    uploaded_video = st.file_uploader(
        "Upload Video File",
        type=["mp4", "mov", "avi"],
        help="Maximum file size: 1GB"
    )

    if uploaded_video is not None:
        file_size_mb = uploaded_video.size / (1024 * 1024)
        st.info(f"π **File Info**: {uploaded_video.name} ({file_size_mb:.1f} MB)")

        # Only preview small videos inline; embedding very large files is slow.
        if file_size_mb < 100:
            st.video(uploaded_video)

        # Persist the upload to disk so moviepy can open it by path.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_video:
            tmp_video.write(uploaded_video.read())
            tmp_video_path = tmp_video.name

        if st.button("π Analyze Video", type="primary"):
            progress_bar = st.progress(0)
            status_text = st.empty()

            try:
                with st.spinner("Processing video... This may take several minutes for large files."):
                    status_text.text("Step 1/4: Converting video to audio...")
                    progress_bar.progress(10)

                    # BUGFIX: st.progress interprets a float as a 0.0-1.0
                    # fraction, so the old `10 + p * 0.3` (a float up to 40.0)
                    # raised a StreamlitAPIException. Cast to int for the
                    # 0-100 integer scale.
                    audio_file = video_to_audio(
                        tmp_video_path,
                        lambda p: progress_bar.progress(int(10 + p * 0.3)))

                    if audio_file is None:
                        st.error("Failed to extract audio from video.")
                        # NOTE(review): st.stop() raises an exception that the
                        # generic `except Exception` below may swallow on some
                        # Streamlit versions — verify against the pinned version.
                        st.stop()

                    status_text.text("Step 2/4: Converting audio format...")
                    progress_bar.progress(50)

                    wav_audio_file = convert_mp3_to_wav(audio_file)

                    if wav_audio_file is None:
                        st.error("Failed to convert audio format.")
                        st.stop()

                    status_text.text("Step 3/4: Transcribing audio to text...")
                    progress_bar.progress(60)

                    transcription = transcribe_audio(wav_audio_file)

                    status_text.text("Step 4/4: Analyzing emotions...")
                    progress_bar.progress(90)

                    emotions = detect_emotion(transcription)

                    progress_bar.progress(100)
                    # BUGFIX: this status string was split across two physical
                    # lines (mojibake of an emoji), a Python syntax error.
                    status_text.text("Processing complete!")

                    st.success("Analysis completed successfully!")

                    st.subheader("π Transcription")
                    st.text_area("", transcription, height=300, key="video_transcription")

                    st.subheader("π Emotion Analysis")
                    col1, col2 = st.columns([1, 1])

                    with col1:
                        st.write("**Detected Emotions:**")
                        for emotion, score in emotions.items():
                            st.write(f"- **{emotion.title()}**: {score:.3f}")

                    with col2:
                        fig = plot_emotions(emotions)
                        if fig:
                            st.pyplot(fig)

                    # Keep results in session state so the download section
                    # survives Streamlit reruns.
                    st.session_state.video_transcription = transcription
                    st.session_state.video_emotions = emotions

                    with open(wav_audio_file, "rb") as f:
                        audio_data = f.read()
                    st.session_state.video_wav_audio_file = io.BytesIO(audio_data)

                    # Clean up all temporary files on success.
                    os.remove(tmp_video_path)
                    os.remove(audio_file)
                    os.remove(wav_audio_file)

            except Exception as e:
                st.error(f"An error occurred during processing: {str(e)}")

                # Best-effort cleanup of whichever temp files were created
                # before the failure. (Narrowed from a bare `except:`.)
                try:
                    os.remove(tmp_video_path)
                    if 'audio_file' in locals() and audio_file:
                        os.remove(audio_file)
                    if 'wav_audio_file' in locals() and wav_audio_file:
                        os.remove(wav_audio_file)
                except OSError:
                    pass

    # Download section, shown once a previous run has stored results.
    if 'video_transcription' in st.session_state and 'video_wav_audio_file' in st.session_state:
        st.subheader("π₯ Download Results")

        col1, col2, col3 = st.columns(3)

        with col1:
            st.audio(st.session_state.video_wav_audio_file, format='audio/wav')

        with col2:
            st.download_button(
                label="π Download Transcription",
                data=st.session_state.video_transcription,
                file_name="video_transcription.txt",
                mime="text/plain"
            )

        with col3:
            st.download_button(
                label="π΅ Download Audio",
                data=st.session_state.video_wav_audio_file,
                file_name="extracted_audio.wav",
                mime="audio/wav"
            )
|
|
|
with tab2:
    st.header("Audio File Processing")

    uploaded_audio = st.file_uploader(
        "Upload Audio File",
        type=["wav", "mp3"],
        help="Maximum file size: 500MB"
    )

    if uploaded_audio is not None:
        file_size_mb = uploaded_audio.size / (1024 * 1024)
        st.info(f"π **File Info**: {uploaded_audio.name} ({file_size_mb:.1f} MB)")

        st.audio(uploaded_audio)

        # Persist the upload to disk; keep the original extension so
        # downstream format handling has a hint (old code used no suffix).
        with tempfile.NamedTemporaryFile(
                delete=False,
                suffix=os.path.splitext(uploaded_audio.name)[1]) as tmp_audio:
            tmp_audio.write(uploaded_audio.read())
            tmp_audio_path = tmp_audio.name

        if st.button("π Analyze Audio", type="primary"):
            progress_bar = st.progress(0)
            status_text = st.empty()

            try:
                with st.spinner("Processing audio... Please wait."):
                    status_text.text("Step 1/3: Converting audio format...")
                    progress_bar.progress(20)

                    # ROBUSTNESS: some browsers report MP3 uploads as
                    # "audio/mp3" rather than "audio/mpeg", so also fall back
                    # to the file extension.
                    is_mp3 = (uploaded_audio.type in ("audio/mpeg", "audio/mp3")
                              or uploaded_audio.name.lower().endswith(".mp3"))
                    if is_mp3:
                        wav_audio_file = convert_mp3_to_wav(tmp_audio_path)
                    else:
                        wav_audio_file = tmp_audio_path

                    if wav_audio_file is None:
                        st.error("Failed to process audio file.")
                        st.stop()

                    status_text.text("Step 2/3: Transcribing audio to text...")
                    progress_bar.progress(40)

                    transcription = transcribe_audio(wav_audio_file)

                    status_text.text("Step 3/3: Analyzing emotions...")
                    progress_bar.progress(80)

                    emotions = detect_emotion(transcription)

                    progress_bar.progress(100)
                    # BUGFIX: this status string was split across two physical
                    # lines (mojibake of an emoji), a Python syntax error.
                    status_text.text("Processing complete!")

                    st.success("Analysis completed successfully!")

                    st.subheader("π Transcription")
                    st.text_area("", transcription, height=300, key="audio_transcription")

                    st.subheader("π Emotion Analysis")
                    col1, col2 = st.columns([1, 1])

                    with col1:
                        st.write("**Detected Emotions:**")
                        for emotion, score in emotions.items():
                            st.write(f"- **{emotion.title()}**: {score:.3f}")

                    with col2:
                        fig = plot_emotions(emotions)
                        if fig:
                            st.pyplot(fig)

                    # Keep results in session state so the download section
                    # survives Streamlit reruns.
                    st.session_state.audio_transcription = transcription
                    st.session_state.audio_emotions = emotions

                    with open(wav_audio_file, "rb") as f:
                        audio_data = f.read()
                    st.session_state.audio_wav_audio_file = io.BytesIO(audio_data)

                    # Clean up temp files; the WAV may be the upload itself
                    # when no MP3 conversion was needed.
                    os.remove(tmp_audio_path)
                    if wav_audio_file != tmp_audio_path:
                        os.remove(wav_audio_file)

            except Exception as e:
                st.error(f"An error occurred during processing: {str(e)}")

                # Best-effort cleanup of whichever temp files were created
                # before the failure. (Narrowed from a bare `except:`.)
                try:
                    os.remove(tmp_audio_path)
                    if 'wav_audio_file' in locals() and wav_audio_file and wav_audio_file != tmp_audio_path:
                        os.remove(wav_audio_file)
                except OSError:
                    pass

    # Download section, shown once a previous run has stored results.
    if 'audio_transcription' in st.session_state and 'audio_wav_audio_file' in st.session_state:
        st.subheader("π₯ Download Results")

        col1, col2 = st.columns(2)

        with col1:
            st.download_button(
                label="π Download Transcription",
                data=st.session_state.audio_transcription,
                file_name="audio_transcription.txt",
                mime="text/plain"
            )

        with col2:
            st.download_button(
                label="π΅ Download Processed Audio",
                data=st.session_state.audio_wav_audio_file,
                file_name="processed_audio.wav",
                mime="audio/wav"
            )
|
|
|
|
|
# Footer.
st.markdown("---")
st.markdown("Built with β€οΈ using Streamlit, MoviePy, and HuggingFace Transformers")