import json
import os
import subprocess
import tempfile

import cv2
import numpy as np
import torch
import streamlit as st
from PIL import Image
from tqdm import tqdm
from transformers import (
    AutoImageProcessor,
    AutoModelForObjectDetection,
)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
FRAME_EXTRACTION_INTERVAL = 0.05  # seconds between sampled frames

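# Note: a 0.05 s interval samples 20 frames per second of video, so a
# one-minute clip yields roughly 1200 JPEGs for DETR and the optical-flow
# pass to process; raising the interval is the simplest speed/disk trade-off.
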
try:
    print("🔄 Loading visual model and processor...")
    processor_visual = AutoImageProcessor.from_pretrained("facebook/detr-resnet-50")
    model_visual = AutoModelForObjectDetection.from_pretrained("facebook/detr-resnet-50").to(DEVICE)
    print(f"✅ Model loaded on {DEVICE} successfully!")
except Exception as e:
    print(f"❌ Error loading model: {e}")
    raise SystemExit(1)

def extract_metadata(video_path):
    """Extracts video metadata using ffprobe."""
    try:
        cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
               "-show_format", "-show_streams", video_path]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return json.loads(result.stdout)
    except Exception as e:
        print(f"❌ Metadata extraction failed: {e}")
        return {}

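# For reference, the ffprobe call above returns JSON with two top-level keys:
#   "format"  -> container-level info (duration, bit_rate, format_name, ...)
#   "streams" -> one dict per stream; analyze_video() prints streams[0]
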
def extract_frames(video_path, output_folder="frames"):
    """Extracts frames from video at the configured interval (supports sub-second intervals)."""
    os.makedirs(output_folder, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("❌ Could not open video file")
        return 0

    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps <= 0:
        print("❌ Could not determine video FPS")
        cap.release()
        return 0

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    total_duration = total_frames / fps
    frame_count = 0

    # Seek by timestamp so the sampling interval is independent of the source FPS.
    timestamp = 0.0
    while timestamp <= total_duration:
        cap.set(cv2.CAP_PROP_POS_MSEC, timestamp * 1000)
        ret, frame = cap.read()
        if ret:
            cv2.imwrite(os.path.join(output_folder, f"frame_{frame_count:04d}.jpg"), frame)
            frame_count += 1
        else:
            break

        timestamp += FRAME_EXTRACTION_INTERVAL

    cap.release()
    return frame_count

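# Caveat on the seek loop above: if FRAME_EXTRACTION_INTERVAL is smaller than
# 1/fps of the source, consecutive timestamps can resolve to the same decoded
# frame, so adjacent JPEGs may be identical and their optical flow near zero.
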
def calculate_optical_flow(frames_folder):
    """Calculates dense optical flow between consecutive frames with validation."""
    frame_files = sorted([f for f in os.listdir(frames_folder) if f.endswith(".jpg")])
    flow_results = []

    # Establish a reference resolution from the first readable frame.
    ref_height, ref_width = None, None
    for f in frame_files:
        frame = cv2.imread(os.path.join(frames_folder, f))
        if frame is not None:
            ref_height, ref_width = frame.shape[:2]
            break

    if ref_height is None:
        print("❌ No valid frames found for optical flow calculation")
        return []

    prev_gray = None
    for i in tqdm(range(len(frame_files)), desc="Calculating optical flow"):
        current_path = os.path.join(frames_folder, frame_files[i])
        current_frame = cv2.imread(current_path)

        if current_frame is None:
            continue

        # Normalize every frame to the reference resolution before flow estimation.
        if current_frame.shape[:2] != (ref_height, ref_width):
            current_frame = cv2.resize(current_frame, (ref_width, ref_height))

        # Guard against single-channel images slipping through.
        if len(current_frame.shape) == 2:
            current_frame = cv2.cvtColor(current_frame, cv2.COLOR_GRAY2BGR)

        current_gray = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)

        if prev_gray is not None:
            flow = cv2.calcOpticalFlowFarneback(
                prev_gray, current_gray, None,
                pyr_scale=0.5, levels=3, iterations=3,
                winsize=15, poly_n=5, poly_sigma=1.2, flags=0
            )

            # Per-pixel Euclidean magnitude: sqrt(u**2 + v**2).
            flow_magnitude = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
            flow_results.append({
                "max_flow": float(flow_magnitude.max()),
                "mean_flow": float(flow_magnitude.mean())
            })

        prev_gray = current_gray

    # Smooth the per-frame statistics with a centered moving average.
    window_size = 5
    smoothed_flow = []
    for i in range(len(flow_results)):
        start = max(0, i - window_size // 2)
        end = min(len(flow_results), i + window_size // 2 + 1)
        window = flow_results[start:end]
        avg_mean = np.mean([f['mean_flow'] for f in window])
        avg_max = np.mean([f['max_flow'] for f in window])
        smoothed_flow.append({'mean_flow': avg_mean, 'max_flow': avg_max})

    return smoothed_flow

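# Equivalent magnitude computation via OpenCV's own helper, should the manual
# sqrt above ever need replacing (same result, and it returns the angle too):
#   mag, _ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
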
def detect_objects(frames_folder):
    """Processes frames through the visual detection model."""
    results = []
    frame_files = sorted([f for f in os.listdir(frames_folder) if f.endswith(".jpg")])

    for frame_file in tqdm(frame_files, desc="Analyzing frames"):
        try:
            # DETR expects RGB input; force the mode in case of grayscale JPEGs.
            image = Image.open(os.path.join(frames_folder, frame_file)).convert("RGB")
            inputs = processor_visual(images=image, return_tensors="pt").to(DEVICE)

            with torch.no_grad():
                outputs = model_visual(**inputs)

            # post_process expects (height, width); PIL's size is (width, height), hence [::-1].
            target_sizes = torch.tensor([image.size[::-1]]).to(DEVICE)
            detections = processor_visual.post_process_object_detection(
                outputs, target_sizes=target_sizes, threshold=0.4
            )[0]

            scores = detections["scores"].cpu().numpy().tolist()
            max_confidence = max(scores) if scores else 0.0

            results.append({
                "frame": frame_file,
                "detections": len(scores),
                "max_confidence": max_confidence,
                "average_confidence": float(np.mean(scores)) if scores else 0.0
            })

        except Exception as e:
            print(f"❌ Error processing {frame_file}: {e}")
            results.append({
                "frame": frame_file,
                "detections": 0,
                "max_confidence": 0.0,
                "average_confidence": 0.0
            })

    return results

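# The detections dict returned by post_process_object_detection also carries
# "labels" and "boxes"; human-readable class names come from the model config:
#   names = [model_visual.config.id2label[l.item()] for l in detections["labels"]]
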
def detect_manipulation(report_path="report.json"):
    """Determines video authenticity based on analysis results."""
    try:
        with open(report_path) as f:
            report = json.load(f)

        # Heuristic thresholds, tuned empirically.
        CONFIDENCE_THRESHOLD = 0.80
        FLOW_STD_THRESHOLD = 28
        SUSPICIOUS_FRAME_RATIO = 0.25

        stats = report["summary_stats"]

        # Per-frame confidence spread and the share of low-confidence frames.
        confidence_std = np.std([r["average_confidence"] for r in report["frame_analysis"]])
        flow_std = stats.get("std_optical_flow", 0)
        low_conf_frames = sum(1 for r in report["frame_analysis"] if r["average_confidence"] < 0.4)
        anomaly_ratio = low_conf_frames / len(report["frame_analysis"])

        # Each triggered heuristic adds a fixed weight to the manipulation score.
        score = 0
        if stats["average_detection_confidence"] < CONFIDENCE_THRESHOLD:
            score += 1.5
        if flow_std > FLOW_STD_THRESHOLD:
            score += 1.2
        if anomaly_ratio > SUSPICIOUS_FRAME_RATIO:
            score += 1.0
        if confidence_std > 0.2:
            score += 0.8

        return score

    except Exception as e:
        return f"❌ Error in analysis: {str(e)}"

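# The four weights above sum to at most 4.5 (1.5 + 1.2 + 1.0 + 0.8); the UI
# below maps the score to verdicts at >= 3.5 (major) and >= 2.0 (minor).
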
def generate_report(visual_results, flow_results, output_file="report.json"):
    """Generates a comprehensive analysis report."""
    report_data = {
        "frame_analysis": visual_results,
        "motion_analysis": flow_results,
        "summary_stats": {
            "max_detection_confidence": max(r["max_confidence"] for r in visual_results) if visual_results else 0,
            "average_detection_confidence": float(np.mean([r["average_confidence"] for r in visual_results])) if visual_results else 0,
            "detection_confidence_std": float(np.std([r["average_confidence"] for r in visual_results])) if visual_results else 0,
            "peak_optical_flow": max(r["max_flow"] for r in flow_results) if flow_results else 0,
            "average_optical_flow": float(np.mean([r["mean_flow"] for r in flow_results])) if flow_results else 0,
            "std_optical_flow": float(np.std([r["mean_flow"] for r in flow_results])) if flow_results else 0
        }
    }

    with open(output_file, "w") as f:
        json.dump(report_data, f, indent=2)

    return report_data

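# Illustrative report.json shape (field names real, values made up):
# {
#   "frame_analysis":  [{"frame": "frame_0000.jpg", "detections": 2,
#                        "max_confidence": 0.93, "average_confidence": 0.81}, ...],
#   "motion_analysis": [{"mean_flow": 0.84, "max_flow": 4.10}, ...],
#   "summary_stats":   {"average_detection_confidence": 0.87, "std_optical_flow": 12.5, ...}
# }
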
def analyze_video(video_path):
    """Complete video analysis workflow."""
    print("\n📋 Metadata Extraction:")
    metadata = extract_metadata(video_path)
    print(json.dumps((metadata.get("streams") or [{}])[0], indent=2))

    print("\n🎞 Frame Extraction:")
    frame_count = extract_frames(video_path)
    print(f"✅ Extracted {frame_count} frames at {FRAME_EXTRACTION_INTERVAL}s intervals")

    print("\n🔍 Running object detection...")
    visual_results = detect_objects("frames")

    print("\n🌊 Calculating optical flow...")
    flow_results = calculate_optical_flow("frames")

    print("\n📝 Generating Final Report...")
    report_data = generate_report(visual_results, flow_results)

    print("\n🕵 Authenticity Analysis:")
    score = detect_manipulation()

    print(f"\n🎯 Final Score: {score}")
    return score

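# Standalone usage outside Streamlit (the path is a placeholder):
#   score = analyze_video("sample.mp4")
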
def local_css(file_name):
    """Injects a local stylesheet into the Streamlit page."""
    with open(file_name) as f:
        st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)

local_css("style.css")
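
# Launch note: a Streamlit app is started from the CLI, e.g.
#   streamlit run app.py
# assuming this script is saved as app.py.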

st.sidebar.title("Navigation")
page = st.sidebar.radio("", ["Home", "Analyze Video", "About"])

if page == "Home":
    st.markdown("<h1 class='title'>Video Manipulation Detection</h1>", unsafe_allow_html=True)

    col1, col2 = st.columns(2)
    with col1:
        st.markdown("""
        <div class='hero-text'>
            Detect manipulated videos with AI-powered analysis.
            Protect yourself from deepfakes and synthetic media.
        </div>
        """, unsafe_allow_html=True)

    with col2:
        st.video("Realistic Universe Intro_free.mp4")

    st.markdown("## How It Works")
    cols = st.columns(3)
    with cols[0]:
        st.image("upload-icon.png", width=100)
        st.markdown("### Upload Video")
    with cols[1]:
        st.image("analyze-icon.png", width=100)
        st.markdown("### AI Analysis")
    with cols[2]:
        st.image("result-icon.png", width=100)
        st.markdown("### Get Results")

elif page == "Analyze Video":
    uploaded_file = st.file_uploader("Upload a Video", type=["mp4", "mov"])

    if uploaded_file is not None:
        # delete=False keeps the file on disk after the handle closes, so OpenCV
        # and st.video can reopen it by path (required on Windows in particular).
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as temp_file:
            temp_file.write(uploaded_file.read())
            temp_video_path = temp_file.name

        st.video(temp_video_path)

        if st.button("Analyze Video"):
            with st.spinner("Analyzing..."):
                try:
                    score = analyze_video(temp_video_path)

                    st.write(f"Analysis Score: {score}")
                    # detect_manipulation returns an error string on failure;
                    # the cast surfaces that case through the except below.
                    score = float(score)

                    if score >= 3.5:
                        st.markdown("""
                        <div class='result-box suspicious'>
                            <p>This video shows major signs of manipulation</p>
                        </div>
                        """, unsafe_allow_html=True)
                    elif score >= 2.0:
                        st.markdown("""
                        <div class='result-box suspicious'>
                            <p>This video shows minor signs of manipulation</p>
                        </div>
                        """, unsafe_allow_html=True)
                    else:
                        st.markdown("""
                        <div class='result-box clean'>
                            <p>No significant manipulation detected</p>
                        </div>
                        """, unsafe_allow_html=True)
                except Exception as e:
                    st.error(f"An error occurred during analysis: {e}")

elif page == "About":
    st.markdown("<h1 class='title'>About Us</h1>", unsafe_allow_html=True)

    col1, col2 = st.columns(2)
    with col1:
        st.image("creator.jpg", width=300, caption="Ayush Agarwal, Lead Developer")
    with col2:
        st.markdown("""
        <div class='about-text'>
            ## Ayush Agarwal,
            Student at VIT Bhopal University,
            AIML enthusiast
            <br><br>
            📧 [email protected]
            <br>
            🔗 [LinkedIn](https://www.linkedin.com/in/ayush20039939)
            <br>
            🔗 [GitHub](https://github.com/Ayush99392003)
        </div>
        """, unsafe_allow_html=True)

    st.markdown("## Our Technology")
    st.markdown("""
    <div class='tech-stack'>
        <img src='https://img.icons8.com/color/96/000000/python.png'/>
        <img src='https://img.icons8.com/color/96/000000/tensorflow.png'/>
        <img src='https://img.icons8.com/color/96/000000/opencv.png'/>
        <img src='https://raw.githubusercontent.com/github/explore/968d1eb8fb6b704c6be917f0000283face4f33ee/topics/streamlit/streamlit.png'/>
    </div>
    """, unsafe_allow_html=True)