import cv2
import numpy as np
import base64
import os
import tempfile
from openai import OpenAI
from PIL import Image
import io
import re
import time


class VideoAnomalyDetector:
    def __init__(self, api_key, model="gpt-4o"):
        """
        Initialize the VideoAnomalyDetector with an OpenAI API key.

        Args:
            api_key (str): OpenAI API key for accessing the vision model
            model (str): Model to use for analysis ("gpt-4o" or "gpt-4o-mini")
        """
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def extract_frames(self, video_path, skip_frames):
        """
        Extract frames from a video file, skipping the specified number of frames.

        Args:
            video_path (str): Path to the video file
            skip_frames (int): Number of frames to skip between captures

        Returns:
            list: List of extracted frames as numpy arrays (RGB)
        """
        frames = []

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Keep one frame out of every (skip_frames + 1)
            if frame_count % (skip_frames + 1) == 0:
                # OpenCV decodes frames as BGR; convert to RGB for PIL
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(rgb_frame)

            frame_count += 1

        cap.release()
        return frames
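
    # Usage sketch (values are illustrative): for a 30 fps video,
    # extract_frames("clip.mp4", skip_frames=29) keeps roughly one frame per
    # second, since one frame is kept out of every skip_frames + 1.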

    def process_live_stream(self, stream_source, skip_frames, prompt, analysis_depth="granular", max_frames=None, callback=None, time_interval=None):
        """
        Process frames from a live video stream.

        This method is a generator: iterate over it to consume results.

        Args:
            stream_source: Stream source (0 for webcam, URL for IP camera or RTSP stream)
            skip_frames (int): Number of frames to skip between captures
            prompt (str): Prompt describing what anomaly to look for
            analysis_depth (str): "granular" for frame-by-frame analysis or "cumulative" for overall analysis
            max_frames (int, optional): Maximum number of frames to process (None for unlimited)
            callback (function, optional): Callback function to report progress
            time_interval (int, optional): If set, capture one frame every X seconds instead of using skip_frames

        Yields:
            dict: Analysis results for each processed frame (granular) or a single dict with the cumulative analysis (cumulative)
        """
        # On Windows, open local webcams through DirectShow with MJPG, which
        # is commonly more reliable than the default backend
        if os.name == 'nt' and stream_source in (0, 1):
            cap = cv2.VideoCapture(stream_source, cv2.CAP_DSHOW)
            cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
        else:
            cap = cv2.VideoCapture(stream_source)

        if not cap.isOpened():
            raise ValueError(f"Could not open video stream: {stream_source}")

        frames = []
        frame_count = 0
        processed_count = 0
        last_capture_time = time.time()

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                current_time = time.time()

                # Decide whether to keep this frame: by elapsed time when
                # time_interval is set, otherwise by frame counting
                should_capture = False
                if time_interval is not None:
                    if current_time - last_capture_time >= time_interval:
                        should_capture = True
                        last_capture_time = current_time
                else:
                    if frame_count % (skip_frames + 1) == 0:
                        should_capture = True

                if should_capture:
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frames.append(rgb_frame)
                    processed_count += 1

                    # In time-interval mode, analyze each captured frame
                    # immediately so results stream out in real time
                    if time_interval is not None:
                        result = self.analyze_frame(rgb_frame, prompt)
                        if "frame" not in result:
                            result["frame"] = rgb_frame

                        if callback:
                            # Total is unknown for an open-ended stream
                            callback(processed_count, -1)

                        if analysis_depth == "granular":
                            yield result
                        # In cumulative mode the frame is only collected here;
                        # the combined analysis happens after the loop
                    else:
                        if callback and max_frames:
                            callback(processed_count, max_frames)

                    if max_frames and processed_count >= max_frames:
                        break

                frame_count += 1
        finally:
            cap.release()

        # Cumulative analysis over everything captured in time-interval mode
        if time_interval is not None and analysis_depth == "cumulative":
            yield self.analyze_frames_cumulatively(frames, prompt, callback)
            return

        # Frame-count mode: analyze the collected frames once capture ends.
        # Results are yielded, not returned, because this method is a
        # generator; a plain `return value` here would discard them.
        if time_interval is None:
            if analysis_depth == "cumulative":
                yield self.analyze_frames_cumulatively(frames, prompt, callback)
            else:
                for i, frame in enumerate(frames):
                    if callback:
                        callback(i, len(frames))
                    yield self.analyze_frame(frame, prompt)
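
    # Consumption sketch (source and prompt are illustrative): since this
    # method is a generator, iterate over it:
    #
    #     for result in detector.process_live_stream(
    #             0, skip_frames=30, prompt="Report anything unusual."):
    #         print(result.get("anomaly_detected"), result.get("confidence"))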

    def encode_image_to_base64(self, image_array):
        """
        Convert a numpy array image to base64 encoded string.

        Args:
            image_array (numpy.ndarray): Image as numpy array

        Returns:
            str: Base64 encoded image string
        """
        pil_image = Image.fromarray(image_array)
        buffer = io.BytesIO()
        pil_image.save(buffer, format="PNG")
        img_bytes = buffer.getvalue()
        base64_encoded = base64.b64encode(img_bytes).decode('utf-8')
        return base64_encoded

    def analyze_frame(self, frame, prompt):
        """
        Analyze a frame using the selected OpenAI model.

        Args:
            frame (numpy.ndarray): Frame to analyze
            prompt (str): Prompt describing what anomaly to look for

        Returns:
            dict: Analysis result from the model
        """
        base64_image = self.encode_image_to_base64(frame)

        # Append a structured footer to the user's prompt so the assessment
        # can be parsed back out of the free-form response with regexes
        enhanced_prompt = f"""
{prompt}

After your analysis, please include a structured assessment at the end of your response in this exact format:
ANOMALY_DETECTED: [Yes/No]
ANOMALY_TYPE: [Human/Non-human/None]
CONFIDENCE: [0-100]

For ANOMALY_DETECTED, answer "Yes" if you detect any anomaly, otherwise "No".
For ANOMALY_TYPE, if an anomaly is detected, classify it as either "Human" (if it involves people or human activities) or "Non-human" (if it involves objects, animals, or environmental factors). If no anomaly is detected, use "None".
For CONFIDENCE, provide a number from 0 to 100 indicating your confidence level in the assessment.
"""

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "user",
                        "content": [
                            {"type": "text", "text": enhanced_prompt},
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/png;base64,{base64_image}"
                                }
                            }
                        ]
                    }
                ],
                max_tokens=1000
            )

            response_text = response.choices[0].message.content

            # Parse the structured footer out of the response
            anomaly_detected = False
            anomaly_type = "None"
            confidence = 0

            anomaly_match = re.search(r'ANOMALY_DETECTED:\s*(Yes|No)', response_text, re.IGNORECASE)
            if anomaly_match and anomaly_match.group(1).lower() == 'yes':
                anomaly_detected = True
                # Default confidence if the model omits the CONFIDENCE line
                confidence = 90

            type_match = re.search(r'ANOMALY_TYPE:\s*(Human|Non-human|None)', response_text, re.IGNORECASE)
            if type_match:
                anomaly_type = type_match.group(1)

            conf_match = re.search(r'CONFIDENCE:\s*(\d+)', response_text, re.IGNORECASE)
            if conf_match:
                try:
                    confidence = int(conf_match.group(1))
                except ValueError:
                    pass

            return {
                # Same content under both keys so callers can use either name
                "text": response_text,
                "analysis": response_text,
                "frame": frame,
                "anomaly_detected": anomaly_detected,
                "anomaly_type": anomaly_type,
                "confidence": confidence,
                "timestamp": time.time()
            }
        except Exception as e:
            return {
                "error": str(e),
                "frame": frame,
                "anomaly_detected": False,
                "anomaly_type": "None",
                "confidence": 0,
                "timestamp": time.time()
            }

    def analyze_frames_cumulatively(self, frames, prompt, callback=None):
        """
        Analyze all frames together and provide a cumulative analysis.

        Args:
            frames (list): List of frames to analyze
            prompt (str): Prompt describing what anomaly to look for
            callback (function, optional): Callback function to report progress

        Returns:
            dict: Cumulative analysis result
        """
        # First pass: analyze each frame briefly on its own
        individual_results = []
        for i, frame in enumerate(frames):
            if callback:
                # The per-frame pass counts as the first half of the work
                callback(i, len(frames) * 2)

            result = self.analyze_frame(frame, f"{prompt} Provide a brief analysis of this frame only.")
            individual_results.append(result)

        # Collect up to three anomalous frames and their descriptions
        anomaly_frames = []
        anomaly_descriptions = []
        anomaly_types = []

        for i, result in enumerate(individual_results):
            if "error" not in result and result["anomaly_detected"]:
                anomaly_frames.append(result["frame"])
                anomaly_descriptions.append(f"Frame {i+1}: {result['text']}")
                anomaly_types.append(result["anomaly_type"])

                if len(anomaly_frames) >= 3:
                    break

        # If no anomalies were flagged, fall back to representative frames
        # (first, middle, last) so the model still sees the video's content
        if not anomaly_frames and len(frames) > 0:
            if len(frames) == 1:
                anomaly_frames = [frames[0]]
            elif len(frames) == 2:
                anomaly_frames = [frames[0], frames[1]]
            else:
                anomaly_frames = [
                    frames[0],
                    frames[len(frames) // 2],
                    frames[-1]
                ]

        # Cap at three frames to keep the request size manageable
        anomaly_frames = anomaly_frames[:3]

        # chr(10) is a newline; f-string expressions could not contain
        # backslashes before Python 3.12
        cumulative_prompt = f"""
{prompt}

Based on the analysis of all frames, provide a comprehensive summary of any anomalies detected in the video. Focus on patterns or recurring issues. Here are some notable observations from individual frames:

{chr(10).join(anomaly_descriptions[:5])}

After your analysis, please include a structured assessment at the end of your response in this exact format:
ANOMALY_DETECTED: [Yes/No]
ANOMALY_TYPE: [Human/Non-human/None]

For ANOMALY_DETECTED, answer "Yes" if you detect any anomaly across the video, otherwise "No".
For ANOMALY_TYPE, if an anomaly is detected, classify the predominant type as either "Human" (if it involves people or human activities) or "Non-human" (if it involves objects, animals, or environmental factors). If no anomaly is detected, use "None".
"""

        try:
            if callback:
                callback(len(frames), len(frames) * 2)

            # Second pass: send the selected frames together in one request
            base64_images = [self.encode_image_to_base64(frame) for frame in anomaly_frames]

            content = [{"type": "text", "text": cumulative_prompt}]
            for base64_image in base64_images:
                content.append({
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{base64_image}"
                    }
                })

            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "user",
                        "content": content
                    }
                ]
            )

            response_text = response.choices[0].message.content

            # Parse the structured footer out of the response
            anomaly_detected = False
            anomaly_type = "None"

            anomaly_match = re.search(r'ANOMALY_DETECTED:\s*(Yes|No)', response_text, re.IGNORECASE)
            if anomaly_match and anomaly_match.group(1).lower() == 'yes':
                anomaly_detected = True

            type_match = re.search(r'ANOMALY_TYPE:\s*(Human|Non-human|None)', response_text, re.IGNORECASE)
            if type_match:
                anomaly_type = type_match.group(1)

            return {
                "text": response_text,
                "frames": anomaly_frames,
                "anomaly_detected": anomaly_detected,
                "anomaly_type": anomaly_type,
                "timestamp": time.time()
            }
        except Exception as e:
            return {
                "error": str(e),
                "frames": anomaly_frames,
                "anomaly_detected": False,
                "anomaly_type": "None",
                "timestamp": time.time()
            }

    def process_video(self, video_path, skip_frames, prompt, analysis_depth="granular", callback=None):
        """
        Process a video file, extracting frames and analyzing them for anomalies.

        Args:
            video_path (str): Path to the video file
            skip_frames (int): Number of frames to skip between captures
            prompt (str): Prompt describing what anomaly to look for
            analysis_depth (str): "granular" for frame-by-frame analysis or "cumulative" for overall analysis
            callback (function, optional): Callback function to report progress

        Returns:
            list or dict: List of analysis results for each processed frame (granular) or dict with cumulative analysis (cumulative)
        """
        frames = self.extract_frames(video_path, skip_frames)

        if analysis_depth == "cumulative":
            return self.analyze_frames_cumulatively(frames, prompt, callback)

        results = []
        for i, frame in enumerate(frames):
            if callback:
                callback(i, len(frames))

            result = self.analyze_frame(frame, prompt)
            results.append(result)

        return results
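

# Minimal usage sketch, assuming OPENAI_API_KEY is set in the environment and
# that "sample.mp4" exists; both are illustrative placeholders, not part of
# the class API.
if __name__ == "__main__":
    detector = VideoAnomalyDetector(api_key=os.environ["OPENAI_API_KEY"])

    # Analyze a local file frame by frame, keeping one frame out of every 30
    results = detector.process_video(
        "sample.mp4",
        skip_frames=29,
        prompt="Is there anything unusual in this scene?",
        analysis_depth="granular",
    )
    for r in results:
        print(r.get("anomaly_detected"), r.get("anomaly_type"), r.get("confidence"))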