import gradio as gr import cv2 import numpy as np import tensorflow as tf import cv2 import json import numpy as np from concurrent.futures import ThreadPoolExecutor import dlib import tempfile import shutil import os from tensorflow.keras.applications.inception_v3 import preprocess_input model_path = 'deepfake_detection_model.h5' model = tf.keras.models.load_model(model_path) IMG_SIZE = (299, 299) MOTION_THRESHOLD = 20 FRAME_SKIP = 2 no_of_frames = 10 MAX_FRAMES=no_of_frames detector = dlib.get_frontal_face_detector() def extract_faces_from_frame(frame, detector): """ Detects faces in a frame and returns the resized faces. Parameters: - frame: The video frame to process. - detector: Dlib face detector. Returns: - resized_faces (list): List of resized faces detected in the frame. """ gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = detector(gray_frame) resized_faces = [] for face in faces: x1, y1, x2, y2 = face.left(), face.top(), face.right(), face.bottom() crop_img = frame[y1:y2, x1:x2] if crop_img.size != 0: resized_face = cv2.resize(crop_img, IMG_SIZE) resized_faces.append(resized_face) # Debug: Log the number of faces detected #print(f"Detected {len(resized_faces)} faces in current frame") return resized_faces def process_frame(video_path, detector, frame_skip): """ Processes frames to extract motion and face data concurrently. Parameters: - cap: OpenCV VideoCapture object. - detector: Dlib face detector. - frame_skip (int): Number of frames to skip for processing. Returns: - motion_frames (list): List of motion-based face images. - all_faces (list): List of all detected faces for fallback. """ prev_frame = None frame_count = 0 motion_frames = [] all_faces = [] cap = cv2.VideoCapture(video_path) while cap.isOpened(): ret, frame = cap.read() if not ret: break # Skip frames to improve processing speed if frame_count % frame_skip != 0: frame_count += 1 continue # Debug: Log frame number being processed #print(f"Processing frame {frame_count}") # # Resize frame to reduce processing time (optional, adjust size as needed) # frame = cv2.resize(frame, (640, 360)) # Extract faces from the current frame faces = extract_faces_from_frame(frame, detector) all_faces.extend(faces) # Store all faces detected, including non-motion gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if prev_frame is None: prev_frame = gray_frame frame_count += 1 continue # Calculate frame difference to detect motion frame_diff = cv2.absdiff(prev_frame, gray_frame) motion_score = np.sum(frame_diff) # Debug: Log the motion score #print(f"Motion score: {motion_score}") # Check if motion is above the defined threshold and add the face to motion frames if motion_score > MOTION_THRESHOLD and faces: motion_frames.extend(faces) prev_frame = gray_frame frame_count += 1 cap.release() return motion_frames, all_faces def select_well_distributed_frames(motion_frames, all_faces, no_of_frames): """ Selects well-distributed frames from the detected motion and fallback faces. Parameters: - motion_frames (list): List of frames with detected motion. - all_faces (list): List of all detected faces. - no_of_frames (int): Required number of frames. Returns: - final_frames (list): List of selected frames. """ # Case 1: Motion frames exceed the required number if len(motion_frames) >= no_of_frames: interval = len(motion_frames) // no_of_frames distributed_motion_frames = [motion_frames[i * interval] for i in range(no_of_frames)] return distributed_motion_frames # Case 2: Motion frames are less than the required number needed_frames = no_of_frames - len(motion_frames) # If all frames together are still less than needed, return all frames available if len(motion_frames) + len(all_faces) < no_of_frames: #print(f"Returning all available frames: {len(motion_frames) + len(all_faces)}") return motion_frames + all_faces interval = max(1, len(all_faces) // needed_frames) additional_faces = [all_faces[i * interval] for i in range(needed_frames)] combined_frames = motion_frames + additional_faces interval = max(1, len(combined_frames) // no_of_frames) final_frames = [combined_frames[i * interval] for i in range(no_of_frames)] return final_frames def extract_frames(no_of_frames, video_path): motion_frames, all_faces = process_frame(video_path, detector, FRAME_SKIP) final_frames = select_well_distributed_frames(motion_frames, all_faces, no_of_frames) return final_frames def predict_video(model, video_path): """ Predict if a video is REAL or FAKE using the trained model. Parameters: - model: The loaded deepfake detection model. - video_path: Path to the video file to be processed. Returns: - str: 'REAL' or 'FAKE' based on the model's prediction. """ # Extract frames from the video frames = extract_frames(no_of_frames, video_path) original_frames = frames # Convert the frames list to a 5D tensor (1, time_steps, height, width, channels) if len(frames) < MAX_FRAMES: # Pad with zero arrays to match MAX_FRAMES while len(frames) < MAX_FRAMES: frames.append(np.zeros((299, 299, 3), dtype=np.float32)) frames = frames[:MAX_FRAMES] frames = np.array(frames) frames = preprocess_input(frames) # Expand dims to fit the model input shape input_data = np.expand_dims(frames, axis=0) # Shape becomes (1, MAX_FRAMES, 299, 299, 3) # Predict using the model prediction = model.predict(input_data) probability = prediction[0][0] # Get the probability for the first (and only) sample # Convert probability to class label if probability >=0.6: predicted_label='FAKE' else: predicted_label = 'REAL' probability=1-probability return original_frames, predicted_label, probability def display_frames_and_prediction(video_file): # Ensure file is completely uploaded and of correct type if video_file is None: return [], "
No video uploaded!
", "" # Check file size (10 MB limit) if os.path.getsize(video_file) > 10 * 1024 * 1024: # 10 MB return [], "
File size exceeds 10 MB limit!
", "" # Check file extension if not video_file.endswith('.mp4'): return [], "
Only .mp4 files are allowed!
", "" with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file: temp_file_path = temp_file.name with open(video_file, 'rb') as src_file: with open(temp_file_path, 'wb') as dest_file: shutil.copyfileobj(src_file, dest_file) frames, predicted_label, confidence = predict_video(model, temp_file_path) os.remove(temp_file_path) confidence_text = f"Confidence: {confidence:.2%}" prediction_style = ( f"
" f"{predicted_label}
" ) return frames, prediction_style, confidence_text iface = gr.Interface( fn=display_frames_and_prediction, inputs=gr.File(label="Upload Video", interactive=True), outputs=[ gr.Gallery(label="Extracted Frames"), gr.HTML(label="Prediction"), gr.Textbox(label="Confidence", interactive=False) ], title="Deepfake Detection", description="Upload a video to determine if it is REAL or FAKE based on the deepfake detection model.", css="app.css", examples=[ ["examples/abarnvbtwb.mp4"], ["examples/aapnvogymq.mp4"], ], cache_examples="lazy" ) iface.launch()