emotion-llm / app.py
Garvitj's picture
Update app.py
915d8ef verified
raw
history blame
11.9 kB
import gradio as gr
import numpy as np
import cv2
import librosa
import speech_recognition as sr
import tempfile
import wave
import optimum
import os
import tensorflow as tf
from tensorflow.keras.preprocessing.text import tokenizer_from_json
from tensorflow.keras.models import load_model, model_from_json
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.preprocessing.sequence import pad_sequences
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import pickle
import json
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from collections import Counter
from pydub import AudioSegment
import ffmpeg
nltk.download('punkt') # Tokenizer
nltk.download('wordnet') # WordNet lemmatizer
nltk.download('stopwords') # Stopwords
# Load the text model
with open('model_architecture_for_text_emotion_updated_json.json', 'r') as json_file:
model_json = json_file.read()
text_model = model_from_json(model_json)
text_model.load_weights("model_for_text_emotion_updated(1).keras")
# Load the encoder and scaler for audio
with open('encoder.pkl', 'rb') as file:
encoder = pickle.load(file)
with open('scaler.pkl', 'rb') as file:
scaler = pickle.load(file)
# Load the tokenizer for text
with open('tokenizer.json') as json_file:
tokenizer_json = json.load(json_file)
tokenizer = tokenizer_from_json(tokenizer_json)
# Load the audio model
audio_model = load_model('my_model.h5')
# Load the image model
image_model = load_model('model_emotion.h5')
# Initialize NLTK
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))
# Preprocess text function
def preprocess_text(text):
tokens = nltk.word_tokenize(text.lower())
tokens = [word for word in tokens if word.isalnum() and word not in stop_words]
lemmatized_tokens = [lemmatizer.lemmatize(word) for word in tokens]
return ' '.join(lemmatized_tokens)
# Extract features from audio
def extract_features(data, sample_rate):
result = np.array([])
zcr = np.mean(librosa.feature.zero_crossing_rate(y=data).T, axis=0)
result = np.hstack((result, zcr))
stft = np.abs(librosa.stft(data))
chroma_stft = np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T, axis=0)
result = np.hstack((result, chroma_stft))
mfcc = np.mean(librosa.feature.mfcc(y=data, sr=sample_rate).T, axis=0)
result = np.hstack((result, mfcc))
rms = np.mean(librosa.feature.rms(y=data).T, axis=0)
result = np.hstack((result, rms))
mel = np.mean(librosa.feature.melspectrogram(y=data, sr=sample_rate).T, axis=0)
result = np.hstack((result, mel))
return result
# Predict emotion from text
def find_emotion_using_text(sample_rate, audio_data, recognizer):
mapping = {0: "anger", 1: "disgust", 2: "fear", 3: "joy", 4: "neutral", 5: "sadness", 6: "surprise"}
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio_file:
temp_audio_path = temp_audio_file.name
with wave.open(temp_audio_path, 'w') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(sample_rate)
wf.writeframes(audio_data.tobytes())
with sr.AudioFile(temp_audio_path) as source:
audio_record = recognizer.record(source)
text = recognizer.recognize_google(audio_record)
pre_text = preprocess_text(text)
title_seq = tokenizer.texts_to_sequences([pre_text])
padded_title_seq = pad_sequences(title_seq, maxlen=35, padding='post', truncating='post')
inp1 = np.array(padded_title_seq)
text_prediction = text_model.predict(inp1)
os.remove(temp_audio_path)
max_index = text_prediction.argmax()
return mapping[max_index],text
# Predict emotion from audio
def predict_emotion(audio_data):
sample_rate, data = audio_data
data = data.flatten()
if data.dtype != np.float32:
data = data.astype(np.float32)
data = data / np.max(np.abs(data))
features = extract_features(data, sample_rate)
features = np.expand_dims(features, axis=0)
if features.ndim == 3:
features = np.squeeze(features, axis=2)
elif features.ndim != 2:
raise ValueError("Features array has unexpected dimensions.")
scaled_features = scaler.transform(features)
scaled_features = np.expand_dims(scaled_features, axis=2)
prediction = audio_model.predict(scaled_features)
emotion_index = np.argmax(prediction)
num_classes = len(encoder.categories_[0])
emotion_array = np.zeros((1, num_classes))
emotion_array[0, emotion_index] = 1
emotion_label = encoder.inverse_transform(emotion_array)[0]
return emotion_label
def preprocess_image(image):
image = load_img(image, target_size=(48, 48), color_mode="grayscale")
image = img_to_array(image)
image = np.expand_dims(image, axis=0)
image = image / 255.0
return image
# Predict emotion from image
def predict_emotion_from_image(image):
preprocessed_image = preprocess_image(image)
prediction = image_model.predict(preprocessed_image)
emotion_index = np.argmax(prediction)
mapping = {0: "anger", 1: "disgust", 2: "fear", 3: "joy", 4: "neutral", 5: "sadness", 6: "surprise"}
return mapping[emotion_index]
def process_video(video_path):
cap = cv2.VideoCapture(video_path)
frame_rate = cap.get(cv2.CAP_PROP_FPS)
frame_count = 0
predictions = []
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Process every nth frame (to speed up processing)
if frame_count % int(frame_rate) == 0:
# Convert frame to grayscale as required by your model
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frame = cv2.resize(frame, (48, 48)) # Resize to match model input size
frame = img_to_array(frame)
frame = np.expand_dims(frame, axis=0) / 255.0
# Predict emotion
prediction = image_model.predict(frame)
predictions.append(np.argmax(prediction))
frame_count += 1
cap.release()
cv2.destroyAllWindows()
# Find the most common prediction
most_common_emotion = Counter(predictions).most_common(1)[0][0]
mapping = {0: "anger", 1: "disgust", 2: "fear", 3: "joy", 4: "neutral", 5: "sadness", 6: "surprise"}
return mapping[most_common_emotion]
def process_audio_from_video(video_path):
text_emotion = "Error in text processing" # Initialize text_emotion
text=""
try:
# Load the video using an alternative library (e.g., ffmpeg or cv2)
import ffmpeg
audio_output = tempfile.NamedTemporaryFile(delete=False, suffix=".wav").name
ffmpeg.input(video_path).output(audio_output, format="wav").run(quiet=True)
recognizer = sr.Recognizer()
with sr.AudioFile(audio_output) as source:
audio_record = recognizer.record(source)
text = recognizer.recognize_google(audio_record)
pre_text = preprocess_text(text)
title_seq = tokenizer.texts_to_sequences([pre_text])
padded_title_seq = pad_sequences(title_seq, maxlen=35, padding='post', truncating='post')
inp1 = np.array(padded_title_seq)
text_prediction = text_model.predict(inp1)
os.remove(audio_output)
max_index = text_prediction.argmax()
text_emotion = {0: "anger", 1: "disgust", 2: "fear", 3: "joy", 4: "neutral", 5: "sadness", 6: "surprise"}[max_index]
except Exception as e:
print(f"Error processing text from audio: {e}")
text_emotion = "Error in text processing"
try:
# Extract audio features for emotion recognition
sample_rate, data = librosa.load(video_path, sr=None, mono=True)
data = data.flatten()
if data.dtype != np.float32:
data = data.astype(np.float32)
data = data / np.max(np.abs(data))
features = extract_features(data, sample_rate)
features = np.expand_dims(features, axis=0)
scaled_features = scaler.transform(features)
scaled_features = np.expand_dims(scaled_features, axis=2)
prediction = audio_model.predict(scaled_features)
emotion_index = np.argmax(prediction)
num_classes = len(encoder.categories_[0])
emotion_array = np.zeros((1, num_classes))
emotion_array[0, emotion_index] = 1
audio_emotion = encoder.inverse_transform(emotion_array)[0]
except Exception as e:
print(f"Error processing audio features: {e}")
audio_emotion = "Error in audio processing"
return text_emotion, audio_emotion,text
import gradio as gr
from huggingface_hub import InferenceClient
from transformers import AutoTokenizer, AutoModelForCausalLM
from huggingface_hub import InferenceClient
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
def respond(message, history: list[tuple[str, str]], system_message, max_tokens, temperature, top_p):
messages = [{"role": "system", "content": system_message}]
# Format history with user and bot messages
for val in history:
if val[0]:
messages.append({"role": "user", "content": val[0]})
if val[1]:
messages.append({"role": "assistant", "content": val[1]})
messages.append({"role": "user", "content": message})
response = ""
# Stream response from the model
for message in client.chat_completion(
messages,
max_tokens=max_tokens,
stream=True,
temperature=temperature,
top_p=top_p,
):
token = message.choices[0].delta.content
response += token
yield response
# Function to handle video processing and interaction
def transcribe_and_predict_video(video, chat_history=[]):
# Process the video for emotions (use your own emotion detection functions)
image_emotion = process_video(video)
text_emotion, audio_emotion, user_input = process_audio_from_video(video)
em = [image_emotion, text_emotion, audio_emotion]
# Format the conversation history
history_text = "".join([f"User ({msg[2]}): {msg[0]}\nBot: {msg[1]}\n" for msg in chat_history])
# Construct the prompt with emotion context and history
prompt = f"""
You are a helpful AI assistant. Respond like a human while considering the user's emotion.
User's Emotion: {em}
Conversation History:
{history_text}
User ({em}): {user_input}
Bot:"""
# Tokenize input
inputs = tokenizer(prompt, return_tensors="pt").to("cpu")
# Generate response
output = model.generate(**inputs, max_length=512, temperature=0.7, top_p=0.9, do_sample=True)
response = tokenizer.decode(output[0], skip_special_tokens=True).split("Bot:")[-1].strip()
# Store the current emotion for the user input (modify emotion detection as needed)
emotion = detect_emotion(user_input) # Assuming `detect_emotion` is a function that returns the user's emotion
# Update the chat history with the current conversation and emotion
chat_history.append((user_input, response, emotion))
return response, chat_history
# Gradio interface setup
demo = gr.ChatInterface(
respond,
additional_inputs=[
gr.Textbox(value="You are a friendly Chatbot.", label="System message"),
gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
],
)
# Launch the Gradio interface
if __name__ == "__main__":
demo.launch()