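"""Gradio demo for multimodal emotion recognition.

Pipeline: download a YouTube video with yt-dlp, transcribe its audio with
Whisper and classify the transcript's emotion, caption sampled video frames
with BLIP and classify each caption's emotion, then fuse the two branches
into a single overall prediction.
"""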
import os
import subprocess

import cv2
import gradio as gr
import numpy as np
import torch
import whisper
import yt_dlp
from moviepy.editor import VideoFileClip
from pydub import AudioSegment
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    BlipForConditionalGeneration,
    BlipProcessor,
)
def authenticate_youtube():
    # Trigger yt-dlp's OAuth2 login flow once so the later download can reuse the cached token.
    os.system('yt-dlp --username oauth2 --password ""')
def download_youtube_video(video_url, output_path):
    """Download the video with yt-dlp and return the path of the downloaded file."""
    ydl_opts = {
        'format': 'bestvideo+bestaudio',
        'outtmpl': os.path.join(output_path, '%(title)s.%(ext)s'),
        'username': 'oauth2',
        'password': ''
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # extract_info(download=True) downloads the file and returns its metadata,
        # so we can recover the real output path instead of guessing a ".webm" extension.
        video_info = ydl.extract_info(video_url, download=True)
        downloads = video_info.get('requested_downloads') or []
        if downloads and downloads[0].get('filepath'):
            return downloads[0]['filepath']
        return ydl.prepare_filename(video_info)
def convert_to_mp4(input_path, output_path):
    # Remux into an MP4 container without re-encoding (stream copy); -y overwrites any previous run.
    output_file = os.path.join(output_path, 'video.mp4')
    command = ['ffmpeg', '-y', '-i', input_path, '-c', 'copy', output_file]
    subprocess.run(command, check=True)
    return output_file
def extract_audio_from_video(video_path, output_path):
    # Pull the audio track out of the video and save it as an MP3.
    video_clip = VideoFileClip(video_path)
    audio_output = os.path.join(output_path, 'audio.mp3')
    video_clip.audio.write_audiofile(audio_output)
    video_clip.close()
    return audio_output
def convert_mp3_to_wav(mp3_path, output_path):
    # Convert the extracted MP3 to WAV with pydub before transcription.
    audio = AudioSegment.from_mp3(mp3_path)
    wav_output = os.path.join(output_path, 'audio.wav')
    audio.export(wav_output, format="wav")
    return wav_output
def process_text(text):
    # Text branch: classify the transcript with a Twitter-RoBERTa emotion model.
    model_name = "cardiffnlp/twitter-roberta-base-emotion"
    emotion_labels = ['anger', 'joy', 'optimism', 'sadness']
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        outputs = model(**inputs)
    emotion_probs = torch.softmax(outputs.logits, dim=-1).squeeze()
    predicted_emotion = emotion_labels[int(torch.argmax(emotion_probs))]
    emotion_dict = {label: emotion_probs[i].item() for i, label in enumerate(emotion_labels)}
    return emotion_dict, predicted_emotion
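# Illustrative only (actual scores depend on the model weights):
#   process_text("I love this talk!") might return something like
#   ({'anger': 0.01, 'joy': 0.95, 'optimism': 0.03, 'sadness': 0.01}, 'joy')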
def preprocess_frame(frame):
    # OpenCV returns BGR frames; BLIP expects RGB, so convert before preprocessing.
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame = cv2.resize(frame, (224, 224))
    pixel_values = caption_processor(images=frame, return_tensors="pt").pixel_values
    return pixel_values
def generate_caption(pixel_values):
    caption_ids = caption_model.generate(pixel_values)
    caption = caption_processor.batch_decode(caption_ids, skip_special_tokens=True)[0]
    return caption
def predict_emotions(caption):
    # Classify the frame caption using the emotion model's own label set (taken from
    # its config rather than a hard-coded list, so labels always line up with logits).
    inputs = emotion_tokenizer(caption, return_tensors='pt', truncation=True, padding=True)
    with torch.no_grad():
        outputs = emotion_model(**inputs)
    emotion_probs = torch.softmax(outputs.logits, dim=1)[0]
    labels = [emotion_model.config.id2label[i] for i in range(len(emotion_probs))]
    predicted_emotions = {label: prob.item() for label, prob in zip(labels, emotion_probs)}
    return predicted_emotions
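# For reference, j-hartmann/emotion-english-distilroberta-base predicts seven labels
# (anger, disgust, fear, joy, neutral, sadness, surprise), so each sampled frame
# contributes a 7-dimensional probability vector to the video branch.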
# Load the captioning and emotion models once at import time so every frame reuses them.
caption_model_name = "Salesforce/blip-image-captioning-base"
caption_processor = BlipProcessor.from_pretrained(caption_model_name)
caption_model = BlipForConditionalGeneration.from_pretrained(caption_model_name)

emotion_model_name = "j-hartmann/emotion-english-distilroberta-base"
emotion_tokenizer = AutoTokenizer.from_pretrained(emotion_model_name)
emotion_model = AutoModelForSequenceClassification.from_pretrained(emotion_model_name)
def analyze_video(video_url):
    output_path = './'
    authenticate_youtube()

    # Download and normalise the media.
    video_path = download_youtube_video(video_url, output_path)
    mp4_path = convert_to_mp4(video_path, output_path)
    audio_path = extract_audio_from_video(mp4_path, output_path)
    audio_wav_path = convert_mp3_to_wav(audio_path, output_path)

    # Audio/text branch: transcribe with Whisper, then classify the transcript.
    model_whisper = whisper.load_model("base")
    result_whisper = model_whisper.transcribe(audio_wav_path)
    transcript = result_whisper['text']
    emotion_dict_text, predicted_emotion_text = process_text(transcript)

    # Visual branch: caption every 60th frame and classify each caption.
    n_frame_interval = 60
    emotion_vectors_video = []
    video_emotion_labels = []
    video_capture = cv2.VideoCapture(mp4_path)
    frame_count_video = 0
    while video_capture.isOpened():
        ret_video, frame_video = video_capture.read()
        if not ret_video:
            break
        if frame_count_video % n_frame_interval == 0:
            pixel_values_video = preprocess_frame(frame_video)
            caption_video = generate_caption(pixel_values_video)
            predicted_emotions_video = predict_emotions(caption_video)
            video_emotion_labels = list(predicted_emotions_video.keys())
            emotion_vectors_video.append(np.array(list(predicted_emotions_video.values())))
        frame_count_video += 1
    video_capture.release()

    # Fuse the two branches: concatenate the text and (averaged) video probability
    # vectors and report the single highest-scoring emotion across both label sets.
    average_emotion_vector_video = np.mean(emotion_vectors_video, axis=0)
    combined_emotion_vector_final = np.concatenate(
        (np.array(list(emotion_dict_text.values())), average_emotion_vector_video))
    combined_labels = list(emotion_dict_text.keys()) + video_emotion_labels
    final_most_predicted_index = int(np.argmax(combined_emotion_vector_final))
    final_most_predicted_emotion = combined_labels[final_most_predicted_index]

    return transcript, predicted_emotion_text, final_most_predicted_emotion
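# Example usage outside Gradio (the URL below is a placeholder):
#   transcript, text_emotion, overall_emotion = analyze_video("https://www.youtube.com/watch?v=...")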
iface = gr.Interface(
    fn=analyze_video,
    inputs=gr.Textbox(label="YouTube Video URL"),
    outputs=[
        gr.Textbox(label="Transcript"),
        gr.Textbox(label="Emotion from transcript"),
        gr.Textbox(label="Overall (text + video) emotion"),
    ],
    title="Multimodal Emotion Recognition",
    description="Enter a YouTube video URL to analyze emotions from both the audio transcript and the visual content.",
)
if __name__ == "__main__":
    iface.launch()
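# Tip: pass share=True to iface.launch() to expose the demo through a temporary public Gradio link.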