Spaces:
Sleeping
Sleeping
################################ | |
### Real time prediction for real time record | |
############################### | |
import streamlit as st | |
import pyaudio | |
import wave | |
import torch | |
from transformers import Wav2Vec2ForSequenceClassification, Wav2Vec2Processor | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import time | |
# Paramètres audio | |
CHUNK = 1024 | |
FORMAT = pyaudio.paInt16 | |
CHANNELS = 1 | |
RATE = 16000 | |
# Interface Streamlit | |
st.title("Détection des émotions en temps réel") | |
# Boutons pour démarrer et arrêter l'enregistrement | |
start_button = st.button("Démarrer l'enregistrement") | |
stop_button = st.button("Arrêter l'enregistrement") | |
# Zone de visualisation des émotions en temps réel | |
emotion_placeholder = st.empty() | |
final_emotion_placeholder = st.empty() | |
if start_button: | |
st.write("Enregistrement en cours...") | |
audio = pyaudio.PyAudio() | |
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) | |
frames = [] | |
real_time_emotions = [] | |
while not stop_button: | |
data = stream.read(CHUNK) | |
frames.append(data) | |
# Traitement en temps réel (par tranche de 1 seconde) | |
if len(frames) >= RATE // CHUNK: | |
audio_segment = np.frombuffer(b''.join(frames[-(RATE // CHUNK):]), dtype=np.int16) | |
emotion = predict_emotion(audio_segment, output_probs=False, sampling_rate=RATE) | |
real_time_emotions.append(emotion) | |
emotion_placeholder.line_chart(real_time_emotions) # Affichage graphique des émotions | |
# Arrêt de l'enregistrement | |
stream.stop_stream() | |
stream.close() | |
audio.terminate() | |
# Sauvegarde de l'audio enregistré | |
wf = wave.open("output.wav", "wb") | |
wf.setnchannels(CHANNELS) | |
wf.setsampwidth(audio.get_sample_size(FORMAT)) | |
wf.setframerate(RATE) | |
wf.writeframes(b"".join(frames)) | |
wf.close() | |
# Prédiction finale sur tout l'audio enregistré | |
full_audio_data = np.frombuffer(b''.join(frames), dtype=np.int16) | |
final_emotion = predict_emotion(full_audio_data) | |
final_emotion_placeholder.write(f"Émotion finale prédite : {final_emotion}") | |
################################ | |
### Real time prediction for uploaded audio file | |
############################### | |
# Charger le modèle wav2vec et le processeur | |
# # Configuration Streamlit | |
# st.title("Analyse des émotions en temps réel") | |
# uploaded_file = st.file_uploader("Choisissez un fichier audio", type=["wav", "mp3"]) | |
# if uploaded_file is not None: | |
# # Charger et rééchantillonner l'audio | |
# audio, sr = librosa.load(uploaded_file, sr=16000) | |
# # Paramètres de la fenêtre glissante | |
# window_size = 1 # en secondes | |
# hop_length = 0.5 # en secondes | |
# # Créer un graphique en temps réel | |
# fig, ax = plt.subplots() | |
# lines = [ax.plot([], [], label=emotion)[0] for emotion in emotions] | |
# ax.set_ylim(0, 1) | |
# ax.set_xlim(0, len(audio) / sr) | |
# ax.set_xlabel("Temps (s)") | |
# ax.set_ylabel("Probabilité") | |
# ax.legend() | |
# chart = st.pyplot(fig) | |
# # Traitement par fenêtre glissante | |
# for i in range(0, len(audio), int(hop_length * sr)): | |
# chunk = audio[i:i + int(window_size * sr)] | |
# if len(chunk) < int(window_size * sr): | |
# break | |
# emotion_scores = predict_emotion(chunk, output_probs=False, sampling_rate=RATE) | |
# # Mettre à jour le graphique | |
# for emotion, line in zip(emotions, lines): | |
# xdata = line.get_xdata().tolist() | |
# ydata = line.get_ydata().tolist() | |
# xdata.append(i / sr) | |
# ydata.append(emotion_scores[emotion]) | |
# line.set_data(xdata, ydata) | |
# ax.relim() | |
# ax.autoscale_view() | |
# chart.pyplot(fig) | |
# st.success("Analyse terminée !") | |
############################################ | |
### Progress bar | |
############################################ | |
with st.status("Downloading data...", expanded=True) as status: | |
st.write("Searching for data...") | |
time.sleep(2) | |
st.write("Found URL.") | |
time.sleep(1) | |
st.write("Downloading data...") | |
time.sleep(1) | |
status.update( | |
label="Download complete!", state="complete", expanded=False | |
) | |
st.button("Rerun") | |
############################################ | |
### Time duration estimation | |
############################################ | |
progress_bar = st.progress(0) | |
time_placeholder = st.empty() | |
total_time = 10 # Total estimated time in seconds | |
for i in range(total_time): | |
# Update progress bar | |
progress_bar.progress((i + 1) / total_time) | |
# Update time estimation | |
remaining_time = total_time - i - 1 | |
time_placeholder.text(f"Estimated time remaining: {remaining_time} seconds") | |
# Simulate task progress | |
time.sleep(1) | |
############################################ | |
### Audio file noise reduction | |
############################################ | |
from pydub import AudioSegment | |
import noisereduce as nr | |
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor | |
# Fonction de réduction de bruit | |
def reduce_noise(audio_data, sr): | |
reduced_noise = nr.reduce_noise(y=audio_data, sr=sr) | |
return reduced_noise | |
# Chargement du modèle wav2vec | |
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h") | |
model = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h") | |
# Interface Streamlit | |
st.title("Application de transcription audio avec réduction de bruit") | |
uploaded_file = st.file_uploader("Choisissez un fichier audio .wav", type="wav") | |
if uploaded_file is not None: | |
# Chargement et prétraitement de l'audio | |
audio = AudioSegment.from_wav(uploaded_file) | |
audio_array = np.array(audio.get_array_of_samples()) | |
# Réduction de bruit | |
reduced_noise_audio = reduce_noise(audio_array, audio.frame_rate) | |
# Traitement avec wav2vec | |
input_values = processor(reduced_noise_audio, sampling_rate=audio.frame_rate, return_tensors="pt").input_values | |
with torch.no_grad(): | |
logits = model(input_values).logits | |
predicted_ids = torch.argmax(logits, dim=-1) | |
transcription = processor.batch_decode(predicted_ids)[0] | |
st.audio(uploaded_file, format="audio/wav") | |
st.write("Transcription:") | |
st.write(transcription) | |
############################################ | |
### Choix des émotions | |
############################################ | |
# options = ['Sadness','Anger', 'Disgust', 'Fear', 'Surprise', 'Joy','Neutral'] | |
# selected_options = st.multiselect('What emotions do you want to be displayed', options, default=['Joy', 'Anger','Neutral]) | |
############################################ | |
### Transcription Speech2Text | |
############################################ | |
# # Fonction pour transcrire l'audio | |
# def transcribe_audio(audio): | |
# # Préparer les données d'entrée pour le modèle | |
# input_values = processor(audio, sampling_rate=16000, return_tensors="pt").input_values | |
# # Passer les données dans le modèle pour obtenir les logits | |
# with torch.no_grad(): | |
# logits = model(input_values).logits | |
# # Décoder les prédictions en texte | |
# predicted_ids = torch.argmax(logits, dim=-1) | |
# transcription = processor.batch_decode(predicted_ids)[0] | |
# return transcription | |
# # Charger et transcrire l'audio | |
# # audio, rate = load_audio(audio_file_path) # (re)chargement de l'audio si nécessaire | |
# transcription = transcribe_audio(audio) | |
# # Afficher la transcription | |
# print("Transcription :", transcription) | |
############################################ | |
### Feedback | |
############################################ | |
import pandas as pd | |
import os | |
# Initialisation du fichier CSV | |
csv_file = "predictions/feedback.csv" | |
# Vérifier si le fichier CSV existe, sinon le créer avec des colonnes appropriées | |
if not os.path.exists(csv_file): | |
df = pd.DataFrame(columns=["filepath", "prediction", "feedback"]) | |
df.to_csv(csv_file, index=False) | |
# Charger les données existantes du CSV | |
df = pd.read_csv(csv_file) | |
# Interface Streamlit | |
st.title("Predicted emotion feedback") | |
# Simuler une prédiction pour l'exemple (remplacez par votre modèle réel) | |
audio_file_name = "example_audio.wav" | |
predicted_emotion = "Joie" # Exemple de prédiction | |
st.write(f"Fichier audio : {audio_file_name}") | |
st.write(f"Émotion détectée : {predicted_emotion}") | |
# Formulaire de feedback | |
with st.form("feedback_form"): | |
st.write("Est-ce la bonne émotion qui a été détectée ? Cochez la réelle émotion.") | |
feedback = st.selectbox("Votre réponse :", ['Sadness','Anger', 'Disgust', 'Fear', 'Surprise', 'Joy', 'Neutral']) | |
submit_button = st.form_submit_button("Soumettre") | |
st.write("En cliquant sur ce bouton, vous acceptez que votre audio soit sauvegardé dans notre base de données.") | |
if submit_button: | |
# Ajouter le feedback au DataFrame | |
new_entry = {"filepath": audio_file_name, "prediction": predicted_emotion, "feedback": feedback} | |
df = df.append(new_entry, ignore_index=True) | |
# Sauvegarder les données mises à jour dans le fichier CSV | |
df.to_csv(csv_file, index=False) | |
# Sauvegarder le fichier audio | |
with open("predictions/data", "wb") as f: | |
f.write(uploaded_file.getbuffer()) | |
# Confirmation pour l'utilisateur | |
st.success("Merci pour votre retour ! Vos données ont été sauvegardées.") | |
# Afficher les données sauvegardées (optionnel) | |
# st.write("Données collectées jusqu'à présent :") | |
# st.dataframe(df) | |
############################################ | |
### Predict proba (to replace in predict.py) | |
############################################ | |
import librosa | |
def predict_emotion_probabilities(audio_path): | |
waveform, _ = librosa.load(audio_path, sr=16000) | |
input_values = processor(waveform, return_tensors="pt", sampling_rate=16000).input_values | |
input_values = input_values.to(device) | |
with torch.no_grad(): | |
outputs = model(input_values) | |
# Appliquer softmax pour obtenir des probabilités | |
probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1) | |
# Convertir en numpy array et prendre le premier (et seul) élément | |
probabilities = probabilities[0].detach().cpu().numpy() | |
# Créer un dictionnaire associant chaque émotion à sa probabilité | |
emotion_probabilities = {emotion: prob for emotion, prob in zip(emotion_labels, probabilities)} | |
return emotion_probabilities |