"""
Phi-4 model implementation for video anomaly detection using Hugging Face transformers.
"""

import cv2
import numpy as np
import base64
import os
import tempfile
from PIL import Image
import io
import re
import torch
import time
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoProcessor
from huggingface_hub import snapshot_download


class Phi4AnomalyDetector:
    """Detects anomalies in video frames and streams using the Phi-4 multimodal model."""

    def __init__(self, model_name="microsoft/Phi-4-multimodal-instruct"):
        """
        Initialize the Phi4AnomalyDetector with the Phi-4 vision model.

        Args:
            model_name (str): Name of the Phi-4 vision model on Hugging Face
        """
        self.model_name = model_name
        self.model_dir = os.path.join(os.getcwd(), "phi4_model")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        self.load_model()

    def load_model(self):
        """
        Load the Phi-4 model from the local directory, downloading it from Hugging Face first if needed.
        """
        try:
            # Download the model snapshot on first use; reuse the local copy afterwards.
            if not os.path.exists(self.model_dir):
                print(f"Downloading {self.model_name} model to {self.model_dir}...")
                snapshot_download(repo_id=self.model_name, local_dir=self.model_dir)
                print("Model downloaded successfully.")
            else:
                print(f"Using existing model from {self.model_dir}")

            # The Phi-4 repository ships custom modeling/processing code, so
            # trust_remote_code is enabled for the processor, tokenizer, and model.
            self.processor = AutoProcessor.from_pretrained(
                self.model_dir,
                trust_remote_code=True
            )

            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_dir,
                trust_remote_code=True
            )

            # Use half precision on GPU to reduce memory usage; keep the default dtype on CPU.
            if self.device == "cuda":
                self.model = AutoModelForCausalLM.from_pretrained(
                    self.model_dir,
                    torch_dtype=torch.float16,
                    device_map="auto",
                    trust_remote_code=True
                )
            else:
                self.model = AutoModelForCausalLM.from_pretrained(
                    self.model_dir,
                    device_map="auto",
                    trust_remote_code=True
                )

            print(f"Phi-4 model loaded successfully on {self.device}")

        except Exception as e:
            raise RuntimeError(f"Failed to load Phi-4 model: {str(e)}")

    def extract_frames(self, video_path, skip_frames):
        """
        Extract frames from a video file, skipping the specified number of frames.

        Args:
            video_path (str): Path to the video file
            skip_frames (int): Number of frames to skip between captures

        Returns:
            list: List of extracted frames as numpy arrays
        """
        frames = []

        cap = cv2.VideoCapture(video_path)

        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {video_path}")

        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % (skip_frames + 1) == 0:
                # OpenCV reads frames as BGR; convert to RGB for PIL and the model.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(rgb_frame)

            frame_count += 1

        cap.release()
        return frames

    def process_live_stream(self, stream_source, skip_frames, prompt, analysis_depth="granular", max_frames=None, callback=None, time_interval=None):
        """
        Process frames from a live video stream.

        This method is a generator. In "granular" mode it yields one analysis
        result per captured frame; in "cumulative" mode it yields a single dict
        summarising the whole stream once capture finishes. When time_interval is
        set, granular results are yielded as soon as each frame is analyzed;
        otherwise frames are buffered and analyzed after the stream ends or
        max_frames is reached.

        Args:
            stream_source: Stream source (0 for webcam, URL for IP camera or RTSP stream)
            skip_frames (int): Number of frames to skip between captures
            prompt (str): Prompt describing what anomaly to look for
            analysis_depth (str): "granular" for frame-by-frame analysis or "cumulative" for overall analysis
            max_frames (int, optional): Maximum number of frames to process (None for unlimited)
            callback (function, optional): Callback function to report progress
            time_interval (int, optional): If set, capture one frame every X seconds instead of using skip_frames

        Yields:
            dict: Analysis result per processed frame (granular) or a single cumulative analysis (cumulative)
        """
        # On Windows, open local webcams (device index 0 or 1) through DirectShow
        # and request MJPG, which tends to initialize faster than the default backend.
        if os.name == 'nt' and (stream_source == 0 or stream_source == 1):
            cap = cv2.VideoCapture(stream_source, cv2.CAP_DSHOW)
            cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
        else:
            cap = cv2.VideoCapture(stream_source)

        if not cap.isOpened():
            raise ValueError(f"Could not open video stream: {stream_source}")

        frames = []
        frame_count = 0
        processed_count = 0
        last_capture_time = time.time()

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                current_time = time.time()
                should_capture = False

                if time_interval is not None:
                    # Time-based capture: take a frame once the interval has elapsed.
                    if current_time - last_capture_time >= time_interval:
                        should_capture = True
                        last_capture_time = current_time
                else:
                    # Frame-based capture: take every (skip_frames + 1)-th frame.
                    if frame_count % (skip_frames + 1) == 0:
                        should_capture = True

                if should_capture:
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frames.append(rgb_frame)
                    processed_count += 1

                    if time_interval is not None:
                        # In time-interval mode, analyze each captured frame immediately.
                        result = self.analyze_frame(rgb_frame, prompt)

                        if "frame" not in result:
                            result["frame"] = rgb_frame

                        if callback:
                            # The total is unknown for an open-ended stream, so report -1.
                            callback(processed_count, -1)

                        if analysis_depth == "granular":
                            yield result
                        else:
                            # Cumulative mode: keep collecting; the summary is produced after the loop.
                            pass
                    else:
                        if callback and max_frames:
                            callback(processed_count, max_frames)

                if max_frames and processed_count >= max_frames:
                    break

                frame_count += 1
        finally:
            cap.release()

        # Cumulative analysis for time-interval capture.
        if time_interval is not None and analysis_depth == "cumulative":
            result = self.analyze_frames_cumulatively(frames, prompt, callback)
            yield result
            return

        # Frame-based capture: analyze the buffered frames now that the stream has ended.
        if time_interval is None:
            if analysis_depth == "cumulative":
                # Yield (rather than return) the result so it is visible to callers
                # iterating over this generator.
                yield self.analyze_frames_cumulatively(frames, prompt, callback)
            else:
                for i, frame in enumerate(frames):
                    if callback:
                        callback(i, len(frames))

                    result = self.analyze_frame(frame, prompt)
                    yield result

    def analyze_frame(self, frame, prompt):
        """
        Analyze a frame using the Phi-4 vision model.

        Args:
            frame (numpy.ndarray): Frame to analyze
            prompt (str): Prompt describing what anomaly to look for

        Returns:
            dict: Analysis result from the model
        """
        pil_image = Image.fromarray(frame)

        enhanced_prompt = f"""
{prompt}

After your analysis, please include a structured assessment at the end of your response in this exact format:
ANOMALY_DETECTED: [Yes/No]
ANOMALY_TYPE: [Human/Non-human/None]
CONFIDENCE: [0-100]

For ANOMALY_DETECTED, answer "Yes" if you detect any anomaly, otherwise "No".
For ANOMALY_TYPE, if an anomaly is detected, classify it as either "Human" (if it involves people or human activities) or "Non-human" (if it involves objects, animals, or environmental factors). If no anomaly is detected, use "None".
For CONFIDENCE, provide a number from 0 to 100 indicating your confidence level in the assessment.
"""
        try:
            # NOTE: depending on the processor version, Phi-4 multimodal may expect an
            # explicit image placeholder (e.g. "<|image_1|>") in the text; if the
            # processor complains about missing image tokens, prepend one to the prompt.
            inputs = self.processor(text=enhanced_prompt, images=pil_image, return_tensors="pt")

            # Move all tensor inputs onto the same device as the model.
            for key in inputs:
                if torch.is_tensor(inputs[key]):
                    inputs[key] = inputs[key].to(self.model.device)

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=500,
                    do_sample=False
                )

            response_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            # The decoded sequence may still contain the prompt; keep only the part after it.
            if enhanced_prompt in response_text:
                response_text = response_text.split(enhanced_prompt)[-1].strip()

            anomaly_detected = False
            anomaly_type = "None"
            confidence = 0

            anomaly_match = re.search(r'ANOMALY_DETECTED:\s*(Yes|No)', response_text, re.IGNORECASE)
            if anomaly_match and anomaly_match.group(1).lower() == 'yes':
                anomaly_detected = True
                # Default confidence when the model reports an anomaly but gives no CONFIDENCE value.
                confidence = 90

            type_match = re.search(r'ANOMALY_TYPE:\s*(Human|Non-human|None)', response_text, re.IGNORECASE)
            if type_match:
                anomaly_type = type_match.group(1)

            conf_match = re.search(r'CONFIDENCE:\s*(\d+)', response_text, re.IGNORECASE)
            if conf_match:
                try:
                    confidence = int(conf_match.group(1))
                except ValueError:
                    pass

            return {
                "text": response_text,
                "analysis": response_text,
                "frame": frame,
                "anomaly_detected": anomaly_detected,
                "anomaly_type": anomaly_type,
                "confidence": confidence,
                "timestamp": time.time()
            }
        except Exception as e:
            return {
                "error": str(e),
                "frame": frame,
                "anomaly_detected": False,
                "anomaly_type": "None",
                "confidence": 0,
                "timestamp": time.time()
            }

    def analyze_frames_cumulatively(self, frames, prompt, callback=None):
        """
        Analyze all frames together and provide a cumulative analysis.

        Args:
            frames (list): List of frames to analyze
            prompt (str): Prompt describing what anomaly to look for
            callback (function, optional): Callback function to report progress

        Returns:
            dict: Cumulative analysis result
        """
        individual_results = []
        for i, frame in enumerate(frames):
            if callback:
                # The per-frame pass accounts for the first half of the progress range.
                callback(i, len(frames) * 2)

            result = self.analyze_frame(frame, f"{prompt} Provide a brief analysis of this frame only.")
            individual_results.append(result)

        # Collect up to three frames in which an anomaly was detected, with their descriptions.
        anomaly_frames = []
        anomaly_descriptions = []
        anomaly_types = []

        for i, result in enumerate(individual_results):
            if "error" not in result and result["anomaly_detected"]:
                anomaly_frames.append(result["frame"])
                anomaly_descriptions.append(f"Frame {i+1}: {result['text']}")
                anomaly_types.append(result["anomaly_type"])

                if len(anomaly_frames) >= 3:
                    break

        # If no anomalies were found, fall back to representative frames (first, middle, last).
        if not anomaly_frames and len(frames) > 0:
            if len(frames) == 1:
                anomaly_frames = [frames[0]]
            elif len(frames) == 2:
                anomaly_frames = [frames[0], frames[1]]
            else:
                anomaly_frames = [
                    frames[0],
                    frames[len(frames) // 2],
                    frames[-1]
                ]

        anomaly_frames = anomaly_frames[:3]

        cumulative_prompt = f"""
{prompt}

Based on the analysis of all frames, provide a comprehensive summary of any anomalies detected in the video. Focus on patterns or recurring issues. Here are some notable observations from individual frames:

{chr(10).join(anomaly_descriptions[:5])}

After your analysis, please include a structured assessment at the end of your response in this exact format:
ANOMALY_DETECTED: [Yes/No]
ANOMALY_TYPE: [Human/Non-human/None]
CONFIDENCE: [0-100]

For ANOMALY_DETECTED, answer "Yes" if you detect any anomaly across the video, otherwise "No".
For ANOMALY_TYPE, if an anomaly is detected, classify the predominant type as either "Human" (if it involves people or human activities) or "Non-human" (if it involves objects, animals, or environmental factors). If no anomaly is detected, use "None".
For CONFIDENCE, provide a number from 0 to 100 indicating your confidence level in the assessment.
"""
        try:
            if callback:
                callback(len(frames), len(frames) * 2)

            # Use the first anomaly frame (or the first frame overall) as the image
            # accompanying the cumulative prompt.
            representative_frame = anomaly_frames[0] if anomaly_frames else frames[0]
            pil_image = Image.fromarray(representative_frame)

            inputs = self.processor(text=cumulative_prompt, images=pil_image, return_tensors="pt")

            for key in inputs:
                if torch.is_tensor(inputs[key]):
                    inputs[key] = inputs[key].to(self.model.device)

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=1000,
                    do_sample=False
                )

            response_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

            if cumulative_prompt in response_text:
                response_text = response_text.split(cumulative_prompt)[-1].strip()

            anomaly_detected = False
            anomaly_type = "None"
            confidence = 0

            anomaly_match = re.search(r'ANOMALY_DETECTED:\s*(Yes|No)', response_text, re.IGNORECASE)
            if anomaly_match and anomaly_match.group(1).lower() == 'yes':
                anomaly_detected = True
                confidence = 90

            type_match = re.search(r'ANOMALY_TYPE:\s*(Human|Non-human|None)', response_text, re.IGNORECASE)
            if type_match:
                anomaly_type = type_match.group(1)

            conf_match = re.search(r'CONFIDENCE:\s*(\d+)', response_text, re.IGNORECASE)
            if conf_match:
                try:
                    confidence = int(conf_match.group(1))
                except ValueError:
                    pass

            return {
                "text": response_text,
                "analysis": response_text,
                "frames": anomaly_frames,
                "anomaly_detected": anomaly_detected,
                "anomaly_type": anomaly_type,
                "confidence": confidence,
                "timestamp": time.time()
            }
        except Exception as e:
            return {
                "error": str(e),
                "frames": anomaly_frames,
                "anomaly_detected": False,
                "anomaly_type": "None",
                "confidence": 0,
                "timestamp": time.time()
            }

    def process_video(self, video_path, skip_frames, prompt, analysis_depth="granular", callback=None):
        """
        Process a video file, extracting frames and analyzing them for anomalies.

        Args:
            video_path (str): Path to the video file
            skip_frames (int): Number of frames to skip between captures
            prompt (str): Prompt describing what anomaly to look for
            analysis_depth (str): "granular" for frame-by-frame analysis or "cumulative" for overall analysis
            callback (function, optional): Callback function to report progress

        Returns:
            list or dict: List of analysis results for each processed frame (granular) or dict with cumulative analysis (cumulative)
        """
        frames = self.extract_frames(video_path, skip_frames)

        if analysis_depth == "cumulative":
            return self.analyze_frames_cumulatively(frames, prompt, callback)
        else:
            results = []

            for i, frame in enumerate(frames):
                if callback:
                    callback(i, len(frames))

                result = self.analyze_frame(frame, prompt)
                results.append(result)

            return results
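

# Minimal usage sketch (not part of the detector itself). The video path and prompt
# below are placeholders for illustration; substitute your own inputs. Running this
# downloads the Phi-4 weights on first use, which requires substantial disk space
# and, ideally, a CUDA GPU.
if __name__ == "__main__":
    detector = Phi4AnomalyDetector()

    # Analyze every 30th frame of a local file (hypothetical path) for anomalies.
    results = detector.process_video(
        video_path="sample_video.mp4",
        skip_frames=29,
        prompt="Describe any unusual or suspicious activity in this scene.",
        analysis_depth="granular",
    )

    for i, result in enumerate(results):
        if "error" in result:
            print(f"Frame {i}: error: {result['error']}")
        else:
            print(f"Frame {i}: anomaly={result['anomaly_detected']} "
                  f"type={result['anomaly_type']} confidence={result['confidence']}")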