import os
import shutil

import cv2
import gradio as gr
from gradio_client import Client, handle_file
from PIL import Image
from moviepy import *

# Remote Hugging Face Space pipelines: one classifies individual video
# frames, the other classifies the extracted audio track.
clientImgPipeLn = Client("dj-dawgs-ipd/IPD-Image-Pipeline")
clientAudioPipeLn = Client("dj-dawgs-ipd/IPD-Audio-Pipeline")


def _no_audio_result():
    """Placeholder audio verdict for videos that carry no audio track."""
    return {
        'prediction': None,
        'language': None,
        'label': None,
        'confidence': None,
        'hate_text': None,
    }


def _analyze_audio(video_clip, temp_data_path):
    """Extract the audio track (if any) and run it through the audio pipeline.

    Returns the audio pipeline's response dict, or a placeholder with all
    fields set to None when the clip has no audio stream.
    """
    if video_clip.audio is None:
        return _no_audio_result()
    audio_path = os.path.join(temp_data_path, "temp_audio.wav")
    video_clip.audio.write_audiofile(audio_path, codec="pcm_s16le")
    return clientAudioPipeLn.predict(
        audio_path=handle_file(audio_path),
        api_name='/predict',
    )


def _analyze_frames(cap, fps, temp_data_path):
    """Sample one frame every ~2 seconds and stop at the first 'hate' hit.

    Returns the image pipeline's response augmented with
    'hate_image_timestamp' (seconds from start), or {} when no sampled
    frame is flagged.
    """
    frame_interval = fps * 2  # sample every 2 seconds of video
    frame_count = 0
    success = True
    while success:
        success, frame = cap.read()
        if frame_count % frame_interval == 0 and success:
            temp_image_path = os.path.join(
                temp_data_path,
                f"temp_frames/temp_frame_{frame_count // fps}s.jpg",
            )
            cv2.imwrite(temp_image_path, frame)
            response = clientImgPipeLn.predict(
                image=handle_file(temp_image_path),
                api_name="/predict",
            )
            print(f"Response for frame at {frame_count // fps}s: {response}")
            if response['prediction'] == 'hate':
                response['hate_image_timestamp'] = frame_count // fps
                return response
        frame_count += 1
    return {}


def _combine_results(resImg, resAudio):
    """Merge the per-frame and audio verdicts into the final JSON payload.

    NOTE(fix): any audio prediction other than 'hate' (including None for
    silent videos) is treated as clean. The previous logic compared against
    'not_hate' only, which raised KeyError on resImg['prediction'] for
    silent videos with no hateful frame, and mislabelled video-only hits
    as ["video", "audio"].
    """
    img_hate = resImg.get('prediction') == 'hate'
    audio_hate = resAudio.get('prediction') == 'hate'

    if not img_hate and not audio_hate:
        return {
            'prediction': 'not_hate',
            'language': {'video': None, 'audio': None},
            'label': {'video': None, 'audio': None},
            'confidence': None,
            'hate_text': {'video': None, 'audio': None},
            'hate_image_timestamp': None,
            'hate_component': None,
        }
    if img_hate and not audio_hate:
        return {
            'prediction': 'hate',
            'language': {'video': resImg['language'], 'audio': None},
            'label': {'video': resImg['label'], 'audio': None},
            'confidence': resImg['confidence'],
            'hate_text': {'video': resImg['hate_text'], 'audio': None},
            'hate_image_timestamp': resImg['hate_image_timestamp'],
            'hate_component': ["video"],
        }
    if not img_hate and audio_hate:
        return {
            'prediction': 'hate',
            'language': {'video': None, 'audio': resAudio['language']},
            'label': {'video': None, 'audio': resAudio['label']},
            'confidence': resAudio['confidence'],
            'hate_text': {'video': None, 'audio': resAudio['hate_text']},
            'hate_image_timestamp': None,
            'hate_component': ["audio"],
        }

    # Both components flagged: average whichever confidences are present.
    confidences = [
        c for c in (resImg.get('confidence'), resAudio.get('confidence'))
        if c is not None
    ]
    confidence = sum(confidences) / len(confidences) if confidences else None
    return {
        'prediction': 'hate',
        'language': {'video': resImg['language'], 'audio': resAudio['language']},
        'label': {'video': resImg['label'], 'audio': resAudio['label']},
        'confidence': confidence,
        'hate_text': {'video': resImg['hate_text'], 'audio': resAudio['hate_text']},
        'hate_image_timestamp': resImg['hate_image_timestamp'],
        'hate_component': ["video", "audio"],
    }


def predict(video_path):
    """Detect hateful content in a video.

    Samples one frame every ~2 seconds through the image pipeline and runs
    the extracted audio (if any) through the audio pipeline, then merges
    both verdicts into a single JSON-serializable dict.

    Parameters:
        video_path: path to the video file to analyze.

    Returns:
        dict with keys 'prediction', 'language', 'label', 'confidence',
        'hate_text', 'hate_image_timestamp' and 'hate_component'.
    """
    cap = cv2.VideoCapture(video_path)
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    if fps <= 0:
        # Some containers report 0 FPS; fall back to 1 so the sampling
        # arithmetic cannot divide by zero (timestamps then count frames).
        fps = 1

    temp_data_path = "temp_data"
    os.makedirs(temp_data_path, exist_ok=True)
    os.makedirs(os.path.join(temp_data_path, "temp_frames"), exist_ok=True)

    video_clip = VideoFileClip(video_path)
    try:
        resAudio = _analyze_audio(video_clip, temp_data_path)
        resImg = _analyze_frames(cap, fps, temp_data_path)
    finally:
        # Always release handles and remove scratch files, even when a
        # remote pipeline call raises.
        cap.release()
        video_clip.close()
        shutil.rmtree(temp_data_path, ignore_errors=True)

    return _combine_results(resImg, resAudio)


iface = gr.Interface(
    fn=predict,
    inputs=gr.Video(),
    outputs=gr.JSON(),
    title="Hate Speech Detection in Video",
    description="Detect hateful symbols or text in Video",
)

if __name__ == "__main__":
    iface.launch(show_error=True)