import os
import subprocess
import gradio as gr
import whisper
import yt_dlp
import torch
import numpy as np
from moviepy.editor import VideoFileClip
from transformers import AutoModelForAudioClassification, AutoFeatureExtractor
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import BlipProcessor, BlipForConditionalGeneration
import cv2

# Define the necessary functions

def download_youtube_video(video_url, output_path):
    # Download the best available video+audio and return the path yt-dlp actually
    # wrote to, instead of guessing the extension.
    ydl_opts = {
        'format': 'bestvideo+bestaudio/best',
        'outtmpl': os.path.join(output_path, '%(title)s.%(ext)s'),
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        video_info = ydl.extract_info(video_url, download=True)
        return ydl.prepare_filename(video_info)

def convert_to_mp4(input_path, output_path):
    # Remux into an MP4 container without re-encoding (assumes the codecs are MP4-compatible).
    output_file = os.path.join(output_path, 'video.mp4')
    command = ['ffmpeg', '-y', '-i', input_path, '-c', 'copy', output_file]
    subprocess.run(command, check=True)
    return output_file

def extract_audio_from_video(video_path, output_path):
    # Extract the audio track with moviepy and save it as MP3.
    video_clip = VideoFileClip(video_path)
    audio_output = os.path.join(output_path, 'audio.mp3')
    video_clip.audio.write_audiofile(audio_output)
    video_clip.close()
    return audio_output

def convert_mp3_to_wav(mp3_path, output_path):
    # Convert the MP3 to WAV with pydub for downstream processing.
    from pydub import AudioSegment
    audio = AudioSegment.from_mp3(mp3_path)
    wav_output = os.path.join(output_path, 'audio.wav')
    audio.export(wav_output, format="wav")
    return wav_output

def process_text(text):
    # Text emotion model: RoBERTa fine-tuned on tweets; label order follows the model card.
    model_name = "cardiffnlp/twitter-roberta-base-emotion"
    emotion_labels = ['anger', 'joy', 'optimism', 'sadness']
    
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)

    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
    
    emotion_probs = torch.softmax(logits, dim=-1).squeeze()
    predicted_emotion = emotion_labels[torch.argmax(emotion_probs)]
    
    emotion_dict = {emotion_labels[i]: emotion_probs[i].item() for i in range(len(emotion_labels))}
    
    return emotion_dict, predicted_emotion

def preprocess_frame(frame):
    # OpenCV returns BGR frames; convert to RGB before handing them to BLIP.
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame = cv2.resize(frame, (224, 224))
    pixel_values = caption_processor(images=frame, return_tensors="pt").pixel_values
    return pixel_values

def generate_caption(pixel_values):
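    # Use BLIP to produce a short natural-language caption for the preprocessed frame.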
    caption_ids = caption_model.generate(pixel_values)
    caption = caption_processor.batch_decode(caption_ids, skip_special_tokens=True)[0]
    return caption

def predict_emotions(caption):
    # Score the caption with the emotion classifier; label names come from the model config.
    inputs = emotion_tokenizer(caption, return_tensors='pt', truncation=True, padding=True)
    with torch.no_grad():
        outputs = emotion_model(**inputs)

    emotion_probs = torch.softmax(outputs.logits, dim=1)[0]
    labels = [emotion_model.config.id2label[i] for i in range(emotion_probs.shape[0])]

    predicted_emotions = {label: prob.item() for label, prob in zip(labels, emotion_probs)}

    return predicted_emotions

# Load models and processors once at the start
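# BLIP generates captions for sampled frames; the DistilRoBERTa classifier then scores those captions for emotion.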
caption_model_name = "Salesforce/blip-image-captioning-base"
caption_processor = BlipProcessor.from_pretrained(caption_model_name)
caption_model = BlipForConditionalGeneration.from_pretrained(caption_model_name)

emotion_model_name = "j-hartmann/emotion-english-distilroberta-base"
emotion_tokenizer = AutoTokenizer.from_pretrained(emotion_model_name)
emotion_model = AutoModelForSequenceClassification.from_pretrained(emotion_model_name)

# Gradio Interface Function
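# Pipeline: download -> remux to MP4 -> extract audio -> transcribe with Whisper ->
# text emotions -> frame captions + caption emotions -> combined prediction.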
def analyze_video(video_url):
    # Set output path for downloads
    output_path = './'

    # Download the video
    video_path = download_youtube_video(video_url, output_path)

    # Convert to mp4 format
    mp4_path = convert_to_mp4(video_path, output_path)

    # Extract audio from the video
    audio_path = extract_audio_from_video(mp4_path, output_path)

    # Convert audio to wav format for processing
    audio_wav_path = convert_mp3_to_wav(audio_path, output_path)

    # Transcribe the audio with Whisper
    model_whisper = whisper.load_model("base")
    result_whisper = model_whisper.transcribe(audio_wav_path)
    transcript = result_whisper['text']

    # Process the transcript to get text emotions
    emotion_dict_text, predicted_emotion_text = process_text(transcript)

    # Process the video frames with image captioning and emotion recognition
    n_frame_interval = 60  # Process every 60th frame
    emotion_vectors_video = []
    video_emotion_labels = []

    video_capture = cv2.VideoCapture(mp4_path)
    total_frames_video = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_count_video = 0

    while video_capture.isOpened():
        ret_video, frame_video = video_capture.read()

        if not ret_video or frame_count_video > total_frames_video:
            break

        if frame_count_video % n_frame_interval == 0:
            pixel_values_video = preprocess_frame(frame_video)
            caption_video = generate_caption(pixel_values_video)
            predicted_emotions_video = predict_emotions(caption_video)

            # Collect emotion vectors from the sampled frames
            video_emotion_labels = list(predicted_emotions_video.keys())
            emotion_vectors_video.append(np.array(list(predicted_emotions_video.values())))

        frame_count_video += 1

    video_capture.release()

    # Aggregate results from the sampled video frames
    average_emotion_vector_video = np.mean(emotion_vectors_video, axis=0)

    # Combine text and video emotion scores, keeping the labels aligned with the scores
    combined_labels = list(emotion_dict_text.keys()) + video_emotion_labels
    combined_emotion_vector_final = np.concatenate(
        (np.array(list(emotion_dict_text.values())), average_emotion_vector_video)
    )

    final_most_predicted_index = int(np.argmax(combined_emotion_vector_final))
    final_most_predicted_emotion = combined_labels[final_most_predicted_index]

    return transcript, predicted_emotion_text, final_most_predicted_emotion


# Create Gradio interface
iface = gr.Interface(
    fn=analyze_video,
    inputs=gr.Textbox(label="YouTube Video URL"),
    outputs=["text", "text", "text"],
    title="Multimodal Emotion Recognition",
    description="Enter a YouTube Video URL to analyze emotions from both audio and visual content.",
)

# Launch the app
if __name__ == "__main__":
    iface.launch()