shukdevdatta123's picture
Update app.py
972a238 verified
raw
history blame
6.49 kB
import streamlit as st
import moviepy.editor as mp
import speech_recognition as sr
from pydub import AudioSegment
import tempfile
import os
import io
from transformers import pipeline
import matplotlib.pyplot as plt
import librosa
import numpy as np
# Function to convert video to audio
def video_to_audio(video_file):
video = mp.VideoFileClip(video_file)
audio = video.audio
temp_audio_path = tempfile.mktemp(suffix=".mp3")
audio.write_audiofile(temp_audio_path)
return temp_audio_path
# Function to convert MP3 to WAV
def convert_mp3_to_wav(mp3_file):
audio = AudioSegment.from_mp3(mp3_file)
temp_wav_path = tempfile.mktemp(suffix=".wav")
audio.export(temp_wav_path, format="wav")
return temp_wav_path
# Function to transcribe audio with chunking for large files
def transcribe_audio(audio_file):
audio = AudioSegment.from_wav(audio_file)
duration = len(audio) / 1000 # Duration in seconds
chunk_length = 60 # 60-second chunks
recognizer = sr.Recognizer()
if duration <= chunk_length:
with sr.AudioFile(audio_file) as source:
audio_data = recognizer.record(source)
try:
text = recognizer.recognize_google(audio_data)
return text
except sr.UnknownValueError:
return "Audio could not be understood."
except sr.RequestError:
return "Could not request results from Google Speech Recognition service."
else:
num_chunks = int(duration // chunk_length) + 1
transcriptions = []
for i in range(num_chunks):
start_time = i * chunk_length * 1000 # in milliseconds
end_time = min((i + 1) * chunk_length * 1000, len(audio))
chunk = audio[start_time:end_time]
frame_data = chunk.raw_data
sample_rate = audio.frame_rate
sample_width = audio.sample_width
audio_data = sr.AudioData(frame_data, sample_rate, sample_width)
try:
text = recognizer.recognize_google(audio_data)
transcriptions.append(text)
except sr.UnknownValueError:
transcriptions.append("[Audio could not be understood.]")
except sr.RequestError:
transcriptions.append("[Could not request results.]")
return " ".join(transcriptions)
# Function to detect emotions
def detect_emotion(text):
emotion_pipeline = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base", return_all_scores=True)
result = emotion_pipeline(text)
emotions = {emotion['label']: emotion['score'] for emotion in result[0]}
return emotions
# Function to plot audio waveform
def plot_waveform(audio_data, duration=10):
audio_data.seek(0)
y, sr = librosa.load(audio_data, sr=None, duration=duration)
plt.figure(figsize=(10, 4))
time = np.linspace(0, len(y)/sr, len(y))
plt.plot(time, y)
plt.title(f"Audio Waveform (first {duration} seconds)")
plt.xlabel("Time (s)")
plt.ylabel("Amplitude")
st.pyplot(plt)
# Streamlit app layout
st.title("Video and Audio to Text Transcription with Emotion Detection and Visualization")
st.write("Upload a video or audio file to transcribe it, detect emotions, and visualize the audio waveform.")
st.write("**Note:** To upload files up to 1GB, run the app with: `streamlit run app.py --server.maxUploadSize=1024`")
tab = st.selectbox("Select file type", ["Video", "Audio"])
if tab == "Video":
uploaded_video = st.file_uploader("Upload Video", type=["mp4", "mov", "avi"])
if uploaded_video:
with tempfile.NamedTemporaryFile(delete=False) as tmp_video:
tmp_video.write(uploaded_video.read())
tmp_video_path = tmp_video.name
if st.button("Analyze Video"):
with st.spinner("Processing video..."):
audio_file = video_to_audio(tmp_video_path)
wav_audio_file = convert_mp3_to_wav(audio_file)
transcription = transcribe_audio(wav_audio_file)
st.text_area("Transcription", transcription, height=300)
emotions = detect_emotion(transcription)
st.write(f"Detected Emotions: {emotions}")
with open(wav_audio_file, "rb") as f:
audio_data = io.BytesIO(f.read())
st.session_state.wav_audio_file = audio_data
plot_waveform(st.session_state.wav_audio_file)
os.remove(tmp_video_path)
os.remove(audio_file)
os.remove(wav_audio_file)
if 'wav_audio_file' in st.session_state:
st.audio(st.session_state.wav_audio_file, format='audio/wav')
st.download_button("Download Transcription", st.session_state.transcription, "transcription.txt", "text/plain")
st.download_button("Download Audio", st.session_state.wav_audio_file, "converted_audio.wav", "audio/wav")
elif tab == "Audio":
uploaded_audio = st.file_uploader("Upload Audio", type=["wav", "mp3"])
if uploaded_audio:
with tempfile.NamedTemporaryFile(delete=False) as tmp_audio:
tmp_audio.write(uploaded_audio.read())
tmp_audio_path = tmp_audio.name
if st.button("Analyze Audio"):
with st.spinner("Processing audio..."):
wav_audio_file = convert_mp3_to_wav(tmp_audio_path) if uploaded_audio.type == "audio/mpeg" else tmp_audio_path
transcription = transcribe_audio(wav_audio_file)
st.text_area("Transcription", transcription, height=300)
emotions = detect_emotion(transcription)
st.write(f"Detected Emotions: {emotions}")
with open(wav_audio_file, "rb") as f:
audio_data = io.BytesIO(f.read())
st.session_state.wav_audio_file_audio = audio_data
plot_waveform(st.session_state.wav_audio_file_audio)
if uploaded_audio.type == "audio/mpeg":
os.remove(wav_audio_file)
os.remove(tmp_audio_path)
if 'wav_audio_file_audio' in st.session_state:
st.audio(st.session_state.wav_audio_file_audio, format='audio/wav')
st.download_button("Download Transcription", st.session_state.transcription_audio, "transcription_audio.txt", "text/plain")
st.download_button("Download Audio", st.session_state.wav_audio_file_audio, "converted_audio_audio.wav", "audio/wav")