File size: 6,189 Bytes
d6d8b90
 
 
 
 
 
 
 
 
 
 
 
9f703fc
 
 
 
d6d8b90
 
c3c9064
d6d8b90
 
 
 
 
c3c9064
d6d8b90
c3c9064
 
d6d8b90
 
 
 
 
 
c3c9064
d6d8b90
 
 
 
 
 
 
c3c9064
d6d8b90
 
 
 
 
 
9f703fc
d6d8b90
 
 
 
 
 
 
9f703fc
d6d8b90
 
9f703fc
d6d8b90
9f703fc
d6d8b90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9f703fc
d6d8b90
9f703fc
d6d8b90
9f703fc
d6d8b90
 
 
 
 
 
 
 
 
 
9f703fc
d6d8b90
 
c3c9064
9f703fc
c3c9064
d6d8b90
 
1be32a3
d6d8b90
 
 
c3c9064
d6d8b90
c3c9064
909f75a
 
c3c9064
909f75a
 
 
d6d8b90
909f75a
 
d6d8b90
909f75a
 
d6d8b90
909f75a
 
 
c3c9064
909f75a
d6d8b90
909f75a
d6d8b90
909f75a
d6d8b90
909f75a
 
 
 
d6d8b90
909f75a
d6d8b90
9f703fc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
909f75a
d6d8b90
9f703fc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import os
import subprocess
import gradio as gr
import whisper
import yt_dlp
import torch
import numpy as np
from moviepy.editor import VideoFileClip
from transformers import AutoModelForAudioClassification, AutoFeatureExtractor
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import BlipProcessor, BlipForConditionalGeneration
import cv2

# API key read from the environment once, at import time; None when the
# variable is not set.
YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY')

def download_youtube_video(video_url, api_key):
    """Download a video with yt-dlp and return the path of the saved file.

    Args:
        video_url: URL of the video to download.
        api_key: Not used by this function (yt-dlp is called without it).
            Kept so existing callers that pass a key keep working.

    Returns:
        Path of the file yt-dlp wrote to disk.
    """
    ydl_opts = {
        'format': 'bestvideo+bestaudio',
        'outtmpl': os.path.join('./', '%(title)s.%(ext)s'),
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        # A single call downloads the media and returns its metadata, so the
        # site is not queried a second time just to learn the title.
        video_info = ydl.extract_info(video_url, download=True)

        # Ask yt-dlp where it actually wrote the file instead of guessing
        # "<title>.webm": the title may be sanitized for the filesystem and
        # merging video+audio can produce a different container extension.
        for download in video_info.get('requested_downloads') or []:
            filepath = download.get('filepath')
            if filepath:
                return filepath

        # Fallback: the name the output template expands to for this video.
        return ydl.prepare_filename(video_info)

def convert_to_mp4(input_path):
    """Remux a media file into ``./video.mp4`` with ffmpeg (stream copy).

    Args:
        input_path: Path of the media file to convert.

    Returns:
        Path of the written MP4 file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    output_file = os.path.join('./', 'video.mp4')
    # '-y' overwrites an existing output. The output name is fixed, so
    # without it every run after the first would stop at ffmpeg's
    # "File exists. Overwrite?" prompt.
    command = ['ffmpeg', '-y', '-i', input_path, '-c', 'copy', output_file]
    subprocess.run(command, check=True)
    return output_file

def extract_audio_from_video(video_path):
    """Write the audio track of a video to ``./audio.mp3``.

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path of the written MP3 file.

    Raises:
        ValueError: If the video has no audio track.
    """
    audio_output = os.path.join('./', 'audio.mp3')
    video_clip = VideoFileClip(video_path)
    try:
        audio_clip = video_clip.audio
        if audio_clip is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError(f"No audio track found in {video_path!r}")
        audio_clip.write_audiofile(audio_output)
    finally:
        # Release the clip's underlying readers even if the export fails.
        video_clip.close()
    return audio_output

def convert_mp3_to_wav(mp3_path):
    """Re-encode an MP3 file as ``./audio.wav`` using pydub.

    Args:
        mp3_path: Path of the MP3 file to read.

    Returns:
        Path of the exported WAV file.
    """
    from pydub import AudioSegment

    segment = AudioSegment.from_mp3(mp3_path)
    destination = os.path.join('./', 'audio.wav')
    segment.export(destination, format="wav")
    return destination

_TEXT_EMOTION_MODEL_NAME = "cardiffnlp/twitter-roberta-base-emotion"
_TEXT_EMOTION_LABELS = ('anger', 'joy', 'optimism', 'sad')
# Holds the (tokenizer, model) pair after the first load.
_text_emotion_cache = {}

def _load_text_emotion_model():
    """Return the text-emotion (tokenizer, model) pair, loading it only once."""
    if 'pair' not in _text_emotion_cache:
        tokenizer = AutoTokenizer.from_pretrained(_TEXT_EMOTION_MODEL_NAME)
        model = AutoModelForSequenceClassification.from_pretrained(_TEXT_EMOTION_MODEL_NAME)
        _text_emotion_cache['pair'] = (tokenizer, model)
    return _text_emotion_cache['pair']

def process_text(text):
    """Classify the emotion of a piece of text.

    Args:
        text: Text to classify; it is truncated to 512 tokens.

    Returns:
        A ``(emotion_dict, predicted_emotion)`` tuple: a dict mapping each
        label ('anger', 'joy', 'optimism', 'sad') to its softmax
        probability, and the label with the highest probability.
    """
    # Reuse the already-loaded model instead of re-instantiating the
    # tokenizer and model on every call.
    tokenizer, model = _load_text_emotion_model()

    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        logits = model(**inputs).logits

    emotion_probs = torch.softmax(logits, dim=-1).squeeze()
    emotion_dict = {
        label: prob.item()
        for label, prob in zip(_TEXT_EMOTION_LABELS, emotion_probs)
    }
    predicted_emotion = _TEXT_EMOTION_LABELS[int(torch.argmax(emotion_probs))]

    return emotion_dict, predicted_emotion

def preprocess_frame(frame):
    """Turn an OpenCV video frame into pixel values for the caption model.

    Args:
        frame: Image array as returned by ``cv2.VideoCapture.read()``.

    Returns:
        The ``pixel_values`` tensor produced by ``caption_processor``.
    """
    # OpenCV decodes frames in BGR channel order, while the caption processor
    # treats array input as RGB; without this swap red and blue are exchanged.
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame = cv2.resize(frame, (224, 224))
    pixel_values = caption_processor(images=frame, return_tensors="pt").pixel_values
    return pixel_values

def generate_caption(pixel_values):
    """Generate a text caption for preprocessed image pixel values.

    Args:
        pixel_values: Tensor to pass to ``caption_model.generate``.

    Returns:
        The first decoded caption, with special tokens removed.
    """
    generated_ids = caption_model.generate(pixel_values)
    decoded_captions = caption_processor.batch_decode(
        generated_ids,
        skip_special_tokens=True,
    )
    return decoded_captions[0]

def predict_emotions(caption):
    """Score the emotions expressed by a caption.

    Args:
        caption: Text to classify with ``emotion_model``.

    Returns:
        Dict mapping each of the model's labels to its softmax probability.
    """
    inputs = emotion_tokenizer(caption, return_tensors='pt', truncation=True, padding=True)
    # Inference only, so skip gradient tracking.
    with torch.no_grad():
        outputs = emotion_model(**inputs)

    emotion_probs = torch.softmax(outputs.logits, dim=1)[0]

    # Take the label names from the model's own config. The previous code
    # read a global `emotion_labels` that is never defined at module level,
    # which raised NameError.
    id2label = emotion_model.config.id2label
    predicted_emotions = {
        id2label[index]: prob.item()
        for index, prob in enumerate(emotion_probs)
    }

    return predicted_emotions

# Image-captioning model, loaded at import time; preprocess_frame and
# generate_caption read these module-level names.
caption_model_name = "Salesforce/blip-image-captioning-base"
caption_processor = BlipProcessor.from_pretrained(caption_model_name)
caption_model = BlipForConditionalGeneration.from_pretrained(caption_model_name)

# Text emotion classifier applied to the generated captions; predict_emotions
# reads these module-level names.
emotion_model_name = "j-hartmann/emotion-english-distilroberta-base"
emotion_tokenizer = AutoTokenizer.from_pretrained(emotion_model_name)
emotion_model = AutoModelForSequenceClassification.from_pretrained(emotion_model_name)

def _sample_frame_emotions(mp4_path, frame_interval):
    """Caption every ``frame_interval``-th frame and score each caption's emotions.

    Returns a ``(labels, vectors)`` tuple: the label names reported by
    ``predict_emotions`` and one probability array per sampled frame.
    Both are empty when no frame could be read.
    """
    labels = []
    vectors = []
    video_capture = cv2.VideoCapture(mp4_path)
    try:
        total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_index = 0
        while video_capture.isOpened():
            frame_read, frame = video_capture.read()
            if not frame_read or frame_index > total_frames:
                break

            if frame_index % frame_interval == 0:
                caption = generate_caption(preprocess_frame(frame))
                # predict_emotions returns a single dict (label -> probability).
                frame_emotions = predict_emotions(caption)
                labels = list(frame_emotions.keys())
                vectors.append(np.array(list(frame_emotions.values())))

            frame_index += 1
    finally:
        # Release the capture even if captioning or classification fails.
        video_capture.release()
    return labels, vectors

def analyze_video(video_url, api_key=None):
    """Download a video and estimate its emotion from speech and frames.

    Args:
        video_url: URL of the video to analyze.
        api_key: Optional key forwarded to ``download_youtube_video``; when
            empty, ``YOUTUBE_API_KEY`` is used. The parameter is optional so
            the function works both with one argument and with the two
            inputs the UI passes.

    Returns:
        A ``(transcript, predicted_emotion_text, final_emotion)`` tuple: the
        Whisper transcript, the top emotion of the transcript, and the label
        with the highest probability across the transcript scores and the
        averaged frame-caption scores.
    """
    # NOTE(review): nothing visible reads this global; it is kept in case
    # code outside this view depends on it.
    global output_path
    output_path = './'

    video_path = download_youtube_video(video_url, api_key or YOUTUBE_API_KEY)
    mp4_path = convert_to_mp4(video_path)
    audio_path = extract_audio_from_video(mp4_path)
    audio_wav_path = convert_mp3_to_wav(audio_path)

    model_whisper = whisper.load_model("base")
    result_whisper = model_whisper.transcribe(audio_wav_path)
    transcript = result_whisper['text']

    emotion_dict_text, predicted_emotion_text = process_text(transcript)

    n_frame_interval = 60
    video_labels, emotion_vectors_video = _sample_frame_emotions(mp4_path, n_frame_interval)

    # Keep the label list aligned with the score vector. The text and video
    # classifiers have different label sets, so looking the combined argmax
    # up in the text labels alone gave a wrong label or an IndexError.
    combined_labels = list(emotion_dict_text.keys())
    combined_scores = np.array(list(emotion_dict_text.values()))
    if emotion_vectors_video:
        average_emotion_vector_video = np.mean(emotion_vectors_video, axis=0)
        combined_labels = combined_labels + video_labels
        combined_scores = np.concatenate((combined_scores, average_emotion_vector_video))
    # With no sampled frames the result falls back to the transcript scores.

    final_most_predicted_index = int(np.argmax(combined_scores))
    final_most_predicted_emotion = combined_labels[final_most_predicted_index]

    return transcript, predicted_emotion_text, final_most_predicted_emotion

# Gradio UI: a URL box and an API-key box, one button, and three read-only
# text outputs.
with gr.Blocks() as iface:
    gr.Markdown("# 🎥 Multimodal Emotion Recognition\nUpload or enter a YouTube Video URL and analyze emotions from both audio and video frames.")
    
    with gr.Row():
        video_url = gr.Textbox(label="YouTube Video URL", placeholder="Enter video URL here...", interactive=True)
        api_key = gr.Textbox(label="YouTube API Key", placeholder="Enter your API key", type="password", interactive=True)
    
    with gr.Row():
        submit_button = gr.Button("Analyze Video")
    
    with gr.Row():
        transcript_output = gr.Textbox(label="Transcript", interactive=False)
        audio_emotion_output = gr.Textbox(label="Emotion from Audio", interactive=False)
        visual_emotion_output = gr.Textbox(label="Emotion from Video", interactive=False)
    
    # Two inputs are wired in, so analyze_video is called with two positional
    # arguments (URL, API key) and must accept both; its three return values
    # fill the three output boxes in order.
    submit_button.click(analyze_video, inputs=[video_url, api_key], outputs=[transcript_output, audio_emotion_output, visual_emotion_output])

# Start the web app only when this file is run as a script.
if __name__ == "__main__":
    iface.launch()