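"""Video anomaly detection with OpenAI vision models.

Extracts frames from video files or live streams and asks a vision-capable
model (gpt-4o or gpt-4o-mini) to flag anomalies, returning structured
per-frame or cumulative assessments.
"""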
import base64
import io
import os
import re
import tempfile
import time

import cv2
import numpy as np
from openai import OpenAI
from PIL import Image


class VideoAnomalyDetector:
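    """Detect anomalies in video frames using an OpenAI vision model."""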

    def __init__(self, api_key, model="gpt-4o"):
        """
        Initialize the VideoAnomalyDetector with an OpenAI API key.

        Args:
            api_key (str): OpenAI API key for accessing the vision model
            model (str): Model to use for analysis ("gpt-4o" or "gpt-4o-mini")
        """
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def extract_frames(self, video_path, skip_frames):
        """
        Extract frames from a video file, skipping the specified number of frames.

        Args:
            video_path (str): Path to the video file
            skip_frames (int): Number of frames to skip between captures

        Returns:
            list: List of extracted frames as numpy arrays (RGB)
        """
        frames = []

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Keep one frame out of every (skip_frames + 1).
            if frame_count % (skip_frames + 1) == 0:
                # OpenCV decodes to BGR; convert to RGB for PIL and the model.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(rgb_frame)

            frame_count += 1

        cap.release()
        return frames

    def process_live_stream(self, stream_source, skip_frames, prompt, analysis_depth="granular", max_frames=None, callback=None, time_interval=None):
        """
        Process frames from a live video stream.

        This method is a generator: iterate over it to consume results as
        they become available.

        Args:
            stream_source: Stream source (0 for webcam, URL for IP camera or RTSP stream)
            skip_frames (int): Number of frames to skip between captures
            prompt (str): Prompt describing what anomaly to look for
            analysis_depth (str): "granular" for frame-by-frame analysis or "cumulative" for overall analysis
            max_frames (int, optional): Maximum number of frames to process (None for unlimited)
            callback (function, optional): Callback function to report progress
            time_interval (int, optional): If set, capture one frame every X seconds instead of using skip_frames

        Yields:
            dict: One analysis result per processed frame (granular) or a
                single cumulative analysis result (cumulative)
        """
        # On Windows, the DirectShow backend opens local webcams faster and
        # more reliably than the default MSMF backend.
        if os.name == 'nt' and stream_source in (0, 1):
            cap = cv2.VideoCapture(stream_source, cv2.CAP_DSHOW)
            # Request MJPG, which many webcams deliver with lower latency.
            cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
        else:
            cap = cv2.VideoCapture(stream_source)

        if not cap.isOpened():
            raise ValueError(f"Could not open video stream: {stream_source}")

        frames = []
        frame_count = 0
        processed_count = 0
        last_capture_time = time.time()

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                current_time = time.time()

                # Decide whether to capture this frame: by elapsed time when
                # time_interval is set, otherwise by frame count.
                should_capture = False
                if time_interval is not None:
                    if current_time - last_capture_time >= time_interval:
                        should_capture = True
                        last_capture_time = current_time
                elif frame_count % (skip_frames + 1) == 0:
                    should_capture = True

                if should_capture:
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frames.append(rgb_frame)
                    processed_count += 1

                    if time_interval is not None:
                        if analysis_depth == "granular":
                            result = self.analyze_frame(rgb_frame, prompt)
                            result.setdefault("frame", rgb_frame)
                            if callback:
                                # -1 signals an open-ended stream (total unknown).
                                callback(processed_count, -1)
                            yield result
                        elif callback:
                            # Cumulative depth: the frame is only collected here
                            # and analyzed once the stream ends.
                            callback(processed_count, -1)
                    elif callback and max_frames:
                        callback(processed_count, max_frames)

                    if max_frames and processed_count >= max_frames:
                        break

                frame_count += 1
        finally:
            cap.release()

        if analysis_depth == "cumulative":
            # Analyze everything that was collected, as a single summary.
            yield self.analyze_frames_cumulatively(frames, prompt, callback)
        elif time_interval is None:
            # skip_frames mode with granular depth: the collected frames are
            # analyzed (and yielded one by one) after the stream ends.
            for i, frame in enumerate(frames):
                if callback:
                    callback(i, len(frames))
                yield self.analyze_frame(frame, prompt)

    def encode_image_to_base64(self, image_array):
        """
        Convert a numpy array image to a base64-encoded string.

        Args:
            image_array (numpy.ndarray): Image as numpy array

        Returns:
            str: Base64-encoded image string (PNG)
        """
        pil_image = Image.fromarray(image_array)

        buffer = io.BytesIO()
        pil_image.save(buffer, format="PNG")
        img_bytes = buffer.getvalue()

        return base64.b64encode(img_bytes).decode('utf-8')

    def analyze_frame(self, frame, prompt):
        """
        Analyze a single frame using the selected OpenAI model.

        Args:
            frame (numpy.ndarray): Frame to analyze
            prompt (str): Prompt describing what anomaly to look for

        Returns:
            dict: Analysis result from the model
        """
        base64_image = self.encode_image_to_base64(frame)

        # Append a structured footer to the user's prompt so the assessment
        # can be parsed reliably out of the free-text response.
        enhanced_prompt = f"""
        {prompt}

        After your analysis, please include a structured assessment at the end of your response in this exact format:
        ANOMALY_DETECTED: [Yes/No]
        ANOMALY_TYPE: [Human/Non-human/None]
        CONFIDENCE: [0-100]

        For ANOMALY_DETECTED, answer "Yes" if you detect any anomaly, otherwise "No".
        For ANOMALY_TYPE, if an anomaly is detected, classify it as either "Human" (if it involves people or human activities) or "Non-human" (if it involves objects, animals, or environmental factors). If no anomaly is detected, use "None".
        For CONFIDENCE, provide a number from 0 to 100 indicating your confidence level in the assessment.
        """

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": enhanced_prompt},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/png;base64,{base64_image}"
                                }
                            }
                        ]
                    }
                ],
                max_tokens=1000
            )

            response_text = response.choices[0].message.content

            # Parse the structured footer out of the response.
            anomaly_detected = False
            anomaly_type = "None"
            confidence = 0

            anomaly_match = re.search(r'ANOMALY_DETECTED:\s*(Yes|No)', response_text, re.IGNORECASE)
            if anomaly_match and anomaly_match.group(1).lower() == 'yes':
                anomaly_detected = True
                # Default confidence if the model omits an explicit number.
                confidence = 90

            type_match = re.search(r'ANOMALY_TYPE:\s*(Human|Non-human|None)', response_text, re.IGNORECASE)
            if type_match:
                anomaly_type = type_match.group(1)

            conf_match = re.search(r'CONFIDENCE:\s*(\d+)', response_text, re.IGNORECASE)
            if conf_match:
                confidence = int(conf_match.group(1))

            return {
                # Same content under both keys so callers may use either.
                "text": response_text,
                "analysis": response_text,
                "frame": frame,
                "anomaly_detected": anomaly_detected,
                "anomaly_type": anomaly_type,
                "confidence": confidence,
                "timestamp": time.time()
            }
        except Exception as e:
            return {
                "error": str(e),
                "frame": frame,
                "anomaly_detected": False,
                "anomaly_type": "None",
                "confidence": 0,
                "timestamp": time.time()
            }

    def analyze_frames_cumulatively(self, frames, prompt, callback=None):
        """
        Analyze all frames together and provide a cumulative analysis.

        Args:
            frames (list): List of frames to analyze
            prompt (str): Prompt describing what anomaly to look for
            callback (function, optional): Callback function to report progress

        Returns:
            dict: Cumulative analysis result
        """
        # First pass: analyze each frame individually. Progress is reported
        # against len(frames) * 2 because the cumulative call is a second pass.
        individual_results = []
        for i, frame in enumerate(frames):
            if callback:
                callback(i, len(frames) * 2)

            result = self.analyze_frame(frame, f"{prompt} Provide a brief analysis of this frame only.")
            individual_results.append(result)

        # Collect up to three flagged frames together with their descriptions.
        anomaly_frames = []
        anomaly_descriptions = []

        for i, result in enumerate(individual_results):
            if "error" not in result and result["anomaly_detected"]:
                anomaly_frames.append(result["frame"])
                anomaly_descriptions.append(f"Frame {i+1}: {result['text']}")

                if len(anomaly_frames) >= 3:
                    break

        # If nothing was flagged, fall back to representative frames
        # (first, middle, last) so the cumulative call still has images.
        if not anomaly_frames and len(frames) > 0:
            if len(frames) == 1:
                anomaly_frames = [frames[0]]
            elif len(frames) == 2:
                anomaly_frames = [frames[0], frames[1]]
            else:
                anomaly_frames = [
                    frames[0],
                    frames[len(frames) // 2],
                    frames[-1]
                ]

        # Safety clamp: never send more than three images.
        anomaly_frames = anomaly_frames[:3]

        cumulative_prompt = f"""
        {prompt}

        Based on the analysis of all frames, provide a comprehensive summary of any anomalies detected in the video. Focus on patterns or recurring issues. Here are some notable observations from individual frames:

        {chr(10).join(anomaly_descriptions[:5])}

        After your analysis, please include a structured assessment at the end of your response in this exact format:
        ANOMALY_DETECTED: [Yes/No]
        ANOMALY_TYPE: [Human/Non-human/None]

        For ANOMALY_DETECTED, answer "Yes" if you detect any anomaly across the video, otherwise "No".
        For ANOMALY_TYPE, if an anomaly is detected, classify the predominant type as either "Human" (if it involves people or human activities) or "Non-human" (if it involves objects, animals, or environmental factors). If no anomaly is detected, use "None".
        """

        try:
            if callback:
                callback(len(frames), len(frames) * 2)

            base64_images = [self.encode_image_to_base64(frame) for frame in anomaly_frames]

            # Build a single multimodal message: the prompt followed by the images.
            content = [{"type": "text", "text": cumulative_prompt}]
            for base64_image in base64_images:
                content.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{base64_image}"
                    }
                })

            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "user",
                        "content": content
                    }
                ],
                max_tokens=1500
            )

            response_text = response.choices[0].message.content

            # Parse the structured footer out of the response.
            anomaly_detected = False
            anomaly_type = "None"

            anomaly_match = re.search(r'ANOMALY_DETECTED:\s*(Yes|No)', response_text, re.IGNORECASE)
            if anomaly_match and anomaly_match.group(1).lower() == 'yes':
                anomaly_detected = True

            type_match = re.search(r'ANOMALY_TYPE:\s*(Human|Non-human|None)', response_text, re.IGNORECASE)
            if type_match:
                anomaly_type = type_match.group(1)

            return {
                "text": response_text,
                "frames": anomaly_frames,
                "anomaly_detected": anomaly_detected,
                "anomaly_type": anomaly_type,
                "timestamp": time.time()
            }
        except Exception as e:
            return {
                "error": str(e),
                "frames": anomaly_frames,
                "anomaly_detected": False,
                "anomaly_type": "None",
                "timestamp": time.time()
            }

    def process_video(self, video_path, skip_frames, prompt, analysis_depth="granular", callback=None):
        """
        Process a video file, extracting frames and analyzing them for anomalies.

        Args:
            video_path (str): Path to the video file
            skip_frames (int): Number of frames to skip between captures
            prompt (str): Prompt describing what anomaly to look for
            analysis_depth (str): "granular" for frame-by-frame analysis or "cumulative" for overall analysis
            callback (function, optional): Callback function to report progress

        Returns:
            list or dict: List of analysis results for each processed frame (granular) or dict with cumulative analysis (cumulative)
        """
        frames = self.extract_frames(video_path, skip_frames)

        if analysis_depth == "cumulative":
            return self.analyze_frames_cumulatively(frames, prompt, callback)

        results = []
        for i, frame in enumerate(frames):
            if callback:
                callback(i, len(frames))

            result = self.analyze_frame(frame, prompt)
            results.append(result)

        return results
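

# Minimal usage sketch, assuming the API key is available in the
# OPENAI_API_KEY environment variable; "sample.mp4" and the prompts below are
# hypothetical placeholders, not part of the class itself.
if __name__ == "__main__":
    detector = VideoAnomalyDetector(api_key=os.environ["OPENAI_API_KEY"])

    # Batch mode: analyze every 30th frame of a file and print each verdict.
    results = detector.process_video(
        "sample.mp4",
        skip_frames=29,
        prompt="Look for people entering the restricted area.",
    )
    for i, r in enumerate(results):
        print(f"Frame {i}: anomaly={r['anomaly_detected']} "
              f"type={r['anomaly_type']} confidence={r['confidence']}")

    # Live mode: process_live_stream is a generator, so iterate over it.
    # Here: default webcam, one frame every 5 seconds, stop after 3 captures.
    for r in detector.process_live_stream(
        0,
        skip_frames=0,
        prompt="Report anything unusual in the scene.",
        time_interval=5,
        max_frames=3,
    ):
        print(r.get("text", r.get("error")))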