|
import logging
|
|
import time
|
|
import torch
|
|
import psutil
|
|
|
|
# Module-level logger; records are emitted under this module's __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class EnsembleMonitorAgent:
    """Track per-model latency and confidence samples and record latency alerts.

    Alerts are plain strings appended to ``self.alerts``; this class never
    removes or deduplicates them.
    """

    def __init__(self, latency_window=10, latency_threshold=2.0):
        """Create an empty monitor.

        Args:
            latency_window: Number of most recent response times averaged when
                checking a model for high latency. Must be at least 1.
            latency_threshold: Average response time above which an alert is
                recorded. Uses the same units as the ``response_time`` passed
                to ``monitor_prediction``; the alert text labels it in seconds.

        Raises:
            ValueError: If ``latency_window`` is less than 1.
        """
        if latency_window < 1:
            raise ValueError(f"latency_window must be >= 1, got {latency_window}")
        self.latency_window = latency_window
        self.latency_threshold = latency_threshold
        self.performance_metrics = {
            # Initialised per model in monitor_prediction but never appended to
            # by this class; presumably filled elsewhere — confirm.
            "model_accuracy": {},
            # model_id -> list of response times, in arrival order.
            "response_times": {},
            # model_id -> list of confidence values, in arrival order.
            "confidence_distribution": {},
            # Not updated by this class; presumably maintained elsewhere — confirm.
            "consensus_rate": 0.0,
        }
        self.alerts = []

    def monitor_prediction(self, model_id, prediction, confidence, response_time):
        """Record one prediction's confidence and latency for ``model_id``.

        ``prediction`` is accepted for interface compatibility but is not
        stored. After recording, the model's recent latency is checked and an
        alert may be appended to ``self.alerts``.
        """
        metrics = self.performance_metrics
        if model_id not in metrics["model_accuracy"]:
            metrics["model_accuracy"][model_id] = []
            metrics["response_times"][model_id] = []
            metrics["confidence_distribution"][model_id] = []

        metrics["response_times"][model_id].append(response_time)
        metrics["confidence_distribution"][model_id].append(confidence)

        self._check_performance_issues(model_id)

    def _check_performance_issues(self, model_id):
        """Append a latency alert if the model's recent average is too high.

        The average is taken over the last ``latency_window`` response times
        and is evaluated as soon as that many samples exist.
        """
        response_times = self.performance_metrics["response_times"][model_id]
        # A full window exists once len == latency_window; check it then rather
        # than waiting for one extra sample.
        if len(response_times) < self.latency_window:
            return

        recent = response_times[-self.latency_window:]
        avg_time = sum(recent) / len(recent)
        if avg_time > self.latency_threshold:
            self.alerts.append(f"High latency detected for {model_id}: {avg_time:.2f}s")
|
|
|
|
class WeightOptimizationAgent:
    """Record ensemble predictions and trigger weight optimization on degradation.

    Accuracy of the most recent ``min_history_for_optimization`` predictions is
    compared with the batch immediately before it. A relative drop larger than
    ``optimization_threshold`` triggers ``_optimize_weights``.
    """

    # Predictions recorded since the last optimization, or None if none has run.
    # Class-level default so the attribute exists even without __init__.
    _predictions_since_optimization = None

    def __init__(self, weight_manager):
        """Store the weight manager and initialise tracking state.

        Args:
            weight_manager: Stored on the instance; not used by any method in
                this class as written.
        """
        self.weight_manager = weight_manager
        # (prediction, actual-or-assumed label) tuples, oldest first.
        self.prediction_history = []
        # Relative accuracy drop (0.05 == 5%) that triggers optimization.
        self.optimization_threshold = 0.05
        # Size of each comparison batch; two full batches are needed to compare.
        self.min_history_for_optimization = 20
        self._predictions_since_optimization = None

    def analyze_performance(self, ensemble_prediction_label, actual_label=None):
        """Record one ensemble prediction and optimize weights if warranted.

        Args:
            ensemble_prediction_label: The ensemble's predicted label. The
                string "UNCERTAIN" is never self-labelled.
            actual_label: Ground-truth label, if known.

        When ``actual_label`` is None and the prediction is not "UNCERTAIN",
        the prediction itself is recorded as the label.
        """
        assumed_actual_label = actual_label
        if assumed_actual_label is None and ensemble_prediction_label != "UNCERTAIN":
            # NOTE(review): self-labelled entries always count as correct in
            # _calculate_accuracy, so only explicitly supplied labels can lower
            # the measured accuracy.
            assumed_actual_label = ensemble_prediction_label

        self.prediction_history.append((ensemble_prediction_label, assumed_actual_label))

        # After an optimization, wait for a full batch of new predictions. This
        # keeps the same degradation from being re-detected on every call while
        # the comparison windows slide past it.
        if self._predictions_since_optimization is not None:
            self._predictions_since_optimization += 1
            if self._predictions_since_optimization < self.min_history_for_optimization:
                return

        if len(self.prediction_history) >= self.min_history_for_optimization and self._should_optimize():
            self._optimize_weights()
            self._predictions_since_optimization = 0

    def _calculate_accuracy(self, history_subset):
        """Return the fraction of entries with a known label that were correct.

        Entries whose label is None are ignored. Returns 0.0 when no entry has
        a known label.
        """
        known = [(pred, actual) for pred, actual in history_subset if actual is not None]
        if not known:
            return 0.0
        return sum(pred == actual for pred, actual in known) / len(known)

    @staticmethod
    def _has_known_labels(history_subset):
        """Return True if at least one entry in ``history_subset`` has a label."""
        return any(actual is not None for _, actual in history_subset)

    def _should_optimize(self):
        """Return True if accuracy dropped by more than the threshold.

        Compares the most recent batch with the batch before it. If either
        batch has no known labels, the comparison is skipped: its accuracy is
        undefined, and scoring it as 0.0 would report a spurious 100% drop.
        """
        batch = self.min_history_for_optimization
        if len(self.prediction_history) < batch * 2:
            return False

        recent_batch = self.prediction_history[-batch:]
        previous_batch = self.prediction_history[-batch * 2:-batch]
        if not (self._has_known_labels(recent_batch) and self._has_known_labels(previous_batch)):
            return False

        recent_accuracy = self._calculate_accuracy(recent_batch)
        previous_accuracy = self._calculate_accuracy(previous_batch)

        # Relative (not absolute) drop, guarded against division by zero.
        if previous_accuracy > 0 and (previous_accuracy - recent_accuracy) / previous_accuracy > self.optimization_threshold:
            logger.warning(
                "Performance degradation detected (from %.2f to %.2f). Triggering weight optimization.",
                previous_accuracy,
                recent_accuracy,
            )
            return True
        return False

    def _optimize_weights(self):
        """Hook invoked when a performance degradation is detected.

        NOTE(review): this currently only logs and does not consult
        ``self.weight_manager`` — confirm whether an implementation is pending.
        """
        logger.info("Optimizing model weights based on recent performance.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SystemHealthAgent:
    """Sample host memory and CUDA memory usage and warn when they run high."""

    def __init__(self):
        """Initialise empty metric stores."""
        self.health_metrics = {
            # Host memory samples: psutil.virtual_memory().percent values.
            "memory_usage": [],
            # Fractions (0..1) of the current CUDA device's total memory that
            # are allocated to tensors.
            "gpu_utilization": [],
            # Not populated by this class; presumably filled elsewhere — confirm.
            "model_load_times": {},
            "error_rates": {},
        }

    def monitor_system_health(self):
        """Take one host-memory sample and one GPU-memory sample."""
        self._check_memory_usage()
        self._check_gpu_utilization()

    def _check_memory_usage(self):
        """Record host memory usage (percent) and warn when it exceeds 90%."""
        try:
            # NOTE(review): psutil is also imported unconditionally at module
            # level, so this ImportError fallback cannot currently trigger.
            import psutil
        except ImportError:
            logger.warning("psutil not installed. Cannot monitor memory usage.")
            return

        memory = psutil.virtual_memory()
        self.health_metrics["memory_usage"].append(memory.percent)
        if memory.percent > 90:
            logger.warning("High memory usage detected: %s%%", memory.percent)

    def _check_gpu_utilization(self):
        """Record the fraction of GPU memory allocated to tensors; warn above 90%.

        The fraction is ``memory_allocated / total_memory`` for the current
        CUDA device. Dividing by ``max_memory_allocated()`` instead would only
        measure closeness to this process's own peak. That ratio reads ~100% in
        any steady state and divides by zero before the first allocation.
        Failures are logged, not raised, because this monitoring is best-effort.
        """
        if not torch.cuda.is_available():
            logger.info("CUDA not available. Skipping GPU utilization monitoring.")
            return

        try:
            device = torch.cuda.current_device()
            total_memory = torch.cuda.get_device_properties(device).total_memory
            gpu_util = torch.cuda.memory_allocated(device) / total_memory
        except Exception as e:  # best-effort: never let monitoring crash the caller
            logger.warning("Error monitoring GPU utilization: %s", e)
            return

        self.health_metrics["gpu_utilization"].append(gpu_util)
        if gpu_util > 0.9:
            logger.warning("High GPU utilization detected: %.2f%%", gpu_util * 100)