Spaces:
Runtime error
Runtime error
| import os | |
| import time | |
| import gc | |
| import cv2 | |
| import json | |
| import torch | |
| import psutil | |
| import shutil | |
| import pickle | |
| import hashlib | |
| import tempfile | |
| import logging | |
| import subprocess | |
| import numpy as np | |
| import sys | |
| from tqdm import tqdm | |
| from pathlib import Path | |
| from datetime import datetime | |
| from concurrent.futures import ThreadPoolExecutor | |
| from PIL import Image | |
| from dotenv import load_dotenv | |
| from transformers import Owlv2Processor, Owlv2ForObjectDetection | |
| from typing import Optional | |
| from src.domain.detectors.base import BaseDetector, BaseCache | |
| from src.domain.detectors.gpu import WeaponDetectorGPU | |
| from src.domain.detectors.cpu import WeaponDetectorCPU | |
| # Carregar variáveis de ambiente | |
| load_dotenv() | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| def force_gpu_init(): | |
| """Força a inicialização da GPU.""" | |
| try: | |
| # Verificar se CUDA está disponível | |
| if not torch.cuda.is_available(): | |
| return False | |
| # Tentar alocar um tensor na GPU | |
| try: | |
| dummy = torch.cuda.FloatTensor(1) | |
| del dummy | |
| torch.cuda.empty_cache() | |
| return True | |
| except RuntimeError: | |
| return False | |
| except Exception as e: | |
| logger.warning(f"Erro ao forçar inicialização da GPU: {str(e)}") | |
| return False | |
| def is_gpu_available(): | |
| """Verifica se a GPU está disponível de forma mais robusta.""" | |
| try: | |
| # Verificar CUDA primeiro | |
| if not torch.cuda.is_available(): | |
| logger.warning("CUDA não está disponível") | |
| return False | |
| # Tentar forçar inicialização | |
| if not force_gpu_init(): | |
| logger.warning("Não foi possível inicializar a GPU") | |
| return False | |
| # Tentar obter informações da GPU | |
| try: | |
| device_count = torch.cuda.device_count() | |
| if device_count == 0: | |
| logger.warning("Nenhuma GPU encontrada") | |
| return False | |
| # Verificar se podemos realmente usar a GPU | |
| device = torch.device(0) # Usar índice do dispositivo | |
| dummy_tensor = torch.zeros(1, device=device) | |
| del dummy_tensor | |
| torch.cuda.empty_cache() | |
| logger.info(f"GPU disponível: {torch.cuda.get_device_name(0)}") | |
| return True | |
| except Exception as e: | |
| logger.warning(f"Erro ao verificar GPU: {str(e)}") | |
| return False | |
| except Exception as e: | |
| logger.warning(f"Erro ao verificar disponibilidade da GPU: {str(e)}") | |
| return False | |
| class BaseCache: | |
| """Cache base para armazenar resultados de detecção.""" | |
| def __init__(self, max_size: int = 1000): | |
| self.cache = {} | |
| self.max_size = max_size | |
| self.hits = 0 | |
| self.misses = 0 | |
| self.last_access = {} | |
| def get(self, image: np.ndarray) -> list: | |
| try: | |
| key = hashlib.blake2b(image.tobytes(), digest_size=16).hexdigest() | |
| if key in self.cache: | |
| self.hits += 1 | |
| self.last_access[key] = time.time() | |
| return self.cache[key] | |
| self.misses += 1 | |
| return None | |
| except Exception as e: | |
| logger.error(f"Erro ao recuperar do cache: {str(e)}") | |
| return None | |
| def put(self, image: np.ndarray, results: list): | |
| try: | |
| key = hashlib.blake2b(image.tobytes(), digest_size=16).hexdigest() | |
| if len(self.cache) >= self.max_size: | |
| oldest_key = min(self.last_access.items(), key=lambda x: x[1])[0] | |
| del self.cache[oldest_key] | |
| del self.last_access[oldest_key] | |
| self.cache[key] = results | |
| self.last_access[key] = time.time() | |
| except Exception as e: | |
| logger.error(f"Erro ao armazenar no cache: {str(e)}") | |
| def clear(self): | |
| """Limpa o cache e libera memória.""" | |
| self.cache.clear() | |
| self.last_access.clear() | |
| gc.collect() | |
| def get_stats(self) -> dict: | |
| total = self.hits + self.misses | |
| hit_rate = (self.hits / total) * 100 if total > 0 else 0 | |
| return { | |
| "cache_size": len(self.cache), | |
| "max_size": self.max_size, | |
| "hits": self.hits, | |
| "misses": self.misses, | |
| "hit_rate": f"{hit_rate:.2f}%", | |
| "memory_usage": sum(sys.getsizeof(v) for v in self.cache.values()) | |
| } | |
| class BaseWeaponDetector: | |
| """Classe base abstrata para detecção de armas.""" | |
| def __init__(self): | |
| """Inicialização básica comum a todos os detectores.""" | |
| self._initialized = False | |
| self.device = self._get_best_device() | |
| self._initialize() | |
| def _check_initialized(self): | |
| """Verifica se o detector está inicializado.""" | |
| if not self._initialized: | |
| raise RuntimeError("Detector não está inicializado") | |
| def clean_memory(self): | |
| """Limpa memória não utilizada.""" | |
| try: | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| logger.debug("Cache GPU limpo") | |
| gc.collect() | |
| logger.debug("Garbage collector executado") | |
| except Exception as e: | |
| logger.error(f"Erro ao limpar memória: {str(e)}") | |
| def _get_best_device(self): | |
| """Deve ser implementado nas classes filhas.""" | |
| raise NotImplementedError | |
| def _initialize(self): | |
| """Deve ser implementado nas classes filhas.""" | |
| raise NotImplementedError | |
| def detect_objects(self, image, threshold=0.3): | |
| """Deve ser implementado nas classes filhas.""" | |
| raise NotImplementedError | |
| def process_video(self, video_path, fps=None, threshold=0.3, resolution=640): | |
| """Deve ser implementado nas classes filhas.""" | |
| raise NotImplementedError | |
| def _apply_nms(self, detections: list, iou_threshold: float = 0.5) -> list: | |
| """Deve ser implementado nas classes filhas.""" | |
| raise NotImplementedError | |
| def extract_frames(self, video_path: str, fps: int = 2, resolution: int = 640) -> list: | |
| """Extrai frames de um vídeo utilizando ffmpeg.""" | |
| frames = [] | |
| temp_dir = Path(tempfile.mkdtemp()) | |
| try: | |
| threads = min(os.cpu_count(), 8) | |
| cmd = [ | |
| 'ffmpeg', '-i', video_path, | |
| '-threads', str(threads), | |
| '-vf', (f'fps={fps},' | |
| f'scale={resolution}:{resolution}:force_original_aspect_ratio=decrease:flags=lanczos,' | |
| f'pad={resolution}:{resolution}:(ow-iw)/2:(oh-ih)/2'), | |
| '-frame_pts', '1', | |
| f'{temp_dir}/%d.jpg' | |
| ] | |
| subprocess.run(cmd, check=True, capture_output=True) | |
| frame_files = sorted(temp_dir.glob('*.jpg'), key=lambda x: int(x.stem)) | |
| chunk_size = 100 | |
| with ThreadPoolExecutor(max_workers=threads) as executor: | |
| for i in range(0, len(frame_files), chunk_size): | |
| chunk = frame_files[i:i + chunk_size] | |
| chunk_frames = list(tqdm( | |
| executor.map(lambda f: cv2.imread(str(f)), chunk), | |
| desc=f"Carregando frames {i+1}-{min(i+chunk_size, len(frame_files))}", | |
| total=len(chunk) | |
| )) | |
| frames.extend(chunk_frames) | |
| if i % (chunk_size * 5) == 0: | |
| gc.collect() | |
| finally: | |
| shutil.rmtree(temp_dir) | |
| return frames | |
| def _preprocess_image(self, image: Image.Image) -> Image.Image: | |
| """Deve ser implementado nas classes filhas.""" | |
| raise NotImplementedError | |
| def _update_frame_metrics(self, detections: list, frame_idx: int, metrics: dict): | |
| """Atualiza as métricas para um conjunto de detecções em um frame.""" | |
| try: | |
| for detection in detections: | |
| self._update_detection_metrics(detection, metrics) | |
| if isinstance(detection, dict): | |
| metrics.setdefault("detections", []).append({ | |
| "frame": frame_idx, | |
| "box": detection.get("box", []), | |
| "confidence": detection.get("confidence", 0), | |
| "label": detection.get("label", "unknown") | |
| }) | |
| except Exception as e: | |
| logger.error(f"Erro ao atualizar métricas do frame: {str(e)}") | |
| def _update_detection_metrics(self, detection: dict, metrics: dict): | |
| """Atualiza as métricas de detecção.""" | |
| try: | |
| if not isinstance(detection, dict): | |
| logger.warning(f"Detection não é um dicionário: {detection}") | |
| return | |
| confidence = detection.get("confidence", 0) | |
| if not confidence: | |
| return | |
| if "detection_stats" not in metrics: | |
| metrics["detection_stats"] = { | |
| "total_detections": 0, | |
| "avg_confidence": 0, | |
| "confidence_distribution": { | |
| "low": 0, | |
| "medium": 0, | |
| "high": 0 | |
| } | |
| } | |
| stats = metrics["detection_stats"] | |
| stats["total_detections"] += 1 | |
| if confidence < 0.5: | |
| stats["confidence_distribution"]["low"] += 1 | |
| elif confidence < 0.7: | |
| stats["confidence_distribution"]["medium"] += 1 | |
| else: | |
| stats["confidence_distribution"]["high"] += 1 | |
| n = stats["total_detections"] | |
| old_avg = stats["avg_confidence"] | |
| stats["avg_confidence"] = (old_avg * (n - 1) + confidence) / n | |
| except Exception as e: | |
| logger.error(f"Error updating metrics: {str(e)}") | |
| def clear_cache(self): | |
| """Deve ser implementado nas classes filhas.""" | |
| raise NotImplementedError | |
| class ResultCache(BaseCache): | |
| """ | |
| Cache otimizado para armazenar resultados de detecção. | |
| """ | |
| def __init__(self, max_size: int = 1000): | |
| super().__init__(max_size) | |
| class WeaponDetector: | |
| """Implementação do Factory Pattern para criar a instância apropriada do detector.""" | |
| _instance = None | |
| def __new__(cls): | |
| try: | |
| if cls._instance is None: | |
| if torch.cuda.is_available(): | |
| cls._instance = WeaponDetectorGPU() | |
| logger.info("Detector GPU criado") | |
| else: | |
| cls._instance = WeaponDetectorCPU() | |
| logger.info("Detector CPU criado") | |
| # Garantir que o detector foi inicializado corretamente | |
| if not cls._instance: | |
| raise RuntimeError("Falha ao criar instância do detector") | |
| # Inicializar o detector | |
| if hasattr(cls._instance, 'initialize'): | |
| cls._instance.initialize() | |
| # Verificar se os métodos necessários existem | |
| required_methods = ['process_video', 'clean_memory', 'detect_objects'] | |
| for method in required_methods: | |
| if not hasattr(cls._instance, method): | |
| raise RuntimeError(f"Detector não possui método obrigatório: {method}") | |
| return cls._instance | |
| except Exception as e: | |
| logger.error(f"Erro ao criar detector: {str(e)}") | |
| raise | |
| def get_instance(cls): | |
| """Retorna a instância existente ou cria uma nova.""" | |
| if cls._instance is None: | |
| return cls() | |
| return cls._instance | |
| def detect_objects(self, image: Image.Image, threshold: float = 0.3) -> list: | |
| """Detecta objetos em uma imagem.""" | |
| if not self._instance: | |
| raise RuntimeError("Detector não inicializado") | |
| return self._instance.detect_objects(image, threshold) | |
| def extract_frames(self, video_path: str, fps: int = 2, resolution: int = 640) -> list: | |
| """Extrai frames de um vídeo.""" | |
| if not self._instance: | |
| raise RuntimeError("Detector não inicializado") | |
| return self._instance.extract_frames(video_path, fps, resolution) | |
| def process_video(self, video_path: str, fps: int = None, threshold: float = 0.3, resolution: int = 640) -> tuple: | |
| """Processa o vídeo e retorna os detalhes técnicos e as detecções.""" | |
| if not self._instance: | |
| raise RuntimeError("Detector não inicializado") | |
| return self._instance.process_video(video_path, fps, threshold, resolution) | |
| def clean_memory(self): | |
| """Limpa todo o cache do sistema.""" | |
| if not self._instance: | |
| return | |
| if hasattr(self._instance, 'clear_cache'): | |
| self._instance.clear_cache() | |
| if hasattr(self._instance, 'clean_memory'): | |
| self._instance.clean_memory() | |
| # Forçar limpeza de memória | |
| gc.collect() | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| class DetectorFactory: | |
| """Factory para criar a instância apropriada do detector.""" | |
| def create_detector() -> BaseDetector: | |
| """Cria e retorna a instância apropriada do detector.""" | |
| try: | |
| # Forçar verificação robusta de GPU | |
| if is_gpu_available(): | |
| logger.info("GPU disponível e inicializada com sucesso") | |
| return WeaponDetectorGPU() | |
| else: | |
| logger.warning("GPU não disponível ou com problemas. ATENÇÃO: O sistema funcionará em modo CPU, " + | |
| "que é mais lento mas igualmente funcional. Performance será reduzida.") | |
| return WeaponDetectorCPU() | |
| except Exception as e: | |
| logger.error(f"Erro ao criar detector: {str(e)}") | |
| logger.warning("Fallback para CPU devido a erro. O sistema continuará funcionando, " + | |
| "mas com performance reduzida.") | |
| return WeaponDetectorCPU() |