Marcus Vinicius Zerbini Canhaço
force use GPU
534b64d
raw
history blame
13.4 kB
import os
import time
import gc
import cv2
import json
import torch
import psutil
import shutil
import pickle
import hashlib
import tempfile
import logging
import subprocess
import numpy as np
import sys
from tqdm import tqdm
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from PIL import Image
from dotenv import load_dotenv
from transformers import Owlv2Processor, Owlv2ForObjectDetection
from typing import Optional
from src.domain.detectors.base import BaseDetector, BaseCache
from src.domain.detectors.gpu import WeaponDetectorGPU
from src.domain.detectors.cpu import WeaponDetectorCPU
# Carregar variáveis de ambiente
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def force_gpu_init():
"""Força a inicialização da GPU."""
try:
# Forçar inicialização do CUDA
torch.cuda.init()
# Alocar e liberar um tensor pequeno para garantir que CUDA está funcionando
dummy = torch.cuda.FloatTensor(1)
del dummy
torch.cuda.empty_cache()
return True
except Exception as e:
logger.warning(f"Erro ao forçar inicialização da GPU: {str(e)}")
return False
def is_gpu_available():
"""Verifica se a GPU está disponível de forma mais robusta."""
try:
if not torch.cuda.is_available():
return False
# Tentar forçar inicialização
if not force_gpu_init():
return False
# Verificar se há memória disponível
gpu_memory = torch.cuda.get_device_properties(0).total_memory
if gpu_memory < 4 * (1024**3): # Mínimo de 4GB
logger.warning("GPU encontrada mas com memória insuficiente")
return False
return True
except Exception as e:
logger.warning(f"Erro ao verificar GPU: {str(e)}")
return False
class BaseCache:
"""Cache base para armazenar resultados de detecção."""
def __init__(self, max_size: int = 1000):
self.cache = {}
self.max_size = max_size
self.hits = 0
self.misses = 0
self.last_access = {}
def get(self, image: np.ndarray) -> list:
try:
key = hashlib.blake2b(image.tobytes(), digest_size=16).hexdigest()
if key in self.cache:
self.hits += 1
self.last_access[key] = time.time()
return self.cache[key]
self.misses += 1
return None
except Exception as e:
logger.error(f"Erro ao recuperar do cache: {str(e)}")
return None
def put(self, image: np.ndarray, results: list):
try:
key = hashlib.blake2b(image.tobytes(), digest_size=16).hexdigest()
if len(self.cache) >= self.max_size:
oldest_key = min(self.last_access.items(), key=lambda x: x[1])[0]
del self.cache[oldest_key]
del self.last_access[oldest_key]
self.cache[key] = results
self.last_access[key] = time.time()
except Exception as e:
logger.error(f"Erro ao armazenar no cache: {str(e)}")
def clear(self):
"""Limpa o cache e libera memória."""
self.cache.clear()
self.last_access.clear()
gc.collect()
def get_stats(self) -> dict:
total = self.hits + self.misses
hit_rate = (self.hits / total) * 100 if total > 0 else 0
return {
"cache_size": len(self.cache),
"max_size": self.max_size,
"hits": self.hits,
"misses": self.misses,
"hit_rate": f"{hit_rate:.2f}%",
"memory_usage": sum(sys.getsizeof(v) for v in self.cache.values())
}
class BaseWeaponDetector:
"""Classe base abstrata para detecção de armas."""
def __init__(self):
"""Inicialização básica comum a todos os detectores."""
self._initialized = False
self.device = self._get_best_device()
self._initialize()
def _check_initialized(self):
"""Verifica se o detector está inicializado."""
if not self._initialized:
raise RuntimeError("Detector não está inicializado")
def clean_memory(self):
"""Limpa memória não utilizada."""
try:
if torch.cuda.is_available():
torch.cuda.empty_cache()
logger.debug("Cache GPU limpo")
gc.collect()
logger.debug("Garbage collector executado")
except Exception as e:
logger.error(f"Erro ao limpar memória: {str(e)}")
def _get_best_device(self):
"""Deve ser implementado nas classes filhas."""
raise NotImplementedError
def _initialize(self):
"""Deve ser implementado nas classes filhas."""
raise NotImplementedError
def detect_objects(self, image, threshold=0.3):
"""Deve ser implementado nas classes filhas."""
raise NotImplementedError
def process_video(self, video_path, fps=None, threshold=0.3, resolution=640):
"""Deve ser implementado nas classes filhas."""
raise NotImplementedError
def _apply_nms(self, detections: list, iou_threshold: float = 0.5) -> list:
"""Deve ser implementado nas classes filhas."""
raise NotImplementedError
def extract_frames(self, video_path: str, fps: int = 2, resolution: int = 640) -> list:
"""Extrai frames de um vídeo utilizando ffmpeg."""
frames = []
temp_dir = Path(tempfile.mkdtemp())
try:
threads = min(os.cpu_count(), 8)
cmd = [
'ffmpeg', '-i', video_path,
'-threads', str(threads),
'-vf', (f'fps={fps},'
f'scale={resolution}:{resolution}:force_original_aspect_ratio=decrease:flags=lanczos,'
f'pad={resolution}:{resolution}:(ow-iw)/2:(oh-ih)/2'),
'-frame_pts', '1',
f'{temp_dir}/%d.jpg'
]
subprocess.run(cmd, check=True, capture_output=True)
frame_files = sorted(temp_dir.glob('*.jpg'), key=lambda x: int(x.stem))
chunk_size = 100
with ThreadPoolExecutor(max_workers=threads) as executor:
for i in range(0, len(frame_files), chunk_size):
chunk = frame_files[i:i + chunk_size]
chunk_frames = list(tqdm(
executor.map(lambda f: cv2.imread(str(f)), chunk),
desc=f"Carregando frames {i+1}-{min(i+chunk_size, len(frame_files))}",
total=len(chunk)
))
frames.extend(chunk_frames)
if i % (chunk_size * 5) == 0:
gc.collect()
finally:
shutil.rmtree(temp_dir)
return frames
def _preprocess_image(self, image: Image.Image) -> Image.Image:
"""Deve ser implementado nas classes filhas."""
raise NotImplementedError
def _update_frame_metrics(self, detections: list, frame_idx: int, metrics: dict):
"""Atualiza as métricas para um conjunto de detecções em um frame."""
try:
for detection in detections:
self._update_detection_metrics(detection, metrics)
if isinstance(detection, dict):
metrics.setdefault("detections", []).append({
"frame": frame_idx,
"box": detection.get("box", []),
"confidence": detection.get("confidence", 0),
"label": detection.get("label", "unknown")
})
except Exception as e:
logger.error(f"Erro ao atualizar métricas do frame: {str(e)}")
def _update_detection_metrics(self, detection: dict, metrics: dict):
"""Atualiza as métricas de detecção."""
try:
if not isinstance(detection, dict):
logger.warning(f"Detection não é um dicionário: {detection}")
return
confidence = detection.get("confidence", 0)
if not confidence:
return
if "detection_stats" not in metrics:
metrics["detection_stats"] = {
"total_detections": 0,
"avg_confidence": 0,
"confidence_distribution": {
"low": 0,
"medium": 0,
"high": 0
}
}
stats = metrics["detection_stats"]
stats["total_detections"] += 1
if confidence < 0.5:
stats["confidence_distribution"]["low"] += 1
elif confidence < 0.7:
stats["confidence_distribution"]["medium"] += 1
else:
stats["confidence_distribution"]["high"] += 1
n = stats["total_detections"]
old_avg = stats["avg_confidence"]
stats["avg_confidence"] = (old_avg * (n - 1) + confidence) / n
except Exception as e:
logger.error(f"Error updating metrics: {str(e)}")
def clear_cache(self):
"""Deve ser implementado nas classes filhas."""
raise NotImplementedError
class ResultCache(BaseCache):
"""
Cache otimizado para armazenar resultados de detecção.
"""
def __init__(self, max_size: int = 1000):
super().__init__(max_size)
class WeaponDetector:
"""Implementação do Factory Pattern para criar a instância apropriada do detector."""
_instance = None
def __new__(cls):
try:
if cls._instance is None:
if torch.cuda.is_available():
cls._instance = WeaponDetectorGPU()
logger.info("Detector GPU criado")
else:
cls._instance = WeaponDetectorCPU()
logger.info("Detector CPU criado")
# Garantir que o detector foi inicializado corretamente
if not cls._instance:
raise RuntimeError("Falha ao criar instância do detector")
# Inicializar o detector
if hasattr(cls._instance, 'initialize'):
cls._instance.initialize()
# Verificar se os métodos necessários existem
required_methods = ['process_video', 'clean_memory', 'detect_objects']
for method in required_methods:
if not hasattr(cls._instance, method):
raise RuntimeError(f"Detector não possui método obrigatório: {method}")
return cls._instance
except Exception as e:
logger.error(f"Erro ao criar detector: {str(e)}")
raise
@classmethod
def get_instance(cls):
"""Retorna a instância existente ou cria uma nova."""
if cls._instance is None:
return cls()
return cls._instance
def detect_objects(self, image: Image.Image, threshold: float = 0.3) -> list:
"""Detecta objetos em uma imagem."""
if not self._instance:
raise RuntimeError("Detector não inicializado")
return self._instance.detect_objects(image, threshold)
def extract_frames(self, video_path: str, fps: int = 2, resolution: int = 640) -> list:
"""Extrai frames de um vídeo."""
if not self._instance:
raise RuntimeError("Detector não inicializado")
return self._instance.extract_frames(video_path, fps, resolution)
def process_video(self, video_path: str, fps: int = None, threshold: float = 0.3, resolution: int = 640) -> tuple:
"""Processa o vídeo e retorna os detalhes técnicos e as detecções."""
if not self._instance:
raise RuntimeError("Detector não inicializado")
return self._instance.process_video(video_path, fps, threshold, resolution)
def clean_memory(self):
"""Limpa todo o cache do sistema."""
if not self._instance:
return
if hasattr(self._instance, 'clear_cache'):
self._instance.clear_cache()
if hasattr(self._instance, 'clean_memory'):
self._instance.clean_memory()
# Forçar limpeza de memória
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
class DetectorFactory:
"""Factory para criar a instância apropriada do detector."""
@staticmethod
def create_detector() -> BaseDetector:
"""Cria e retorna a instância apropriada do detector."""
try:
# Forçar verificação robusta de GPU
if is_gpu_available():
logger.info("GPU disponível e inicializada com sucesso")
return WeaponDetectorGPU()
else:
logger.warning("GPU não disponível ou com problemas, usando CPU")
return WeaponDetectorCPU()
except Exception as e:
logger.error(f"Erro ao criar detector: {str(e)}")
logger.warning("Fallback para CPU devido a erro")
return WeaponDetectorCPU()