# app.py
import gradio as gr
import pandas as pd
import numpy as np
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
from sklearn.ensemble import RandomForestClassifier
import joblib
import os

# Load Hugging Face model for anomaly detection.
# NOTE: this checkpoint was fine-tuned as a masked language model on IMDB, so
# the sequence-classification head loaded here starts out untrained; swap in a
# classifier fine-tuned on labeled log data for meaningful predictions.
tokenizer = AutoTokenizer.from_pretrained("huggingface-course/distilbert-base-uncased-finetuned-imdb")
model = AutoModelForSequenceClassification.from_pretrained("huggingface-course/distilbert-base-uncased-finetuned-imdb")
anomaly_detection = pipeline("text-classification", model=model, tokenizer=tokenizer)
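
# Illustrative raw output (hypothetical score; shape per the transformers
# text-classification pipeline):
#   anomaly_detection("interface gi0/1 changed state to down")
#   -> [{'label': 'LABEL_1', 'score': 0.87}]
# detect_anomaly() below maps LABEL_0/LABEL_1 to "Normal"/"Anomaly".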

# Train or load Random Forest model for failure prediction
if not os.path.exists('failure_prediction_model.pkl'):
    # Tiny illustrative training set; replace with real device telemetry.
    data = pd.DataFrame({
        'cpu_usage': [10, 20, 15, 35, 55],
        'memory_usage': [30, 60, 45, 50, 80],
        'error_rate': [0, 1, 0, 2, 5],
        'failure': [0, 1, 0, 1, 1]
    })
    X = data[['cpu_usage', 'memory_usage', 'error_rate']]
    y = data['failure']
    failure_prediction_model = RandomForestClassifier(n_estimators=100, random_state=42)
    failure_prediction_model.fit(X, y)
    joblib.dump(failure_prediction_model, 'failure_prediction_model.pkl')
else:
    failure_prediction_model = joblib.load('failure_prediction_model.pkl')
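
# Sketch of direct use (assumes the toy model above; features must follow the
# training order cpu_usage, memory_usage, error_rate):
#   failure_prediction_model.predict([[72, 85, 3]])  # -> array([1]), values vary
# Delete failure_prediction_model.pkl to force retraining on the next startup.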

# Preprocess logs for anomaly detection
def preprocess_logs(logs):
    logs['timestamp'] = pd.to_datetime(logs['timestamp'])
    logs['log_message'] = logs['log_message'].str.lower()
    return logs

# Detect anomalies in logs with label mapping
def detect_anomaly(logs):
    preprocessed_logs = preprocess_logs(logs)
    label_map = {  # Map Hugging Face output labels to meaningful labels
        "LABEL_0": "Normal",
        "LABEL_1": "Anomaly"
    }
    results = []
    for log in preprocessed_logs['log_message']:
        # truncation=True keeps long log lines within the model's token limit
        anomaly_result = anomaly_detection(log, truncation=True)
        label = anomaly_result[0]['label']
        results.append(label_map.get(label, label))  # Map the label or return the original label
    return results
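
# Example call (hypothetical log entry):
#   sample = pd.DataFrame({"timestamp": ["2024-01-01 00:00:00"],
#                          "log_message": ["Interface Gi0/1 changed state to down"]})
#   detect_anomaly(sample)  # -> ["Anomaly"] or ["Normal"], depending on the model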

# Predict failures based on device metrics
def predict_failure(device_metrics):
    if device_metrics is None:
        return "Device metrics are missing."
    if 'cpu_usage' not in device_metrics or 'memory_usage' not in device_metrics or 'error_rate' not in device_metrics:
        return "Invalid metrics format. Please provide 'cpu_usage', 'memory_usage', and 'error_rate'."
    metrics_array = np.array([device_metrics['cpu_usage'], device_metrics['memory_usage'], device_metrics['error_rate']]).reshape(1, -1)
    failure_prediction = failure_prediction_model.predict(metrics_array)
    return int(failure_prediction[0])  # 1 = failure predicted, 0 = no failure
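
# Example call (hypothetical metrics):
#   predict_failure({"cpu_usage": 72, "memory_usage": 85, "error_rate": 3})
#   -> 1 (failure predicted) or 0 (no failure)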

# Process logs and predict anomalies and failures
def process_logs_and_predict(log_file, metrics):
    # Read and validate the log file; Gradio may pass a tempfile object or a
    # plain path depending on version, so handle both.
    try:
        logs = pd.read_json(log_file.name if hasattr(log_file, "name") else log_file)
        if not isinstance(logs, pd.DataFrame) or logs.empty:
            return "Invalid log file format. Please upload a JSON array of log entries."
    except ValueError as e:
        return f"Error reading JSON file: {str(e)}"
    if not {'timestamp', 'log_message'}.issubset(logs.columns):
        return "Log entries must contain 'timestamp' and 'log_message' fields."

    # Detect anomalies
    anomalies = detect_anomaly(logs)

    # Predict failures using device metrics
    failure_pred = predict_failure(metrics)

    return f"Anomalies Detected: {anomalies}, Failure Prediction: {failure_pred}"

# Gradio interface
iface = gr.Interface(
    fn=process_logs_and_predict,
    inputs=["file", "json"],
    outputs="text",
    title="Cisco Device Monitoring",
    description="Upload log files to detect anomalies and predict potential device failures."
)
iface.launch()
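
# Example inputs (hypothetical values for illustration):
#   logs.json (uploaded file):
#     [{"timestamp": "2024-01-01T00:00:00", "log_message": "Interface Gi0/1 changed state to down"},
#      {"timestamp": "2024-01-01T00:01:00", "log_message": "OSPF neighbor 10.0.0.2 up"}]
#   metrics (JSON input box):
#     {"cpu_usage": 72, "memory_usage": 85, "error_rate": 3}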