import torch import torch.nn.functional as F import logging import os import gc import numpy as np import cv2 from PIL import Image from typing import List, Dict, Any, Tuple from transformers import Owlv2Processor, Owlv2ForObjectDetection from .base import BaseDetector import time logger = logging.getLogger(__name__) class WeaponDetectorGPU(BaseDetector): """Detector de armas otimizado para GPU.""" def __init__(self): """Inicializa o detector.""" super().__init__() self.default_resolution = 640 self.device = None # Será configurado em _initialize self._initialize() def _initialize(self): """Inicializa o modelo.""" try: # Configurar device if not torch.cuda.is_available(): raise RuntimeError("CUDA não está disponível!") # Configurar device corretamente self.device = torch.device("cuda:0") # Usar device CUDA # Carregar modelo e processador logger.info("Carregando modelo e processador...") model_name = "google/owlv2-base-patch16" self.owlv2_processor = Owlv2Processor.from_pretrained(model_name) self.owlv2_model = Owlv2ForObjectDetection.from_pretrained( model_name, torch_dtype=torch.float16, device_map={"": 0} # Mapear todo o modelo para GPU 0 ) # Otimizar modelo self.owlv2_model.eval() # Processar queries self.text_queries = self._get_detection_queries() logger.info(f"Queries carregadas: {self.text_queries}") # Log das queries self.processed_text = self.owlv2_processor( text=self.text_queries, return_tensors="pt", padding=True ) self.processed_text = { key: val.to(self.device) for key, val in self.processed_text.items() } logger.info("Inicialização GPU completa!") self._initialized = True except Exception as e: logger.error(f"Erro na inicialização GPU: {str(e)}") raise def detect_objects(self, image: Image.Image, threshold: float = 0.3) -> List[Dict]: """Detecta objetos em uma imagem.""" try: # Pré-processar imagem image = self._preprocess_image(image) # Processar imagem image_inputs = self.owlv2_processor( images=image, return_tensors="pt" ) image_inputs = { key: val.to(self.device) for key, val in image_inputs.items() } # Inferência with torch.no_grad(): inputs = {**image_inputs, **self.processed_text} outputs = self.owlv2_model(**inputs) target_sizes = torch.tensor([image.size[::-1]], device=self.device) results = self.owlv2_processor.post_process_grounded_object_detection( outputs=outputs, target_sizes=target_sizes, threshold=threshold )[0] # Processar detecções detections = [] if len(results["scores"]) > 0: scores = results["scores"] boxes = results["boxes"] labels = results["labels"] for score, box, label in zip(scores, boxes, labels): score_val = score.item() if score_val >= threshold: # Garantir que o índice está dentro dos limites label_idx = min(label.item(), len(self.text_queries) - 1) label_text = self.text_queries[label_idx] detections.append({ "confidence": round(score_val * 100, 2), # Converter para porcentagem "box": [int(x) for x in box.tolist()], "label": label_text }) logger.debug(f"Detecção: {label_text} ({score_val * 100:.2f}%)") # Aplicar NMS nas detecções detections = self._apply_nms(detections) return detections except Exception as e: logger.error(f"Erro em detect_objects: {str(e)}") return [] def _get_best_device(self) -> torch.device: """Retorna o melhor dispositivo disponível.""" if torch.cuda.is_available(): return torch.device("cuda:0") return torch.device("cpu") def _clear_gpu_memory(self): """Limpa memória GPU.""" torch.cuda.empty_cache() gc.collect() def process_video(self, video_path: str, fps: int = None, threshold: float = 0.3, resolution: int = 640) -> Tuple[str, Dict]: """Processa um vídeo.""" metrics = { "total_time": 0, "frame_extraction_time": 0, "analysis_time": 0, "frames_analyzed": 0, "video_duration": 0, "device_type": "GPU", "detections": [] } try: start_time = time.time() # Extrair frames t0 = time.time() frames = self.extract_frames(video_path, fps or 2, resolution) metrics["frame_extraction_time"] = time.time() - t0 metrics["frames_analyzed"] = len(frames) if not frames: logger.warning("Nenhum frame extraído do vídeo") return video_path, metrics # Calcular duração do vídeo metrics["video_duration"] = len(frames) / (fps or 2) # Processar frames em batch t0 = time.time() batch_size = 16 # Aumentado para T4 dedicada detections_by_frame = [] for i in range(0, len(frames), batch_size): batch_frames = frames[i:i + batch_size] batch_pil_frames = [] # Preparar batch for frame in batch_frames: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frame_pil = Image.fromarray(frame_rgb) frame_pil = self._preprocess_image(frame_pil) batch_pil_frames.append(frame_pil) # Processar batch batch_inputs = self.owlv2_processor( images=batch_pil_frames, return_tensors="pt", padding=True ) batch_inputs = { key: val.to(self.device) for key, val in batch_inputs.items() } # Inferência em batch with torch.no_grad(): inputs = {**batch_inputs, **self.processed_text} outputs = self.owlv2_model(**inputs) target_sizes = torch.tensor( [frame.size[::-1] for frame in batch_pil_frames], device=self.device ) results = self.owlv2_processor.post_process_grounded_object_detection( outputs=outputs, target_sizes=target_sizes, threshold=threshold ) # Processar resultados do batch for frame_idx, frame_results in enumerate(results): if len(frame_results["scores"]) > 0: scores = frame_results["scores"] boxes = frame_results["boxes"] labels = frame_results["labels"] frame_detections = [] for score, box, label in zip(scores, boxes, labels): score_val = score.item() if score_val >= threshold: label_idx = min(label.item(), len(self.text_queries) - 1) label_text = self.text_queries[label_idx] frame_detections.append({ "confidence": round(score_val * 100, 2), "box": [int(x) for x in box.tolist()], "label": label_text }) if frame_detections: frame_detections = self._apply_nms(frame_detections) detections_by_frame.append({ "frame": i + frame_idx, "detections": frame_detections }) # Liberar memória do batch del batch_inputs, outputs torch.cuda.empty_cache() # Atualizar métricas finais metrics["analysis_time"] = time.time() - t0 metrics["total_time"] = time.time() - start_time metrics["detections"] = detections_by_frame return video_path, metrics except Exception as e: logger.error(f"Erro ao processar vídeo: {str(e)}") return video_path, metrics def _preprocess_image(self, image: Image.Image) -> Image.Image: """Pré-processa a imagem para o formato esperado pelo modelo.""" try: # Converter para RGB se necessário if image.mode != 'RGB': image = image.convert('RGB') # Redimensionar mantendo proporção target_size = (self.default_resolution, self.default_resolution) if image.size != target_size: ratio = min(target_size[0] / image.size[0], target_size[1] / image.size[1]) new_size = tuple(int(dim * ratio) for dim in image.size) image = image.resize(new_size, Image.Resampling.LANCZOS) # Adicionar padding se necessário if new_size != target_size: new_image = Image.new('RGB', target_size, (0, 0, 0)) paste_x = (target_size[0] - new_size[0]) // 2 paste_y = (target_size[1] - new_size[1]) // 2 new_image.paste(image, (paste_x, paste_y)) image = new_image return image except Exception as e: logger.error(f"Erro no pré-processamento: {str(e)}") return image def _apply_nms(self, detections: List[Dict], iou_threshold: float = 0.5) -> List[Dict]: """Aplica Non-Maximum Suppression nas detecções.""" try: if not detections or len(detections) <= 1: return detections # Extrair scores e boxes scores = torch.tensor([d["confidence"] for d in detections], device=self.device) boxes = torch.tensor([[d["box"][0], d["box"][1], d["box"][2], d["box"][3]] for d in detections], device=self.device) # Ordenar por score _, order = scores.sort(descending=True) keep = [] while order.numel() > 0: if order.numel() == 1: keep.append(order.item()) break i = order[0] keep.append(i.item()) # Calcular IoU com os boxes restantes box1 = boxes[i] box2 = boxes[order[1:]] # Calcular interseção left = torch.max(box1[0], box2[:, 0]) top = torch.max(box1[1], box2[:, 1]) right = torch.min(box1[2], box2[:, 2]) bottom = torch.min(box1[3], box2[:, 3]) width = torch.clamp(right - left, min=0) height = torch.clamp(bottom - top, min=0) inter = width * height # Calcular união area1 = (box1[2] - box1[0]) * (box1[3] - box1[1]) area2 = (box2[:, 2] - box2[:, 0]) * (box2[:, 3] - box2[:, 1]) union = area1 + area2 - inter # Calcular IoU iou = inter / union mask = iou <= iou_threshold order = order[1:][mask] # Retornar detecções filtradas return [detections[i] for i in keep] except Exception as e: logger.error(f"Erro ao aplicar NMS: {str(e)}") return detections