import torch
import torch.nn.functional as F
import logging
import os
import gc
import time
import numpy as np
import cv2
from PIL import Image
from typing import List, Dict, Any, Tuple
from transformers import Owlv2Processor, Owlv2ForObjectDetection
from .base import BaseDetector

logger = logging.getLogger(__name__)
class WeaponDetectorGPU(BaseDetector):
    """GPU-optimized weapon detector."""

    def __init__(self):
        """Initialize the detector."""
        super().__init__()
        self.default_resolution = 640
        self.device = None  # Set in _initialize
        self._initialize()
    def _initialize(self):
        """Load and prepare the model."""
        try:
            # This detector requires a GPU
            if not torch.cuda.is_available():
                raise RuntimeError("CUDA is not available!")
            self.device = torch.device("cuda:0")

            # Load model and processor
            logger.info("Loading model and processor...")
            model_name = "google/owlv2-base-patch16"
            self.owlv2_processor = Owlv2Processor.from_pretrained(model_name)
            self.owlv2_model = Owlv2ForObjectDetection.from_pretrained(
                model_name,
                torch_dtype=torch.float16,
                device_map={"": 0}  # Map the whole model to GPU 0
            )
            self.owlv2_model.eval()

            # Tokenize the text queries once; they are reused for every image
            self.text_queries = self._get_detection_queries()
            logger.info(f"Loaded queries: {self.text_queries}")
            self.processed_text = self.owlv2_processor(
                text=self.text_queries,
                return_tensors="pt",
                padding=True
            )
            self.processed_text = {
                key: val.to(self.device)
                for key, val in self.processed_text.items()
            }
            logger.info("GPU initialization complete!")
            self._initialized = True
        except Exception as e:
            logger.error(f"GPU initialization error: {str(e)}")
            raise
    def detect_objects(self, image: Image.Image, threshold: float = 0.3) -> List[Dict]:
        """Detect objects in a single image."""
        try:
            # Preprocess the image (RGB, letterboxed to default_resolution)
            image = self._preprocess_image(image)

            # The model weights are fp16, so pixel_values must be cast to fp16
            # as well or the forward pass fails with a dtype mismatch; the text
            # tensors (input_ids, attention_mask) stay integer.
            image_inputs = self.owlv2_processor(
                images=image,
                return_tensors="pt"
            )
            image_inputs = {
                key: val.to(self.device, dtype=torch.float16)
                if val.is_floating_point() else val.to(self.device)
                for key, val in image_inputs.items()
            }

            # Inference
            with torch.no_grad():
                inputs = {**image_inputs, **self.processed_text}
                outputs = self.owlv2_model(**inputs)
                target_sizes = torch.tensor([image.size[::-1]], device=self.device)
                results = self.owlv2_processor.post_process_grounded_object_detection(
                    outputs=outputs,
                    target_sizes=target_sizes,
                    threshold=threshold
                )[0]

            # Collect detections above the threshold
            detections = []
            if len(results["scores"]) > 0:
                scores = results["scores"]
                boxes = results["boxes"]
                labels = results["labels"]
                for score, box, label in zip(scores, boxes, labels):
                    score_val = score.item()
                    if score_val >= threshold:
                        # Clamp the index so it stays within the query list
                        label_idx = min(label.item(), len(self.text_queries) - 1)
                        label_text = self.text_queries[label_idx]
                        detections.append({
                            "confidence": round(score_val * 100, 2),  # As a percentage
                            "box": [int(x) for x in box.tolist()],
                            "label": label_text
                        })
                        logger.debug(f"Detection: {label_text} ({score_val * 100:.2f}%)")

            # Merge overlapping boxes
            return self._apply_nms(detections)
        except Exception as e:
            logger.error(f"Error in detect_objects: {str(e)}")
            return []
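    # Illustrative call (a sketch, not part of the original module; assumes a
    # CUDA GPU and that "photo.jpg" exists):
    #
    #   detector = WeaponDetectorGPU()
    #   for det in detector.detect_objects(Image.open("photo.jpg"), threshold=0.3):
    #       print(f"{det['label']}: {det['confidence']}% at {det['box']}")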
    def _get_best_device(self) -> torch.device:
        """Return the best available device."""
        if torch.cuda.is_available():
            return torch.device("cuda:0")
        return torch.device("cpu")

    def _clear_gpu_memory(self):
        """Free cached GPU memory."""
        torch.cuda.empty_cache()
        gc.collect()
    def process_video(self, video_path: str, fps: int = None, threshold: float = 0.3, resolution: int = 640) -> Tuple[str, Dict]:
        """Process a video file."""
        metrics = {
            "total_time": 0,
            "frame_extraction_time": 0,
            "analysis_time": 0,
            "frames_analyzed": 0,
            "video_duration": 0,
            "device_type": "GPU",
            "detections": []
        }
        try:
            start_time = time.time()

            # Extract frames
            t0 = time.time()
            frames = self.extract_frames(video_path, fps or 2, resolution)
            metrics["frame_extraction_time"] = time.time() - t0
            metrics["frames_analyzed"] = len(frames)
            if not frames:
                logger.warning("No frames extracted from the video")
                return video_path, metrics

            # Approximate the duration from frame count and sampling rate
            metrics["video_duration"] = len(frames) / (fps or 2)

            # Process frames in batches
            t0 = time.time()
            batch_size = 16  # Increased for a dedicated T4
            detections_by_frame = []
            for i in range(0, len(frames), batch_size):
                batch_frames = frames[i:i + batch_size]
                batch_pil_frames = []

                # Prepare the batch: OpenCV frames are BGR, the model expects RGB
                for frame in batch_frames:
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frame_pil = Image.fromarray(frame_rgb)
                    frame_pil = self._preprocess_image(frame_pil)
                    batch_pil_frames.append(frame_pil)

                # Process the batch; cast pixel_values to fp16 to match the weights
                batch_inputs = self.owlv2_processor(
                    images=batch_pil_frames,
                    return_tensors="pt",
                    padding=True
                )
                batch_inputs = {
                    key: val.to(self.device, dtype=torch.float16)
                    if val.is_floating_point() else val.to(self.device)
                    for key, val in batch_inputs.items()
                }

                # Batched inference. OWLv2 expects the text tensors tiled once
                # per image: input_ids of shape (num_images * num_queries, seq_len).
                with torch.no_grad():
                    n_images = len(batch_pil_frames)
                    text_inputs = {
                        key: val.repeat(n_images, 1)
                        for key, val in self.processed_text.items()
                    }
                    inputs = {**batch_inputs, **text_inputs}
                    outputs = self.owlv2_model(**inputs)
                    target_sizes = torch.tensor(
                        [frame.size[::-1] for frame in batch_pil_frames],
                        device=self.device
                    )
                    results = self.owlv2_processor.post_process_grounded_object_detection(
                        outputs=outputs,
                        target_sizes=target_sizes,
                        threshold=threshold
                    )

                # Collect per-frame results from the batch
                for frame_idx, frame_results in enumerate(results):
                    if len(frame_results["scores"]) > 0:
                        scores = frame_results["scores"]
                        boxes = frame_results["boxes"]
                        labels = frame_results["labels"]
                        frame_detections = []
                        for score, box, label in zip(scores, boxes, labels):
                            score_val = score.item()
                            if score_val >= threshold:
                                label_idx = min(label.item(), len(self.text_queries) - 1)
                                label_text = self.text_queries[label_idx]
                                frame_detections.append({
                                    "confidence": round(score_val * 100, 2),
                                    "box": [int(x) for x in box.tolist()],
                                    "label": label_text
                                })
                        if frame_detections:
                            frame_detections = self._apply_nms(frame_detections)
                            detections_by_frame.append({
                                "frame": i + frame_idx,
                                "detections": frame_detections
                            })

                # Free batch memory
                del batch_inputs, outputs
                torch.cuda.empty_cache()

            # Final metrics
            metrics["analysis_time"] = time.time() - t0
            metrics["total_time"] = time.time() - start_time
            metrics["detections"] = detections_by_frame
            return video_path, metrics
        except Exception as e:
            logger.error(f"Error processing video: {str(e)}")
            return video_path, metrics
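    # Illustrative call (a sketch; the path is a placeholder). The returned
    # metrics dict nests detections per sampled frame:
    #
    #   path, metrics = detector.process_video("clip.mp4", fps=2, threshold=0.3)
    #   hits = sum(len(f["detections"]) for f in metrics["detections"])
    #   print(f"{hits} detections in {metrics['frames_analyzed']} frames "
    #         f"({metrics['total_time']:.1f}s total)")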
    def _preprocess_image(self, image: Image.Image) -> Image.Image:
        """Preprocess the image into the format the model expects."""
        try:
            # Convert to RGB if needed
            if image.mode != 'RGB':
                image = image.convert('RGB')

            # Resize while preserving the aspect ratio, then letterbox
            target_size = (self.default_resolution, self.default_resolution)
            if image.size != target_size:
                ratio = min(target_size[0] / image.size[0], target_size[1] / image.size[1])
                new_size = tuple(int(dim * ratio) for dim in image.size)
                image = image.resize(new_size, Image.Resampling.LANCZOS)

                # Pad with black to reach the square target size
                if new_size != target_size:
                    new_image = Image.new('RGB', target_size, (0, 0, 0))
                    paste_x = (target_size[0] - new_size[0]) // 2
                    paste_y = (target_size[1] - new_size[1]) // 2
                    new_image.paste(image, (paste_x, paste_y))
                    image = new_image
            return image
        except Exception as e:
            logger.error(f"Preprocessing error: {str(e)}")
            return image
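    # Worked example of the letterboxing above: a 1280x720 input gives
    # ratio = min(640/1280, 640/720) = 0.5, so the image is resized to 640x360
    # and pasted at (paste_x, paste_y) = (0, 140) inside a black 640x640 canvas.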
    def _apply_nms(self, detections: List[Dict], iou_threshold: float = 0.5) -> List[Dict]:
        """Apply Non-Maximum Suppression to the detections."""
        try:
            if not detections or len(detections) <= 1:
                return detections

            # Scores (percentages are fine here: only the ordering matters) and boxes
            scores = torch.tensor([d["confidence"] for d in detections], device=self.device)
            boxes = torch.tensor([d["box"] for d in detections],
                                 dtype=torch.float32, device=self.device)

            # Sort by score, descending
            _, order = scores.sort(descending=True)
            keep = []
            while order.numel() > 0:
                if order.numel() == 1:
                    keep.append(order.item())
                    break
                i = order[0]
                keep.append(i.item())

                # IoU of the top box against the remaining boxes
                box1 = boxes[i]
                box2 = boxes[order[1:]]

                # Intersection
                left = torch.max(box1[0], box2[:, 0])
                top = torch.max(box1[1], box2[:, 1])
                right = torch.min(box1[2], box2[:, 2])
                bottom = torch.min(box1[3], box2[:, 3])
                width = torch.clamp(right - left, min=0)
                height = torch.clamp(bottom - top, min=0)
                inter = width * height

                # Union
                area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
                area2 = (box2[:, 2] - box2[:, 0]) * (box2[:, 3] - box2[:, 1])
                union = area1 + area2 - inter

                # Keep only boxes below the IoU threshold
                iou = inter / union
                mask = iou <= iou_threshold
                order = order[1:][mask]

            return [detections[i] for i in keep]
        except Exception as e:
            logger.error(f"Error applying NMS: {str(e)}")
            return detections
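    # Note: the loop above implements the same greedy algorithm as
    # torchvision.ops.nms. If torchvision is available, the body of the
    # try-block could be reduced to (sketch):
    #
    #   from torchvision.ops import nms
    #   keep = nms(boxes, scores.float(), iou_threshold).tolist()
    #   return [detections[i] for i in keep]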