Driver-Distraction-Detection / video_processor.py
simran0608's picture
Upload 8 files
530a851 verified
raw
history blame
4.72 kB
"""
Video Processing Utility for Drowsiness Detection
This script provides a more robust video processing interface
"""
import cv2 as cv
import os
import json
from datetime import datetime
import argparse
def get_video_info(video_path):
"""Get detailed video information"""
cap = cv.VideoCapture(video_path)
if not cap.isOpened():
return None
info = {
'fps': cap.get(cv.CAP_PROP_FPS),
'width': int(cap.get(cv.CAP_PROP_FRAME_WIDTH)),
'height': int(cap.get(cv.CAP_PROP_FRAME_HEIGHT)),
'total_frames': int(cap.get(cv.CAP_PROP_FRAME_COUNT)),
'duration': cap.get(cv.CAP_PROP_FRAME_COUNT) / cap.get(cv.CAP_PROP_FPS) if cap.get(cv.CAP_PROP_FPS) > 0 else 0,
'codec': int(cap.get(cv.CAP_PROP_FOURCC)),
'file_size': os.path.getsize(video_path)
}
cap.release()
return info
def create_processing_report(input_path, output_path, stats):
"""Create a JSON report of the processing results"""
report = {
'timestamp': datetime.now().isoformat(),
'input_file': input_path,
'output_file': output_path,
'video_info': get_video_info(input_path),
'detection_stats': stats,
'processing_info': {
'software': 'Drowsiness Detection System',
'version': '1.0'
}
}
report_path = output_path.replace('.mp4', '_report.json')
with open(report_path, 'w') as f:
json.dump(report, f, indent=2)
return report_path
def process_video_with_progress(input_path, output_path, progress_callback=None):
"""
Process video with progress callback
progress_callback: function that takes (current_frame, total_frames)
"""
# Import the drowsiness detection functions
from drowsiness_detection import process_frame, reset_counters
from drowsiness_detection import DROWSY_COUNTER, YAWN_COUNTER, HEAD_DOWN_COUNTER
reset_counters()
# Open video file
video_stream = cv.VideoCapture(input_path)
if not video_stream.isOpened():
raise ValueError(f"Could not open video file {input_path}")
# Get video properties
fps = int(video_stream.get(cv.CAP_PROP_FPS))
width = int(video_stream.get(cv.CAP_PROP_FRAME_WIDTH))
height = int(video_stream.get(cv.CAP_PROP_FRAME_HEIGHT))
total_frames = int(video_stream.get(cv.CAP_PROP_FRAME_COUNT))
# Setup video writer
fourcc = cv.VideoWriter_fourcc(*'mp4v')
video_writer = cv.VideoWriter(output_path, fourcc, fps, (640, 480))
frame_count = 0
try:
while True:
ret, frame = video_stream.read()
if not ret:
break
frame_count += 1
# Process frame
processed_frame = process_frame(frame)
# Write frame to output video
video_writer.write(processed_frame)
# Call progress callback if provided
if progress_callback:
progress_callback(frame_count, total_frames)
# Get final stats
stats = {
'total_frames': frame_count,
'drowsy_events': DROWSY_COUNTER,
'yawn_events': YAWN_COUNTER,
'head_down_events': HEAD_DOWN_COUNTER
}
return stats
finally:
video_stream.release()
video_writer.release()
def main():
parser = argparse.ArgumentParser(description='Video Processing Utility for Drowsiness Detection')
parser.add_argument('--input', '-i', required=True, help='Input video file path')
parser.add_argument('--output', '-o', help='Output video file path (optional)')
parser.add_argument('--report', '-r', action='store_true', help='Generate processing report')
parser.add_argument('--info', action='store_true', help='Show video information only')
args = parser.parse_args()
if not os.path.exists(args.input):
print(f"Error: Input file {args.input} does not exist")
return
# Show video info
if args.info:
info = get_video_info(args.input)
if info:
print(f"Video Information for: {args.input}")
print(f"Resolution: {info['width']}x{info['height']}")
print(f"FPS: {info['fps']:.2f}")
print(f"Duration: {info['duration']:.2f} seconds")
print(f"Total Frames: {info['total_frames']}")
print(f"File Size: {info['file_size'] / (1024*1024):.2f} MB")
else:
print("Error: Could not read video file")
return
# Generate output path if not provided
if not args.output:
base_name