Marcus Vinicius Zerbini Canhaço
feat: versão inicial limpa
8fb6272
raw
history blame
17.1 kB
import torch
import torch.nn.functional as F
import torch._dynamo
import logging
import os
import time
import gc
import numpy as np
import cv2
from PIL import Image
from transformers import Owlv2Processor, Owlv2ForObjectDetection
from .base import BaseDetector, BaseCache
logger = logging.getLogger(__name__)
# Configurações globais do PyTorch para otimização em GPU
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_fp16_reduced_precision_reduction = True
torch._dynamo.config.suppress_errors = True
class GPUCache(BaseCache):
"""Cache otimizado para GPU."""
def __init__(self, max_size: int = 1000):
super().__init__(max_size)
self.device = torch.device('cuda')
class WeaponDetectorGPU(BaseDetector):
"""Implementação GPU do detector de armas com otimizações para a última versão do OWLv2."""
def __init__(self):
"""Inicializa variáveis básicas."""
super().__init__()
self.default_resolution = 640
self.amp_dtype = torch.float16
self.preprocess_stream = torch.cuda.Stream()
self.max_batch_size = 16 # Aumentado para 16
self.current_batch_size = 8 # Aumentado para 8
self.min_batch_size = 2
def _initialize(self):
"""Inicializa o modelo e o processador para execução exclusiva em GPU."""
try:
# Configurar device
self.device = self._get_best_device()
# Diretório de cache para o modelo
cache_dir = os.getenv('CACHE_DIR', '/tmp/weapon_detection_cache')
os.makedirs(cache_dir, exist_ok=True)
# Limpar memória GPU
self._clear_gpu_memory()
logger.info("Carregando modelo e processador...")
# Carregar processador e modelo com otimizações
model_name = "google/owlv2-base-patch16"
self.owlv2_processor = Owlv2Processor.from_pretrained(
model_name,
cache_dir=cache_dir
)
# Configurações otimizadas para T4
self.owlv2_model = Owlv2ForObjectDetection.from_pretrained(
model_name,
cache_dir=cache_dir,
torch_dtype=self.amp_dtype,
device_map="auto",
low_cpu_mem_usage=True
).to(self.device)
# Otimizar modelo para inferência
self.owlv2_model.eval()
torch.compile(self.owlv2_model) # Usar torch.compile para otimização
# Usar queries do método base
self.text_queries = self._get_detection_queries()
logger.info(f"Total de queries carregadas: {len(self.text_queries)}")
# Processar queries uma única vez com otimização de memória
with torch.cuda.amp.autocast(dtype=self.amp_dtype):
self.processed_text = self.owlv2_processor(
text=self.text_queries,
return_tensors="pt",
padding=True
)
self.processed_text = {
key: val.to(self.device, non_blocking=True)
for key, val in self.processed_text.items()
}
# Ajustar batch size baseado na memória disponível
self._adjust_batch_size()
logger.info(f"Inicialização GPU completa! Batch size inicial: {self.current_batch_size}")
self._initialized = True
except Exception as e:
logger.error(f"Erro na inicialização GPU: {str(e)}")
raise
def _adjust_batch_size(self):
"""Ajusta o batch size baseado na memória disponível."""
try:
gpu_mem = torch.cuda.get_device_properties(0).total_memory
free_mem = torch.cuda.memory_reserved() - torch.cuda.memory_allocated()
mem_ratio = free_mem / gpu_mem
if mem_ratio < 0.2: # Menos de 20% livre
self.current_batch_size = max(self.min_batch_size, self.current_batch_size // 2)
elif mem_ratio > 0.4: # Mais de 40% livre
self.current_batch_size = min(self.max_batch_size, self.current_batch_size * 2)
logger.debug(f"Batch size ajustado para {self.current_batch_size} (Memória livre: {mem_ratio:.1%})")
except Exception as e:
logger.warning(f"Erro ao ajustar batch size: {str(e)}")
self.current_batch_size = self.min_batch_size
def detect_objects(self, image: Image.Image, threshold: float = 0.3) -> list:
"""Detecta objetos em uma imagem utilizando a última versão do OWLv2."""
try:
self.threshold = threshold
# Pré-processar imagem
if image.mode != 'RGB':
image = image.convert('RGB')
# Processar imagem
image_inputs = self.owlv2_processor(
images=image,
return_tensors="pt"
)
image_inputs = {
key: val.to(self.device)
for key, val in image_inputs.items()
}
# Inferência
with torch.no_grad():
inputs = {**image_inputs, **self.processed_text}
outputs = self.owlv2_model(**inputs)
target_sizes = torch.tensor([image.size[::-1]], device=self.device)
results = self.owlv2_processor.post_process_grounded_object_detection(
outputs=outputs,
target_sizes=target_sizes,
threshold=threshold
)[0]
# Processar detecções
detections = []
if len(results["scores"]) > 0:
scores = results["scores"]
boxes = results["boxes"]
labels = results["labels"]
for score, box, label in zip(scores, boxes, labels):
if score.item() >= threshold:
detections.append({
"confidence": score.item(),
"box": [int(x) for x in box.tolist()],
"label": self.text_queries[label]
})
return detections
except Exception as e:
logger.error(f"Erro em detect_objects: {str(e)}")
return []
def process_video(self, video_path: str, fps: int = None, threshold: float = 0.3, resolution: int = 640) -> tuple:
"""Processa um vídeo utilizando GPU com processamento em lote e otimizações para T4."""
try:
metrics = {
"total_time": 0,
"frame_extraction_time": 0,
"analysis_time": 0,
"frames_analyzed": 0,
"video_duration": 0,
"device_type": self.device.type,
"detections": [],
"technical": {
"model": "owlv2-base-patch16",
"input_size": f"{resolution}x{resolution}",
"threshold": threshold,
"batch_size": self.current_batch_size,
"gpu_memory": f"{torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f}GB"
}
}
start_time = time.time()
frames = self.extract_frames(video_path, fps, resolution)
metrics["frame_extraction_time"] = time.time() - start_time
metrics["frames_analyzed"] = len(frames)
if not frames:
logger.warning("Nenhum frame extraído do vídeo")
return video_path, metrics
metrics["video_duration"] = len(frames) / (fps or 2)
analysis_start = time.time()
# Processar frames em lotes com ajuste dinâmico de batch size
for i in range(0, len(frames), self.current_batch_size):
try:
batch_frames = frames[i:i + self.current_batch_size]
# Pré-processamento assíncrono
with torch.cuda.stream(self.preprocess_stream):
batch_images = [
Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
for frame in batch_frames
]
batch_inputs = self.owlv2_processor(
images=batch_images,
return_tensors="pt"
)
batch_inputs = {
key: val.to(self.device, non_blocking=True)
for key, val in batch_inputs.items()
}
# Expandir texto processado para o batch
batch_text = {
key: val.repeat(len(batch_images), 1)
for key, val in self.processed_text.items()
}
inputs = {**batch_inputs, **batch_text}
# Inferência com mixed precision
with torch.cuda.amp.autocast(dtype=self.amp_dtype):
with torch.no_grad():
outputs = self.owlv2_model(**inputs)
# Processar resultados
target_sizes = torch.tensor([[img.size[::-1] for img in batch_images]], device=self.device)
results = self.owlv2_processor.post_process_grounded_object_detection(
outputs=outputs,
target_sizes=target_sizes[0],
threshold=threshold
)
# Verificar detecções
for batch_idx, result in enumerate(results):
if len(result["scores"]) > 0:
frame_idx = i + batch_idx
max_score_idx = torch.argmax(result["scores"])
score = result["scores"][max_score_idx]
if score.item() >= threshold:
detection = {
"frame": frame_idx,
"confidence": score.item(),
"box": [int(x) for x in result["boxes"][max_score_idx].tolist()],
"label": self.text_queries[result["labels"][max_score_idx]]
}
metrics["detections"].append(detection)
metrics["analysis_time"] = time.time() - analysis_start
metrics["total_time"] = time.time() - start_time
return video_path, metrics
# Limpar memória e ajustar batch size periodicamente
if (i // self.current_batch_size) % 5 == 0:
self._clear_gpu_memory()
self._adjust_batch_size()
except RuntimeError as e:
if "out of memory" in str(e):
logger.warning("OOM detectado, reduzindo batch size")
self._clear_gpu_memory()
self.current_batch_size = max(self.min_batch_size, self.current_batch_size // 2)
continue
raise
metrics["analysis_time"] = time.time() - analysis_start
metrics["total_time"] = time.time() - start_time
return video_path, metrics
except Exception as e:
logger.error(f"Erro ao processar vídeo: {str(e)}")
return video_path, metrics
def _clear_gpu_memory(self):
"""Limpa memória GPU de forma agressiva."""
try:
torch.cuda.empty_cache()
torch.cuda.synchronize()
gc.collect()
except Exception as e:
logger.error(f"Erro ao limpar memória GPU: {str(e)}")
def _get_best_device(self):
if not torch.cuda.is_available():
raise RuntimeError("CUDA não está disponível!")
return torch.device('cuda')
def _preprocess_image(self, image: Image.Image) -> Image.Image:
"""Pré-processa a imagem com otimizações para GPU."""
try:
target_size = (self.default_resolution, self.default_resolution)
if image.mode != 'RGB':
image = image.convert('RGB')
if image.size != target_size:
ratio = min(target_size[0] / image.size[0], target_size[1] / image.size[1])
new_size = (int(image.size[0] * ratio), int(image.size[1] * ratio))
with torch.cuda.stream(self.preprocess_stream), torch.amp.autocast(device_type='cuda', dtype=self.amp_dtype):
img_tensor = torch.from_numpy(np.array(image)).permute(2, 0, 1).unsqueeze(0)
img_tensor = img_tensor.to(self.device, dtype=self.amp_dtype, non_blocking=True)
img_tensor = img_tensor / 255.0
mode = 'bilinear' if ratio < 1 else 'nearest'
img_tensor = F.interpolate(
img_tensor,
size=new_size,
mode=mode,
align_corners=False if mode == 'bilinear' else None
)
if new_size != target_size:
final_tensor = torch.zeros(
(1, 3, target_size[1], target_size[0]),
device=self.device,
dtype=self.amp_dtype
)
pad_left = (target_size[0] - new_size[0]) // 2
pad_top = (target_size[1] - new_size[1]) // 2
final_tensor[
:,
:,
pad_top:pad_top + new_size[1],
pad_left:pad_left + new_size[0]
] = img_tensor
img_tensor = final_tensor
img_tensor = img_tensor.squeeze(0).permute(1, 2, 0).cpu()
image = Image.fromarray((img_tensor.numpy() * 255).astype(np.uint8))
return image
except Exception as e:
logger.error(f"Erro no pré-processamento: {str(e)}")
return image
def _get_memory_usage(self):
"""Retorna o uso atual de memória GPU em porcentagem."""
try:
allocated = torch.cuda.memory_allocated()
reserved = torch.cuda.memory_reserved()
total = torch.cuda.get_device_properties(0).total_memory
return (allocated + reserved) / total * 100
except Exception as e:
logger.error(f"Erro ao obter uso de memória GPU: {str(e)}")
return 0
def _should_clear_cache(self):
"""Determina se o cache deve ser limpo baseado no uso de memória."""
try:
memory_usage = self._get_memory_usage()
if memory_usage > 90:
return True
if memory_usage > 75 and not hasattr(self, '_last_cache_clear'):
return True
if hasattr(self, '_last_cache_clear'):
time_since_last_clear = time.time() - self._last_cache_clear
if memory_usage > 80 and time_since_last_clear > 300:
return True
return False
except Exception as e:
logger.error(f"Erro ao verificar necessidade de limpeza: {str(e)}")
return False
def clear_cache(self):
"""Limpa o cache de resultados e libera memória quando necessário."""
try:
if self._should_clear_cache():
if hasattr(self, 'result_cache'):
self.result_cache.clear()
torch.cuda.empty_cache()
gc.collect()
self._last_cache_clear = time.time()
logger.info(f"Cache GPU limpo com sucesso. Uso de memória: {self._get_memory_usage():.1f}%")
else:
logger.debug("Limpeza de cache não necessária no momento")
except Exception as e:
logger.error(f"Erro ao limpar cache GPU: {str(e)}")