|
import streamlit as st |
|
import moviepy.editor as mp |
|
import speech_recognition as sr |
|
from pydub import AudioSegment |
|
import tempfile |
|
import os |
|
import io |
|
from transformers import pipeline |
|
import matplotlib.pyplot as plt |
|
import librosa |
|
import numpy as np |
|
|
|
|
|
def video_to_audio(video_file): |
|
video = mp.VideoFileClip(video_file) |
|
audio = video.audio |
|
temp_audio_path = tempfile.mktemp(suffix=".mp3") |
|
audio.write_audiofile(temp_audio_path) |
|
return temp_audio_path |
|
|
|
|
|
def convert_mp3_to_wav(mp3_file): |
|
audio = AudioSegment.from_mp3(mp3_file) |
|
temp_wav_path = tempfile.mktemp(suffix=".wav") |
|
audio.export(temp_wav_path, format="wav") |
|
return temp_wav_path |
|
|
|
|
|
def transcribe_audio(audio_file): |
|
audio = AudioSegment.from_wav(audio_file) |
|
duration = len(audio) / 1000 |
|
chunk_length = 60 |
|
recognizer = sr.Recognizer() |
|
|
|
if duration <= chunk_length: |
|
with sr.AudioFile(audio_file) as source: |
|
audio_data = recognizer.record(source) |
|
try: |
|
text = recognizer.recognize_google(audio_data) |
|
return text |
|
except sr.UnknownValueError: |
|
return "Audio could not be understood." |
|
except sr.RequestError: |
|
return "Could not request results from Google Speech Recognition service." |
|
else: |
|
num_chunks = int(duration // chunk_length) + 1 |
|
transcriptions = [] |
|
for i in range(num_chunks): |
|
start_time = i * chunk_length * 1000 |
|
end_time = min((i + 1) * chunk_length * 1000, len(audio)) |
|
chunk = audio[start_time:end_time] |
|
frame_data = chunk.raw_data |
|
sample_rate = audio.frame_rate |
|
sample_width = audio.sample_width |
|
audio_data = sr.AudioData(frame_data, sample_rate, sample_width) |
|
try: |
|
text = recognizer.recognize_google(audio_data) |
|
transcriptions.append(text) |
|
except sr.UnknownValueError: |
|
transcriptions.append("[Audio could not be understood.]") |
|
except sr.RequestError: |
|
transcriptions.append("[Could not request results.]") |
|
return " ".join(transcriptions) |
|
|
|
|
|
def detect_emotion(text): |
|
emotion_pipeline = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base", return_all_scores=True) |
|
result = emotion_pipeline(text) |
|
emotions = {emotion['label']: emotion['score'] for emotion in result[0]} |
|
return emotions |
|
|
|
|
|
def plot_waveform(audio_data, duration=10): |
|
audio_data.seek(0) |
|
y, sr = librosa.load(audio_data, sr=None, duration=duration) |
|
plt.figure(figsize=(10, 4)) |
|
time = np.linspace(0, len(y)/sr, len(y)) |
|
plt.plot(time, y) |
|
plt.title(f"Audio Waveform (first {duration} seconds)") |
|
plt.xlabel("Time (s)") |
|
plt.ylabel("Amplitude") |
|
st.pyplot(plt) |
|
|
|
|
|
st.title("Video and Audio to Text Transcription with Emotion Detection and Visualization") |
|
st.write("Upload a video or audio file to transcribe it, detect emotions, and visualize the audio waveform.") |
|
st.write("**Note:** To upload files up to 1GB, run the app with: `streamlit run app.py --server.maxUploadSize=1024`") |
|
|
|
tab = st.selectbox("Select file type", ["Video", "Audio"]) |
|
|
|
if tab == "Video": |
|
uploaded_video = st.file_uploader("Upload Video", type=["mp4", "mov", "avi"]) |
|
if uploaded_video: |
|
with tempfile.NamedTemporaryFile(delete=False) as tmp_video: |
|
tmp_video.write(uploaded_video.read()) |
|
tmp_video_path = tmp_video.name |
|
if st.button("Analyze Video"): |
|
with st.spinner("Processing video..."): |
|
audio_file = video_to_audio(tmp_video_path) |
|
wav_audio_file = convert_mp3_to_wav(audio_file) |
|
transcription = transcribe_audio(wav_audio_file) |
|
st.text_area("Transcription", transcription, height=300) |
|
emotions = detect_emotion(transcription) |
|
st.write(f"Detected Emotions: {emotions}") |
|
with open(wav_audio_file, "rb") as f: |
|
audio_data = io.BytesIO(f.read()) |
|
st.session_state.wav_audio_file = audio_data |
|
plot_waveform(st.session_state.wav_audio_file) |
|
os.remove(tmp_video_path) |
|
os.remove(audio_file) |
|
os.remove(wav_audio_file) |
|
if 'wav_audio_file' in st.session_state: |
|
st.audio(st.session_state.wav_audio_file, format='audio/wav') |
|
st.download_button("Download Transcription", st.session_state.transcription, "transcription.txt", "text/plain") |
|
st.download_button("Download Audio", st.session_state.wav_audio_file, "converted_audio.wav", "audio/wav") |
|
|
|
elif tab == "Audio": |
|
uploaded_audio = st.file_uploader("Upload Audio", type=["wav", "mp3"]) |
|
if uploaded_audio: |
|
with tempfile.NamedTemporaryFile(delete=False) as tmp_audio: |
|
tmp_audio.write(uploaded_audio.read()) |
|
tmp_audio_path = tmp_audio.name |
|
if st.button("Analyze Audio"): |
|
with st.spinner("Processing audio..."): |
|
wav_audio_file = convert_mp3_to_wav(tmp_audio_path) if uploaded_audio.type == "audio/mpeg" else tmp_audio_path |
|
transcription = transcribe_audio(wav_audio_file) |
|
st.text_area("Transcription", transcription, height=300) |
|
emotions = detect_emotion(transcription) |
|
st.write(f"Detected Emotions: {emotions}") |
|
with open(wav_audio_file, "rb") as f: |
|
audio_data = io.BytesIO(f.read()) |
|
st.session_state.wav_audio_file_audio = audio_data |
|
plot_waveform(st.session_state.wav_audio_file_audio) |
|
if uploaded_audio.type == "audio/mpeg": |
|
os.remove(wav_audio_file) |
|
os.remove(tmp_audio_path) |
|
if 'wav_audio_file_audio' in st.session_state: |
|
st.audio(st.session_state.wav_audio_file_audio, format='audio/wav') |
|
st.download_button("Download Transcription", st.session_state.transcription_audio, "transcription_audio.txt", "text/plain") |
|
st.download_button("Download Audio", st.session_state.wav_audio_file_audio, "converted_audio_audio.wav", "audio/wav") |