import logging
from datetime import datetime, timedelta

import psutil
import GPUtil

logger = logging.getLogger(__name__)
class EnsembleMonitorAgent:
    def __init__(self):
        logger.info("Initializing EnsembleMonitorAgent.")
        self.performance_metrics = {}
        self.alerts = []

    def monitor_prediction(self, model_id, prediction_label, confidence_score, inference_time):
        logger.info(
            f"Monitoring prediction for model '{model_id}'. "
            f"Label: {prediction_label}, Confidence: {confidence_score:.2f}, Time: {inference_time:.4f}s"
        )
        if model_id not in self.performance_metrics:
            self.performance_metrics[model_id] = {
                "total_predictions": 0,
                "correct_predictions": 0,  # Requires ground truth, which is not available here.
                "total_confidence": 0.0,
                "total_inference_time": 0.0,
            }
        metrics = self.performance_metrics[model_id]
        metrics["total_predictions"] += 1
        metrics["total_confidence"] += confidence_score
        metrics["total_inference_time"] += inference_time

        # Example alert: model taking too long.
        if inference_time > 5.0:  # Threshold for slow inference.
            alert_msg = f"ALERT: Model '{model_id}' inference time exceeded 5.0s: {inference_time:.4f}s"
            self.alerts.append(alert_msg)
            logger.warning(alert_msg)

        # Example alert: low confidence.
        if confidence_score < 0.5:  # Threshold for low confidence.
            alert_msg = f"ALERT: Model '{model_id}' returned low confidence: {confidence_score:.2f}"
            self.alerts.append(alert_msg)
            logger.warning(alert_msg)

        logger.debug(f"Updated metrics for '{model_id}': {metrics}")

    def get_performance_summary(self):
        logger.info("Generating performance summary for all models.")
        summary = {}
        for model_id, metrics in self.performance_metrics.items():
            total = metrics["total_predictions"]
            summary[model_id] = {
                "avg_confidence": metrics["total_confidence"] / total if total > 0 else 0,
                "avg_inference_time": metrics["total_inference_time"] / total if total > 0 else 0,
                "total_predictions": total,
            }
        logger.info(f"Performance summary: {summary}")
        return summary
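
    # The metrics above track "correct_predictions", but nothing in this module
    # ever updates it, since ground truth is usually unavailable at inference
    # time. A minimal sketch of how a caller could feed outcomes back in once
    # labels arrive; this method is an illustrative addition, not part of the
    # original file:
    def record_outcome(self, model_id, was_correct):
        """Update accuracy counters for `model_id` once ground truth is known."""
        if model_id in self.performance_metrics and was_correct:
            self.performance_metrics[model_id]["correct_predictions"] += 1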
class WeightOptimizationAgent:
    def __init__(self, weight_manager):
        logger.info("Initializing WeightOptimizationAgent.")
        self.weight_manager = weight_manager
        self.prediction_history = []
        self.performance_window = timedelta(hours=24)  # Evaluate performance over the last 24 hours.

    def analyze_performance(self, final_prediction, ground_truth=None):
        logger.info(f"Analyzing performance. Final prediction: {final_prediction}, Ground truth: {ground_truth}")
        timestamp = datetime.now()
        self.prediction_history.append({
            "timestamp": timestamp,
            "final_prediction": final_prediction,
            "ground_truth": ground_truth,  # Ground truth is often not available in real time.
        })
        # Keep only entries inside the rolling performance window.
        self.prediction_history = [
            p for p in self.prediction_history
            if timestamp - p["timestamp"] < self.performance_window
        ]
        logger.debug(f"Prediction history length: {len(self.prediction_history)}")
        # In a real scenario this would drive a weight-optimization step;
        # for now it only maintains the windowed history.
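
    # The history above records only the ensemble's final prediction, so
    # per-model weights cannot be recovered from it. A minimal sketch of a
    # signal the (otherwise unspecified) weight_manager could consume is the
    # accuracy over the current window; this method is an illustrative
    # addition, not part of the original file:
    def windowed_accuracy(self):
        """Overall accuracy over history entries that carry ground truth, or None."""
        labelled = [p for p in self.prediction_history if p["ground_truth"] is not None]
        if not labelled:
            return None
        correct = sum(1 for p in labelled if p["final_prediction"] == p["ground_truth"])
        return correct / len(labelled)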
class SystemHealthAgent:
    def __init__(self):
        logger.info("Initializing SystemHealthAgent.")
        self.health_metrics = {
            "cpu_percent": 0,
            "memory_usage": {"total": 0, "available": 0, "percent": 0},
            "gpu_utilization": [],
        }

    def monitor_system_health(self):
        logger.info("Monitoring system health...")
        self.health_metrics["cpu_percent"] = psutil.cpu_percent(interval=1)
        mem = psutil.virtual_memory()
        self.health_metrics["memory_usage"] = {
            "total": mem.total,
            "available": mem.available,
            "percent": mem.percent,
        }
        gpu_info = []
        try:
            gpus = GPUtil.getGPUs()
            for gpu in gpus:
                gpu_info.append({
                    "id": gpu.id,
                    "name": gpu.name,
                    "load": gpu.load,
                    "memoryUtil": gpu.memoryUtil,
                    "memoryTotal": gpu.memoryTotal,
                    "memoryUsed": gpu.memoryUsed,
                })
        except Exception as e:
            logger.warning(f"Could not retrieve GPU information: {e}")
            gpu_info.append({"error": str(e)})
        self.health_metrics["gpu_utilization"] = gpu_info
        logger.info(
            f"System health metrics: CPU: {self.health_metrics['cpu_percent']}%, "
            f"Memory: {self.health_metrics['memory_usage']['percent']}%, GPU: {gpu_info}"
        )