|
import streamlit as st |
|
import pandas as pd |
|
import numpy as np |
|
import os |
|
import time |
|
import matplotlib.pyplot as plt |
|
from datetime import datetime |
|
import tempfile |
|
import io |
|
import json |
|
from model.transcriber import transcribe_audio |
|
from predict import predict_emotion |
|
|
|
|
|
|
|
from st_audiorec import st_audiorec |
|
|
|
# MIME type Streamlit reports for WAV uploads; used to validate uploaded files.
AUDIO_WAV = 'audio/wav'
# Upload size limit in decimal megabytes (compared as bytes < MB * 1_000_000).
MAX_FILE_SIZE_MB = 10

# Page chrome; st.set_page_config must run before any other Streamlit call.
st.set_page_config(
    page_title="Emotional Report Analyzer",
    page_icon="🎤",
    layout="wide"
)
|
|
|
|
|
# Seed st.session_state with every key the UI reads. Streamlit reruns the
# whole script on each interaction, so each default is created only when its
# key is missing — existing values survive reruns.
_SESSION_DEFAULTS = (
    ('audio_data', list),
    ('current_audio_index', lambda: -1),  # -1 means "nothing selected yet"
    ('audio_history_csv', lambda: pd.DataFrame(
        columns=['timestamp', 'file_path', 'transcription', 'emotion', 'probabilities']
    )),
    ('needs_rerun', lambda: False),
)

for _key, _make_default in _SESSION_DEFAULTS:
    if _key not in st.session_state:
        st.session_state[_key] = _make_default()
|
|
|
|
|
def update_audio_history(new_entry):
    """Append *new_entry* to the in-session audio history and persist it.

    new_entry: dict with keys timestamp / file_path / transcription /
    emotion / probabilities (matching the history DataFrame columns).

    The history is capped at the 10 most recent rows, stored back into
    st.session_state.audio_history_csv, and written to 'audio_history.csv'
    on disk (the visible code never reads that file back).
    """
    history = pd.concat(
        [st.session_state.audio_history_csv, pd.DataFrame([new_entry])],
        ignore_index=True,
    )

    # Keep only the 10 most recent entries.
    if len(history) > 10:
        history = history.iloc[-10:]

    st.session_state.audio_history_csv = history
    history.to_csv('audio_history.csv', index=False)
|
|
|
|
|
def process_audio(audio_path):
    """Transcribe the audio at *audio_path*, predict its emotion, and record
    the result in the session history.

    Returns a (transcription, emotion, probabilities) tuple on success, or
    (None, None, None) after showing a Streamlit error if any step raises.
    """
    try:
        text = transcribe_audio(audio_path)
        emotion, probs = predict_emotion(audio_path)

        update_audio_history({
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'file_path': audio_path,
            'transcription': text,
            'emotion': emotion,
            # Stored via str() so the row round-trips through CSV; parsed
            # back with ast.literal_eval when charted.
            'probabilities': str(probs),
        })

        # Point the results panel at the row that was just appended.
        st.session_state.current_audio_index = len(st.session_state.audio_history_csv) - 1

        return text, emotion, probs
    except Exception as e:
        st.error(f"Error processing audio: {str(e)}")
        return None, None, None
|
|
|
|
|
def split_audio(audio_file, segment_length=10):
    """Placeholder for splitting *audio_file* into *segment_length*-second
    chunks.

    Currently performs no splitting: it warns the user and returns the
    whole file as a single one-element list so callers can iterate it.
    """
    st.warning("Audio splitting functionality is a placeholder. Implement with pydub or similar library.")

    segments = [audio_file]
    return segments
|
|
|
|
|
def display_emotion_chart(probabilities):
    """Render a labeled bar chart of emotion probabilities into the app.

    probabilities: mapping of emotion label -> probability (expected in
    [0, 1]; the y-axis is fixed to 0..1.1 to leave room for value labels).
    """
    labels = list(probabilities.keys())
    heights = list(probabilities.values())

    figure, axes = plt.subplots(figsize=(10, 5))
    # Colors are positional; assumes three emotion classes in this order —
    # TODO(review): confirm label order matches red/gray/green intent.
    bars = axes.bar(labels, heights, color=['red', 'gray', 'green'])

    # Print each bar's probability just above its top edge.
    for rect in bars:
        top = rect.get_height()
        axes.text(rect.get_x() + rect.get_width() / 2., top + 0.02,
                  f'{top:.2f}', ha='center', va='bottom')

    axes.set_ylim(0, 1.1)
    axes.set_ylabel('Probability')
    axes.set_title('Emotion Prediction Results')

    st.pyplot(figure)
|
|
|
|
|
# Deferred rerun: handlers below set needs_rerun instead of calling
# st.rerun() mid-callback; the flag is consumed once here at the top
# of the next script pass.
if st.session_state.needs_rerun:
    st.session_state.needs_rerun = False
    st.rerun()

# Page header: logo on the left, title on the right.
col_logo, col_name = st.columns([3, 1])
col_logo.image("./img/logo_01.png", width=400)
col_name.title("Emotional Report")

# Main layout: col1 = audio input + history, col2 = analysis results.
col1, col2 = st.columns([1, 1])
|
|
|
with col1:
    st.header("Audio Input")

    # Two input paths: in-browser recording and file upload.
    tab1, tab2 = st.tabs(["Record Audio", "Upload Audio"])

    with tab1:
        st.write("Record your audio (max 10 seconds):")

        # st_audiorec returns the finished recording as WAV bytes,
        # or None while nothing has been recorded yet.
        wav_audio_data = st_audiorec()

        if wav_audio_data is not None:
            # Persist the recording to a temp .wav file so downstream
            # helpers (which take file paths) can process it.
            # delete=False: the path must outlive this `with` block.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
                tmp_file.write(wav_audio_data)
                tmp_file_path = tmp_file.name

            st.success("Audio recorded successfully!")

            if st.button("Process Recorded Audio"):
                with st.spinner("Processing audio..."):
                    transcription, emotion, probs = process_audio(tmp_file_path)

                # process_audio returns (None, None, None) on failure.
                if transcription is not None:
                    st.success("Audio processed successfully!")
                    # Request a rerun so the history/results panels refresh.
                    st.session_state.needs_rerun = True

    with tab2:
        uploaded_file = st.file_uploader("Upload an audio file (WAV format)", type=['wav'])

        # Accept only WAV uploads below the size limit (decimal MB).
        if uploaded_file is not None and uploaded_file.type == AUDIO_WAV and uploaded_file.size < MAX_FILE_SIZE_MB * 1_000_000:
            try:
                # Copy the upload to a temp .wav on disk for processing.
                with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
                    tmp_file.write(uploaded_file.getbuffer())
                    tmp_file_path = tmp_file.name
            except Exception as e:
                st.error(f"Error saving uploaded file: {str(e)}")
                st.error(f"Try to record your voice directly, maybe your storage is locked.")

            st.audio(uploaded_file, format="audio/wav")

            # NOTE(review): if the temp-file write above raised,
            # tmp_file_path is undefined here and the button handler
            # below would raise NameError — confirm intended handling.
            if st.button("Process Uploaded Audio"):
                with st.spinner("Processing audio..."):
                    # split_audio is currently a placeholder that returns
                    # the whole file as one segment.
                    segments = split_audio(tmp_file_path)

                    for i, segment_path in enumerate(segments):
                        st.write(f"Processing segment {i+1}...")
                        transcription, emotion, probs = process_audio(segment_path)

                    st.success("Audio processed successfully!")
                    st.session_state.needs_rerun = True

    st.header("Audio History and Analytics")

    if len(st.session_state.audio_history_csv) > 0:
        # Let the user pick a past recording by timestamp (default: newest).
        timestamps = st.session_state.audio_history_csv['timestamp'].tolist()
        selected_timestamp = st.selectbox(
            "Select audio from history:",
            options=timestamps,
            index=len(timestamps) - 1
        )

        # Map the chosen timestamp back to its index in the history.
        # NOTE(review): .index[0] yields a *label*, but it is later used
        # with .iloc (positional). After update_audio_history trims with
        # iloc[-10:], labels no longer equal positions — verify this
        # doesn't mis-select rows once history exceeds 10 entries.
        selected_index = st.session_state.audio_history_csv[
            st.session_state.audio_history_csv['timestamp'] == selected_timestamp
        ].index[0]

        # Changing the selection re-points the results panel and refreshes.
        if st.session_state.current_audio_index != selected_index:
            st.session_state.current_audio_index = selected_index
            st.session_state.needs_rerun = True

        if st.button("Run Analytics on Selected Audio"):
            st.subheader("Analytics Results")

            selected_data = st.session_state.audio_history_csv.iloc[selected_index]

            st.write(f"Selected Audio: {selected_data['timestamp']}")
            st.write(f"Emotion: {selected_data['emotion']}")
            st.write(f"File Path: {selected_data['file_path']}")

            # Temp files may have been cleaned up since recording,
            # so guard playback against a missing path.
            try:
                if os.path.exists(selected_data['file_path']):
                    st.audio(selected_data['file_path'], format="audio/wav")
                else:
                    st.warning("Audio file not found - it may have been deleted or moved.")
            except Exception as e:
                st.error(f"Error playing audio: {str(e)}")
    else:
        st.info("No audio history available. Record or upload audio to create history.")
|
|
|
with col2:
    st.header("Results")

    # Show the currently selected / most recently processed recording.
    if st.session_state.current_audio_index >= 0 and len(st.session_state.audio_history_csv) > 0:
        current_data = st.session_state.audio_history_csv.iloc[st.session_state.current_audio_index]

        st.subheader("Transcription")
        st.text_area("", value=current_data['transcription'], height=100, key="transcription_area")

        st.subheader("Detected Emotion")
        st.info(f"🎭 Predicted emotion: **{current_data['emotion']}**")

        # Probabilities were stored as str(dict) in the history row;
        # parse them back into a dict before charting.
        try:
            import ast
            probs = ast.literal_eval(current_data['probabilities'])
            display_emotion_chart(probs)
        except Exception as e:
            st.error(f"Error parsing probabilities: {str(e)}")
            st.write(f"Raw probabilities: {current_data['probabilities']}")
    else:
        st.info("Record or upload audio to see results")
|
|
|
|
|
|
|
|
|
# Page footer.
st.markdown("---")
st.caption("Emotional Report Analyzer - Processes audio in 10-second segments and predicts emotions")