Spaces:
Running
on
Zero
Running
on
Zero
| import cv2 | |
| import os | |
| import tempfile | |
| import uuid | |
| import time | |
| import traceback | |
| import numpy as np | |
| from PIL import Image | |
| from typing import Dict, List, Tuple, Any, Optional | |
| from collections import defaultdict | |
| from dataclasses import dataclass | |
| import math | |
| from detection_model import DetectionModel | |
| from evaluation_metrics import EvaluationMetrics | |
| class ObjectRecord: | |
| """物體記錄數據結構""" | |
| class_name: str | |
| first_seen_time: float | |
| last_seen_time: float | |
| total_detections: int | |
| peak_count_in_frame: int | |
| confidence_avg: float | |
| def get_duration(self) -> float: | |
| """獲取物體在影片中的持續時間""" | |
| return self.last_seen_time - self.first_seen_time | |
| def format_time(self, seconds: float) -> str: | |
| """格式化時間顯示""" | |
| minutes = int(seconds // 60) | |
| secs = int(seconds % 60) | |
| if minutes > 0: | |
| return f"{minutes}m{secs:02d}s" | |
| return f"{secs}s" | |
| class VideoProcessor: | |
| """ | |
| 專注於實用統計分析的視頻處理器: | |
| - 準確的物體計數和識別 | |
| - 物體出現時間分析 | |
| - 檢測品質評估 | |
| - 活動密度統計 | |
| """ | |
| def __init__(self): | |
| """初始化視頻處理器""" | |
| self.detection_models: Dict[str, DetectionModel] = {} | |
| # 分析參數 | |
| self.spatial_cluster_threshold = 100 # 像素距離閾值,用於合併重複檢測 | |
| self.confidence_filter_threshold = 0.1 # 最低信心度過濾 | |
| # 統計數據收集 | |
| self.frame_detections = [] # 每幀檢測結果 | |
| self.object_timeline = defaultdict(list) # 物體時間線記錄 | |
| self.frame_timestamps = [] # 幀時間戳記錄 | |
| def get_or_create_model(self, model_name: str, confidence_threshold: float) -> DetectionModel: | |
| """獲取或創建檢測模型實例""" | |
| model_key = f"{model_name}_{confidence_threshold}" | |
| if model_key not in self.detection_models: | |
| try: | |
| model = DetectionModel(model_name, confidence_threshold) | |
| self.detection_models[model_key] = model | |
| print(f"Loaded detection model: {model_name} with confidence {confidence_threshold}") | |
| except Exception as e: | |
| print(f"Error loading model {model_name}: {e}") | |
| raise | |
| return self.detection_models[model_key] | |
| def cluster_detections_by_position(self, detections: List[Dict], threshold: float = 100) -> List[Dict]: | |
| """根據位置聚類檢測結果,合併相近的重複檢測""" | |
| if not detections: | |
| return [] | |
| # 按物體類別分組進行聚類處理 | |
| class_groups = defaultdict(list) | |
| for det in detections: | |
| class_groups[det['class_name']].append(det) | |
| clustered_results = [] | |
| for class_name, class_detections in class_groups.items(): | |
| if len(class_detections) == 1: | |
| clustered_results.extend(class_detections) | |
| continue | |
| # 執行空間聚類算法 | |
| clusters = [] | |
| used = set() | |
| for i, det1 in enumerate(class_detections): | |
| if i in used: | |
| continue | |
| cluster = [det1] | |
| used.add(i) | |
| # 計算檢測框中心點 | |
| x1_center = (det1['bbox'][0] + det1['bbox'][2]) / 2 | |
| y1_center = (det1['bbox'][1] + det1['bbox'][3]) / 2 | |
| # 查找相近的檢測結果 | |
| for j, det2 in enumerate(class_detections): | |
| if j in used: | |
| continue | |
| x2_center = (det2['bbox'][0] + det2['bbox'][2]) / 2 | |
| y2_center = (det2['bbox'][1] + det2['bbox'][3]) / 2 | |
| distance = math.sqrt((x1_center - x2_center)**2 + (y1_center - y2_center)**2) | |
| if distance < threshold: | |
| cluster.append(det2) | |
| used.add(j) | |
| clusters.append(cluster) | |
| # 為每個聚類生成代表性檢測結果 | |
| for cluster in clusters: | |
| best_detection = max(cluster, key=lambda x: x['confidence']) | |
| avg_confidence = sum(det['confidence'] for det in cluster) / len(cluster) | |
| best_detection['confidence'] = avg_confidence | |
| best_detection['cluster_size'] = len(cluster) | |
| clustered_results.append(best_detection) | |
| return clustered_results | |
| def analyze_frame_detections(self, detections: Any, timestamp: float, class_names: Dict[int, str]): | |
| """分析單幀的檢測結果並更新統計記錄""" | |
| if not hasattr(detections, 'boxes') or len(detections.boxes) == 0: | |
| self.frame_detections.append([]) | |
| self.frame_timestamps.append(timestamp) | |
| return | |
| # extract detected data | |
| boxes = detections.boxes.xyxy.cpu().numpy() | |
| classes = detections.boxes.cls.cpu().numpy().astype(int) | |
| confidences = detections.boxes.conf.cpu().numpy() | |
| # 轉換為統一的檢測格式 | |
| frame_detections = [] | |
| for box, cls_id, conf in zip(boxes, classes, confidences): | |
| if conf >= self.confidence_filter_threshold: | |
| frame_detections.append({ | |
| 'bbox': tuple(box), | |
| 'class_id': cls_id, | |
| 'class_name': class_names.get(cls_id, f'class_{cls_id}'), | |
| 'confidence': conf, | |
| 'timestamp': timestamp | |
| }) | |
| # 為了避免有重複偵測, 用空間聚類 | |
| clustered_detections = self.cluster_detections_by_position( | |
| frame_detections, self.spatial_cluster_threshold | |
| ) | |
| # record results | |
| self.frame_detections.append(clustered_detections) | |
| self.frame_timestamps.append(timestamp) | |
| # 更新物體時間線記錄 | |
| for detection in clustered_detections: | |
| class_name = detection['class_name'] | |
| self.object_timeline[class_name].append({ | |
| 'timestamp': timestamp, | |
| 'confidence': detection['confidence'], | |
| 'bbox': detection['bbox'] | |
| }) | |
| def generate_object_statistics(self, fps: float) -> Dict[str, ObjectRecord]: | |
| """生成物體統計數據""" | |
| object_stats = {} | |
| for class_name, timeline in self.object_timeline.items(): | |
| if not timeline: | |
| continue | |
| # 計算基本時間統計 | |
| timestamps = [entry['timestamp'] for entry in timeline] | |
| confidences = [entry['confidence'] for entry in timeline] | |
| first_seen = min(timestamps) | |
| last_seen = max(timestamps) | |
| total_detections = len(timeline) | |
| avg_confidence = sum(confidences) / len(confidences) | |
| # 計算每個時間點的物體數量以確定峰值 | |
| frame_counts = defaultdict(int) | |
| for entry in timeline: | |
| frame_timestamp = entry['timestamp'] | |
| frame_counts[frame_timestamp] += 1 | |
| peak_count = max(frame_counts.values()) if frame_counts else 1 | |
| # 創建物體記錄 | |
| object_stats[class_name] = ObjectRecord( | |
| class_name=class_name, | |
| first_seen_time=first_seen, | |
| last_seen_time=last_seen, | |
| total_detections=total_detections, | |
| peak_count_in_frame=peak_count, | |
| confidence_avg=avg_confidence | |
| ) | |
| return object_stats | |
| def analyze_object_density(self, object_stats: Dict[str, ObjectRecord], video_duration: float) -> Dict[str, Any]: | |
| """分析物體密度和活動模式""" | |
| total_objects = sum(record.peak_count_in_frame for record in object_stats.values()) | |
| objects_per_minute = (total_objects / video_duration) * 60 if video_duration > 0 else 0 | |
| # 分析每30秒時間段的活動分布 | |
| time_segments = defaultdict(int) | |
| segment_duration = 30 | |
| for detections, timestamp in zip(self.frame_detections, self.frame_timestamps): | |
| segment = int(timestamp // segment_duration) * segment_duration | |
| time_segments[segment] += len(detections) | |
| # 辨識活動高峰時段 | |
| peak_segments = [] | |
| if time_segments: | |
| max_activity = max(time_segments.values()) | |
| threshold = max_activity * 0.8 # 80%活動量代表高度活躍 | |
| for segment, activity in time_segments.items(): | |
| if activity >= threshold: | |
| peak_segments.append({ | |
| 'start_time': segment, | |
| 'end_time': min(segment + segment_duration, video_duration), | |
| 'activity_count': activity, | |
| 'description': f"{segment}s-{min(segment + segment_duration, video_duration):.0f}s" | |
| }) | |
| return { | |
| 'total_objects_detected': total_objects, | |
| 'objects_per_minute': round(objects_per_minute, 2), | |
| 'video_duration_seconds': video_duration, | |
| 'peak_activity_periods': peak_segments, | |
| 'activity_distribution': {str(k): v for k, v in time_segments.items()} | |
| } | |
| def analyze_quality_metrics(self, object_stats: Dict[str, ObjectRecord]) -> Dict[str, Any]: | |
| """分析檢測品質指標""" | |
| all_confidences = [] | |
| class_confidence_stats = {} | |
| # 收集所有置信度數據進行分析 | |
| for class_name, record in object_stats.items(): | |
| class_confidences = [] | |
| for detection_data in self.object_timeline[class_name]: | |
| conf = detection_data['confidence'] | |
| all_confidences.append(conf) | |
| class_confidences.append(conf) | |
| # 計算各類別的置信度統計 | |
| if class_confidences: | |
| class_confidence_stats[class_name] = { | |
| 'average_confidence': round(np.mean(class_confidences), 3), | |
| 'min_confidence': round(np.min(class_confidences), 3), | |
| 'max_confidence': round(np.max(class_confidences), 3), | |
| 'confidence_stability': round(1 - np.std(class_confidences), 3), | |
| 'detection_count': len(class_confidences) | |
| } | |
| # 計算整體品質指標 | |
| if all_confidences: | |
| overall_confidence = np.mean(all_confidences) | |
| confidence_std = np.std(all_confidences) | |
| # 品質等級評估 | |
| if overall_confidence > 0.8 and confidence_std < 0.1: | |
| quality_grade = "excellent" | |
| elif overall_confidence > 0.6 and confidence_std < 0.2: | |
| quality_grade = "good" | |
| elif overall_confidence > 0.4: | |
| quality_grade = "fair" | |
| else: | |
| quality_grade = "poor" | |
| quality_analysis = f"Detection quality: {quality_grade} (avg confidence: {overall_confidence:.3f})" | |
| else: | |
| overall_confidence = 0 | |
| confidence_std = 0 | |
| quality_grade = "no_data" | |
| quality_analysis = "No detection data available for quality analysis" | |
| return { | |
| 'overall_confidence': round(overall_confidence, 3), | |
| 'confidence_stability': round(1 - confidence_std, 3), | |
| 'quality_grade': quality_grade, | |
| 'class_confidence_breakdown': class_confidence_stats, | |
| 'total_detections_analyzed': len(all_confidences), | |
| 'quality_analysis': quality_analysis | |
| } | |
| def generate_timeline_analysis(self, object_stats: Dict[str, ObjectRecord], video_duration: float) -> Dict[str, Any]: | |
| """生成時間線分析報告""" | |
| timeline_analysis = { | |
| 'video_duration_seconds': video_duration, | |
| 'object_appearances': {}, | |
| 'timeline_summary': [] | |
| } | |
| # 分析每個物體的出現的時序 | |
| for class_name, record in object_stats.items(): | |
| timeline_analysis['object_appearances'][class_name] = { | |
| 'first_appearance': record.format_time(record.first_seen_time), | |
| 'first_appearance_seconds': round(record.first_seen_time, 1), | |
| 'last_seen': record.format_time(record.last_seen_time), | |
| 'last_seen_seconds': round(record.last_seen_time, 1), | |
| 'duration_in_video': record.format_time(record.get_duration()), | |
| 'duration_seconds': round(record.get_duration(), 1), | |
| 'estimated_count': record.peak_count_in_frame, | |
| 'detection_confidence': round(record.confidence_avg, 3) | |
| } | |
| # timeline summary | |
| if object_stats: | |
| sorted_objects = sorted(object_stats.values(), key=lambda x: x.first_seen_time) | |
| for i, record in enumerate(sorted_objects): | |
| if record.first_seen_time < 2.0: | |
| summary = f"{record.peak_count_in_frame} {record.class_name}(s) present from the beginning" | |
| else: | |
| summary = f"{record.peak_count_in_frame} {record.class_name}(s) first appeared at {record.format_time(record.first_seen_time)}" | |
| timeline_analysis['timeline_summary'].append(summary) | |
| return timeline_analysis | |
| def draw_simple_annotations(self, frame: np.ndarray, detections: List[Dict]) -> np.ndarray: | |
| """在視頻幀上繪製檢測標註""" | |
| annotated_frame = frame.copy() | |
| # 不同物體類別分配顏色 | |
| colors = { | |
| 'person': (0, 255, 0), # green | |
| 'car': (255, 0, 0), # blue | |
| 'truck': (0, 0, 255), # red | |
| 'bus': (255, 255, 0), # 青色 | |
| 'bicycle': (255, 0, 255), # purple | |
| 'motorcycle': (0, 255, 255) # yellow | |
| } | |
| # 繪製每個檢測結果 | |
| for detection in detections: | |
| x1, y1, x2, y2 = map(int, detection['bbox']) | |
| class_name = detection['class_name'] | |
| confidence = detection['confidence'] | |
| color = colors.get(class_name, (128, 128, 128)) # set gray to default color | |
| # 繪製邊界框 | |
| cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2) | |
| # 準備標籤文字 | |
| label = f"{class_name}: {confidence:.2f}" | |
| if 'cluster_size' in detection and detection['cluster_size'] > 1: | |
| label += f" (merged: {detection['cluster_size']})" | |
| # 繪製標籤背景和文字 | |
| (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) | |
| cv2.rectangle(annotated_frame, (x1, y1 - h - 10), (x1 + w, y1), color, -1) | |
| cv2.putText(annotated_frame, label, (x1, y1 - 5), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) | |
| return annotated_frame | |
| def _ensure_string_keys(self, data): | |
| """確保所有字典鍵值都轉換為字串格式以支援JSON序列化""" | |
| if isinstance(data, dict): | |
| return {str(key): self._ensure_string_keys(value) for key, value in data.items()} | |
| elif isinstance(data, list): | |
| return [self._ensure_string_keys(item) for item in data] | |
| else: | |
| return data | |
| def process_video(self, | |
| video_path: str, | |
| model_name: str, | |
| confidence_threshold: float, | |
| process_interval: int = 10) -> Tuple[Optional[str], Dict[str, Any]]: | |
| """ | |
| 處理視頻文件,執行物體檢測和統計分析 | |
| Args: | |
| video_path: 視頻文件路徑 | |
| model_name: YOLO模型名稱 | |
| confidence_threshold: 置信度閾值 | |
| process_interval: 處理間隔(每N幀處理一次) | |
| Returns: | |
| Tuple[Optional[str], Dict[str, Any]]: (輸出視頻路徑, 分析結果) | |
| """ | |
| if not video_path or not os.path.exists(video_path): | |
| print(f"Error: Video file not found at {video_path}") | |
| return None, {"error": "Video file not found"} | |
| print(f"Starting focused video analysis: {video_path}") | |
| start_time = time.time() | |
| # 重置處理狀態 | |
| self.frame_detections.clear() | |
| self.object_timeline.clear() | |
| self.frame_timestamps.clear() | |
| # 開啟視頻文件 | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| return None, {"error": "Could not open video file"} | |
| # 取得視頻基本屬性 | |
| fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| video_duration = total_frames / fps | |
| print(f"Video properties: {width}x{height} @ {fps:.2f} FPS") | |
| print(f"Duration: {video_duration:.1f}s, Total frames: {total_frames}") | |
| print(f"Processing every {process_interval} frames") | |
| # 設定輸出視頻文件 | |
| output_filename = f"analyzed_{uuid.uuid4().hex}_{os.path.basename(video_path)}" | |
| temp_dir = tempfile.gettempdir() | |
| output_path = os.path.join(temp_dir, output_filename) | |
| if not output_path.lower().endswith(('.mp4', '.avi', '.mov')): | |
| output_path += ".mp4" | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) | |
| if not out.isOpened(): | |
| cap.release() | |
| return None, {"error": "Could not create output video file"} | |
| print(f"Output video will be saved to: {output_path}") | |
| # 載入檢測模型 | |
| try: | |
| detection_model = self.get_or_create_model(model_name, confidence_threshold) | |
| except Exception as e: | |
| cap.release() | |
| out.release() | |
| return None, {"error": f"Failed to load detection model: {str(e)}"} | |
| # 主要視頻處理循環 | |
| frame_count = 0 | |
| processed_frame_count = 0 | |
| try: | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| frame_count += 1 | |
| timestamp = frame_count / fps | |
| # 根據處理間隔決定是否分析此幀 | |
| if frame_count % process_interval == 0: | |
| processed_frame_count += 1 | |
| if processed_frame_count % 5 == 0: | |
| print(f"Processing frame {frame_count}/{total_frames} ({timestamp:.1f}s)") | |
| try: | |
| # 執行物體檢測 | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| pil_image = Image.fromarray(frame_rgb) | |
| detections = detection_model.detect(pil_image) | |
| # 分析檢測結果 | |
| class_names = detections.names if hasattr(detections, 'names') else {} | |
| self.analyze_frame_detections(detections, timestamp, class_names) | |
| # 繪製檢測標註 | |
| current_detections = self.frame_detections[-1] if self.frame_detections else [] | |
| frame = self.draw_simple_annotations(frame, current_detections) | |
| except Exception as e: | |
| print(f"Error processing frame {frame_count}: {e}") | |
| continue | |
| # 寫入處理後的幀到輸出視頻 | |
| out.write(frame) | |
| except Exception as e: | |
| print(f"Error during video processing: {e}") | |
| traceback.print_exc() | |
| finally: | |
| cap.release() | |
| out.release() | |
| # 生成最終分析結果 | |
| processing_time = time.time() - start_time | |
| # 執行各項統計分析 | |
| object_stats = self.generate_object_statistics(fps) | |
| object_density = self.analyze_object_density(object_stats, video_duration) | |
| quality_metrics = self.analyze_quality_metrics(object_stats) | |
| timeline_analysis = self.generate_timeline_analysis(object_stats, video_duration) | |
| # 計算基本統計數據 | |
| total_unique_objects = sum(record.peak_count_in_frame for record in object_stats.values()) | |
| # 組織分析結果 | |
| analysis_results = { | |
| "processing_info": { | |
| "processing_time_seconds": round(processing_time, 2), | |
| "total_frames": frame_count, | |
| "frames_analyzed": processed_frame_count, | |
| "processing_interval": process_interval, | |
| "video_duration_seconds": round(video_duration, 2), | |
| "fps": fps | |
| }, | |
| "object_summary": { | |
| "total_unique_objects_detected": total_unique_objects, | |
| "object_types_found": len(object_stats), | |
| "detailed_counts": { | |
| name: record.peak_count_in_frame | |
| for name, record in object_stats.items() | |
| } | |
| }, | |
| "timeline_analysis": timeline_analysis, | |
| "analytics": { | |
| "object_density": object_density, | |
| "quality_metrics": quality_metrics | |
| } | |
| } | |
| # 確保所有字典鍵值都是字串格式 | |
| analysis_results = self._ensure_string_keys(analysis_results) | |
| # 驗證輸出文件 | |
| if not os.path.exists(output_path) or os.path.getsize(output_path) == 0: | |
| print(f"Warning: Output video file was not created properly") | |
| return None, analysis_results | |
| print(f"Video processing completed in {processing_time:.2f} seconds") | |
| print(f"Found {total_unique_objects} total objects across {len(object_stats)} categories") | |
| print(f"Quality grade: {quality_metrics['quality_grade']}") | |
| return output_path, analysis_results | |