refactor: ensemble monitoring and weight optimization agents, refactor minmax processing function, and update requirements
a7003f1
```python
import logging
import time
import torch
import psutil  # Ensure psutil is imported here as well

logger = logging.getLogger(__name__)

class EnsembleMonitorAgent:
    def __init__(self):
        self.performance_metrics = {
            "model_accuracy": {},
            "response_times": {},
            "confidence_distribution": {},
            "consensus_rate": 0.0
        }
        self.alerts = []
    def monitor_prediction(self, model_id, prediction, confidence, response_time):
        """Monitor individual model performance"""
        if model_id not in self.performance_metrics["model_accuracy"]:
            self.performance_metrics["model_accuracy"][model_id] = []
            self.performance_metrics["response_times"][model_id] = []
            self.performance_metrics["confidence_distribution"][model_id] = []

        # Record latency and confidence for this prediction. The "model_accuracy"
        # lists are initialized above but not populated here, since accuracy can
        # only be computed once ground-truth labels are available.
        self.performance_metrics["response_times"][model_id].append(response_time)
        self.performance_metrics["confidence_distribution"][model_id].append(confidence)

        # Check for performance issues
        self._check_performance_issues(model_id)
    def _check_performance_issues(self, model_id):
        """Check for any performance anomalies"""
        response_times = self.performance_metrics["response_times"][model_id]
        if len(response_times) > 10:
            avg_time = sum(response_times[-10:]) / 10
            if avg_time > 2.0:  # More than 2 seconds
                self.alerts.append(f"High latency detected for {model_id}: {avg_time:.2f}s")
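
# Illustrative usage sketch: time each model's inference and feed the result to
# the monitor. The model object and its predict() API are assumptions for
# illustration, not defined in this file.
#
#   monitor = EnsembleMonitorAgent()
#   start = time.time()
#   label, confidence = model.predict(image)  # hypothetical model API
#   monitor.monitor_prediction("model_1", label, confidence, time.time() - start)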

class WeightOptimizationAgent:
    def __init__(self, weight_manager):
        self.weight_manager = weight_manager
        self.prediction_history = []  # Stores (ensemble_prediction_label, assumed_actual_label)
        self.optimization_threshold = 0.05  # A 5% relative drop in accuracy triggers optimization
        self.min_history_for_optimization = 20  # Minimum samples before optimizing

    def analyze_performance(self, ensemble_prediction_label, actual_label=None):
        """Analyze ensemble performance and record it for optimization"""
        # If actual_label is not provided, assume the ensemble is correct unless it is UNCERTAIN
        assumed_actual_label = actual_label
        if assumed_actual_label is None and ensemble_prediction_label != "UNCERTAIN":
            assumed_actual_label = ensemble_prediction_label
        self.prediction_history.append((ensemble_prediction_label, assumed_actual_label))

        if len(self.prediction_history) >= self.min_history_for_optimization and self._should_optimize():
            self._optimize_weights()
    def _calculate_accuracy(self, history_subset):
        """Calculate accuracy over the part of the history where the actual label is known."""
        correct_predictions = 0
        total_known = 0
        for ensemble_pred, actual_label in history_subset:
            if actual_label is not None:
                total_known += 1
                if ensemble_pred == actual_label:
                    correct_predictions += 1
        return correct_predictions / total_known if total_known > 0 else 0.0
    def _should_optimize(self):
        """Determine whether weights should be optimized based on recent performance change."""
        if len(self.prediction_history) < self.min_history_for_optimization * 2:  # Need enough history for comparison
            return False

        # Compare the accuracy of the most recent batch with the previous batch
        recent_batch = self.prediction_history[-self.min_history_for_optimization:]
        previous_batch = self.prediction_history[-self.min_history_for_optimization * 2:-self.min_history_for_optimization]

        recent_accuracy = self._calculate_accuracy(recent_batch)
        previous_accuracy = self._calculate_accuracy(previous_batch)

        # Trigger optimization on a significant relative drop in accuracy
        if previous_accuracy > 0 and (previous_accuracy - recent_accuracy) / previous_accuracy > self.optimization_threshold:
            logger.warning(f"Performance degradation detected (from {previous_accuracy:.2f} to {recent_accuracy:.2f}). Triggering weight optimization.")
            return True
        return False
    def _optimize_weights(self):
        """Optimize model weights based on performance."""
        logger.info("Optimizing model weights based on recent performance.")
        # Placeholder for more sophisticated optimization logic.
        # This is where self.weight_manager.base_weights would be adjusted based on
        # which models contributed more to correct predictions or errors.
        # For now, this method only logs that optimization was triggered.
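        #
        # Illustrative sketch only (assumes the weight manager exposes a mutable
        # base_weights dict keyed by model_id and that per-model accuracies are
        # tracked elsewhere): scale each weight by recent accuracy, then renormalize.
        #
        #   for model_id, acc in per_model_accuracy.items():
        #       self.weight_manager.base_weights[model_id] *= (0.5 + acc)
        #   total = sum(self.weight_manager.base_weights.values())
        #   for model_id in self.weight_manager.base_weights:
        #       self.weight_manager.base_weights[model_id] /= total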

class SystemHealthAgent:
    def __init__(self):
        self.health_metrics = {
            "memory_usage": [],
            "gpu_utilization": [],
            "model_load_times": {},
            "error_rates": {}
        }
    def monitor_system_health(self):
        """Monitor overall system health"""
        self._check_memory_usage()
        self._check_gpu_utilization()
        # A _check_model_health() step could be added here later

    def _check_memory_usage(self):
        """Monitor memory usage"""
        try:
            import psutil
            memory = psutil.virtual_memory()
            self.health_metrics["memory_usage"].append(memory.percent)
            if memory.percent > 90:
                logger.warning(f"High memory usage detected: {memory.percent}%")
        except ImportError:
            logger.warning("psutil not installed. Cannot monitor memory usage.")
    def _check_gpu_utilization(self):
        """Monitor GPU memory utilization if available"""
        if torch.cuda.is_available():
            try:
                max_allocated = torch.cuda.max_memory_allocated()
                if max_allocated == 0:
                    return  # Nothing allocated yet; avoid division by zero
                gpu_util = torch.cuda.memory_allocated() / max_allocated
                self.health_metrics["gpu_utilization"].append(gpu_util)
                if gpu_util > 0.9:
                    logger.warning(f"High GPU utilization detected: {gpu_util*100:.2f}%")
            except Exception as e:
                logger.warning(f"Error monitoring GPU utilization: {e}")
        else:
            logger.info("CUDA not available. Skipping GPU utilization monitoring.")
```