import torch import torch.nn.functional as F import torch._dynamo import logging import os import time import gc import numpy as np import cv2 from PIL import Image from transformers import Owlv2Processor, Owlv2ForObjectDetection from .base import BaseDetector, BaseCache logger = logging.getLogger(__name__) # Configurações globais do PyTorch para otimização em GPU torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True torch.backends.cudnn.benchmark = True torch.backends.cuda.matmul.allow_fp16_reduced_precision_reduction = True torch._dynamo.config.suppress_errors = True class GPUCache(BaseCache): """Cache otimizado para GPU.""" def __init__(self, max_size: int = 1000): super().__init__(max_size) self.device = torch.device('cuda') class WeaponDetectorGPU(BaseDetector): """Implementação GPU do detector de armas com otimizações para a última versão do OWLv2.""" def __init__(self): """Inicializa variáveis básicas.""" super().__init__() self.default_resolution = 640 self.amp_dtype = torch.float16 self.preprocess_stream = torch.cuda.Stream() self.max_batch_size = 16 # Aumentado para 16 self.current_batch_size = 8 # Aumentado para 8 self.min_batch_size = 2 def _initialize(self): """Inicializa o modelo e o processador para execução exclusiva em GPU.""" try: # Configurar device self.device = self._get_best_device() # Diretório de cache para o modelo cache_dir = os.getenv('CACHE_DIR', '/tmp/weapon_detection_cache') os.makedirs(cache_dir, exist_ok=True) # Limpar memória GPU self._clear_gpu_memory() logger.info("Carregando modelo e processador...") # Carregar processador e modelo com otimizações model_name = "google/owlv2-base-patch16" self.owlv2_processor = Owlv2Processor.from_pretrained( model_name, cache_dir=cache_dir ) # Configurações otimizadas para T4 self.owlv2_model = Owlv2ForObjectDetection.from_pretrained( model_name, cache_dir=cache_dir, torch_dtype=self.amp_dtype, device_map="auto", low_cpu_mem_usage=True ).to(self.device) # Otimizar modelo para inferência self.owlv2_model.eval() torch.compile(self.owlv2_model) # Usar torch.compile para otimização # Usar queries do método base self.text_queries = self._get_detection_queries() logger.info(f"Total de queries carregadas: {len(self.text_queries)}") # Processar queries uma única vez com otimização de memória with torch.cuda.amp.autocast(dtype=self.amp_dtype): self.processed_text = self.owlv2_processor( text=self.text_queries, return_tensors="pt", padding=True ) self.processed_text = { key: val.to(self.device, non_blocking=True) for key, val in self.processed_text.items() } # Ajustar batch size baseado na memória disponível self._adjust_batch_size() logger.info(f"Inicialização GPU completa! Batch size inicial: {self.current_batch_size}") self._initialized = True except Exception as e: logger.error(f"Erro na inicialização GPU: {str(e)}") raise def _adjust_batch_size(self): """Ajusta o batch size baseado na memória disponível.""" try: gpu_mem = torch.cuda.get_device_properties(0).total_memory free_mem = torch.cuda.memory_reserved() - torch.cuda.memory_allocated() mem_ratio = free_mem / gpu_mem if mem_ratio < 0.2: # Menos de 20% livre self.current_batch_size = max(self.min_batch_size, self.current_batch_size // 2) elif mem_ratio > 0.4: # Mais de 40% livre self.current_batch_size = min(self.max_batch_size, self.current_batch_size * 2) logger.debug(f"Batch size ajustado para {self.current_batch_size} (Memória livre: {mem_ratio:.1%})") except Exception as e: logger.warning(f"Erro ao ajustar batch size: {str(e)}") self.current_batch_size = self.min_batch_size def detect_objects(self, image: Image.Image, threshold: float = 0.3) -> list: """Detecta objetos em uma imagem utilizando a última versão do OWLv2.""" try: self.threshold = threshold # Pré-processar imagem if image.mode != 'RGB': image = image.convert('RGB') # Processar imagem image_inputs = self.owlv2_processor( images=image, return_tensors="pt" ) image_inputs = { key: val.to(self.device) for key, val in image_inputs.items() } # Inferência with torch.no_grad(): inputs = {**image_inputs, **self.processed_text} outputs = self.owlv2_model(**inputs) target_sizes = torch.tensor([image.size[::-1]], device=self.device) results = self.owlv2_processor.post_process_grounded_object_detection( outputs=outputs, target_sizes=target_sizes, threshold=threshold )[0] # Processar detecções detections = [] if len(results["scores"]) > 0: scores = results["scores"] boxes = results["boxes"] labels = results["labels"] for score, box, label in zip(scores, boxes, labels): if score.item() >= threshold: detections.append({ "confidence": score.item(), "box": [int(x) for x in box.tolist()], "label": self.text_queries[label] }) return detections except Exception as e: logger.error(f"Erro em detect_objects: {str(e)}") return [] def process_video(self, video_path: str, fps: int = None, threshold: float = 0.3, resolution: int = 640) -> tuple: """Processa um vídeo utilizando GPU com processamento em lote e otimizações para T4.""" try: metrics = { "total_time": 0, "frame_extraction_time": 0, "analysis_time": 0, "frames_analyzed": 0, "video_duration": 0, "device_type": self.device.type, "detections": [], "technical": { "model": "owlv2-base-patch16", "input_size": f"{resolution}x{resolution}", "threshold": threshold, "batch_size": self.current_batch_size, "gpu_memory": f"{torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f}GB" } } start_time = time.time() frames = self.extract_frames(video_path, fps, resolution) metrics["frame_extraction_time"] = time.time() - start_time metrics["frames_analyzed"] = len(frames) if not frames: logger.warning("Nenhum frame extraído do vídeo") return video_path, metrics metrics["video_duration"] = len(frames) / (fps or 2) analysis_start = time.time() # Processar frames em lotes com ajuste dinâmico de batch size for i in range(0, len(frames), self.current_batch_size): try: batch_frames = frames[i:i + self.current_batch_size] # Pré-processamento assíncrono with torch.cuda.stream(self.preprocess_stream): batch_images = [ Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) for frame in batch_frames ] batch_inputs = self.owlv2_processor( images=batch_images, return_tensors="pt" ) batch_inputs = { key: val.to(self.device, non_blocking=True) for key, val in batch_inputs.items() } # Expandir texto processado para o batch batch_text = { key: val.repeat(len(batch_images), 1) for key, val in self.processed_text.items() } inputs = {**batch_inputs, **batch_text} # Inferência com mixed precision with torch.cuda.amp.autocast(dtype=self.amp_dtype): with torch.no_grad(): outputs = self.owlv2_model(**inputs) # Processar resultados target_sizes = torch.tensor([[img.size[::-1] for img in batch_images]], device=self.device) results = self.owlv2_processor.post_process_grounded_object_detection( outputs=outputs, target_sizes=target_sizes[0], threshold=threshold ) # Verificar detecções for batch_idx, result in enumerate(results): if len(result["scores"]) > 0: frame_idx = i + batch_idx max_score_idx = torch.argmax(result["scores"]) score = result["scores"][max_score_idx] if score.item() >= threshold: detection = { "frame": frame_idx, "confidence": score.item(), "box": [int(x) for x in result["boxes"][max_score_idx].tolist()], "label": self.text_queries[result["labels"][max_score_idx]] } metrics["detections"].append(detection) metrics["analysis_time"] = time.time() - analysis_start metrics["total_time"] = time.time() - start_time return video_path, metrics # Limpar memória e ajustar batch size periodicamente if (i // self.current_batch_size) % 5 == 0: self._clear_gpu_memory() self._adjust_batch_size() except RuntimeError as e: if "out of memory" in str(e): logger.warning("OOM detectado, reduzindo batch size") self._clear_gpu_memory() self.current_batch_size = max(self.min_batch_size, self.current_batch_size // 2) continue raise metrics["analysis_time"] = time.time() - analysis_start metrics["total_time"] = time.time() - start_time return video_path, metrics except Exception as e: logger.error(f"Erro ao processar vídeo: {str(e)}") return video_path, metrics def _clear_gpu_memory(self): """Limpa memória GPU de forma agressiva.""" try: torch.cuda.empty_cache() torch.cuda.synchronize() gc.collect() except Exception as e: logger.error(f"Erro ao limpar memória GPU: {str(e)}") def _get_best_device(self): if not torch.cuda.is_available(): raise RuntimeError("CUDA não está disponível!") return torch.device('cuda') def _preprocess_image(self, image: Image.Image) -> Image.Image: """Pré-processa a imagem com otimizações para GPU.""" try: target_size = (self.default_resolution, self.default_resolution) if image.mode != 'RGB': image = image.convert('RGB') if image.size != target_size: ratio = min(target_size[0] / image.size[0], target_size[1] / image.size[1]) new_size = (int(image.size[0] * ratio), int(image.size[1] * ratio)) with torch.cuda.stream(self.preprocess_stream), torch.amp.autocast(device_type='cuda', dtype=self.amp_dtype): img_tensor = torch.from_numpy(np.array(image)).permute(2, 0, 1).unsqueeze(0) img_tensor = img_tensor.to(self.device, dtype=self.amp_dtype, non_blocking=True) img_tensor = img_tensor / 255.0 mode = 'bilinear' if ratio < 1 else 'nearest' img_tensor = F.interpolate( img_tensor, size=new_size, mode=mode, align_corners=False if mode == 'bilinear' else None ) if new_size != target_size: final_tensor = torch.zeros( (1, 3, target_size[1], target_size[0]), device=self.device, dtype=self.amp_dtype ) pad_left = (target_size[0] - new_size[0]) // 2 pad_top = (target_size[1] - new_size[1]) // 2 final_tensor[ :, :, pad_top:pad_top + new_size[1], pad_left:pad_left + new_size[0] ] = img_tensor img_tensor = final_tensor img_tensor = img_tensor.squeeze(0).permute(1, 2, 0).cpu() image = Image.fromarray((img_tensor.numpy() * 255).astype(np.uint8)) return image except Exception as e: logger.error(f"Erro no pré-processamento: {str(e)}") return image def _get_memory_usage(self): """Retorna o uso atual de memória GPU em porcentagem.""" try: allocated = torch.cuda.memory_allocated() reserved = torch.cuda.memory_reserved() total = torch.cuda.get_device_properties(0).total_memory return (allocated + reserved) / total * 100 except Exception as e: logger.error(f"Erro ao obter uso de memória GPU: {str(e)}") return 0 def _should_clear_cache(self): """Determina se o cache deve ser limpo baseado no uso de memória.""" try: memory_usage = self._get_memory_usage() if memory_usage > 90: return True if memory_usage > 75 and not hasattr(self, '_last_cache_clear'): return True if hasattr(self, '_last_cache_clear'): time_since_last_clear = time.time() - self._last_cache_clear if memory_usage > 80 and time_since_last_clear > 300: return True return False except Exception as e: logger.error(f"Erro ao verificar necessidade de limpeza: {str(e)}") return False def clear_cache(self): """Limpa o cache de resultados e libera memória quando necessário.""" try: if self._should_clear_cache(): if hasattr(self, 'result_cache'): self.result_cache.clear() torch.cuda.empty_cache() gc.collect() self._last_cache_clear = time.time() logger.info(f"Cache GPU limpo com sucesso. Uso de memória: {self._get_memory_usage():.1f}%") else: logger.debug("Limpeza de cache não necessária no momento") except Exception as e: logger.error(f"Erro ao limpar cache GPU: {str(e)}")