File size: 7,878 Bytes
36459c4 6eaa3dc e2b7de3 36459c4 e2b7de3 221e826 e2b7de3 221e826 13ed916 e2b7de3 13ed916 e2b7de3 13ed916 e2b7de3 952210b e2b7de3 6eaa3dc e2b7de3 684911e e2b7de3 0d752e6 e2b7de3 0d752e6 e2b7de3 8846627 e2b7de3 221e826 e2b7de3 08fef74 e2b7de3 221e826 e2b7de3 36459c4 e2b7de3 221e826 e2b7de3 36459c4 e2b7de3 36459c4 e2b7de3 36459c4 e2b7de3 36459c4 e2b7de3 36459c4 e2b7de3 36459c4 e2b7de3 36459c4 221e826 e2b7de3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
import gradio as gr
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from datetime import datetime, timedelta
import os
import logging
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
import tempfile
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def process_files(uploaded_files):
"""
Process uploaded CSV files, generate usage plots, detect anomalies, and process AMC expiries.
Returns a dataframe, plot, PDF path, and status message.
"""
if not uploaded_files:
logging.warning("No files uploaded.")
return None, None, None, "Please upload at least one valid CSV file."
valid_files = [f for f in uploaded_files if f.name.endswith('.csv')]
if not valid_files:
logging.warning("No valid CSV files uploaded.")
return None, None, None, "Please upload at least one valid CSV file."
logging.info(f"Processing {len(valid_files)} valid files: {[f.name for f in valid_files]}")
all_data = []
# Load and combine CSV files
for file in valid_files:
try:
logging.info(f"Loading logs from {file.name}")
df = pd.read_csv(file.name)
logging.info(f"Loaded {len(df)} records from {file.name}")
all_data.append(df)
except Exception as e:
logging.error(f"Failed to load {file.name}: {str(e)}")
return None, None, None, f"Error loading {file.name}: {str(e)}"
if not all_data:
logging.warning("No data loaded from uploaded files.")
return None, None, None, "No valid data found in uploaded files."
combined_df = pd.concat(all_data, ignore_index=True)
logging.info(f"Combined {len(combined_df)} total records.")
logging.info(f"CSV columns: {combined_df.columns.tolist()}")
# Generate usage plot
plot_path = generate_usage_plot(combined_df)
# Detect anomalies
anomaly_df = detect_anomalies(combined_df)
# Process AMC expiries
amc_message, amc_df = process_amc_expiries(combined_df)
# Generate PDF report
pdf_path = generate_pdf_report(combined_df, anomaly_df, amc_df)
# Prepare output dataframe (combine original data with anomalies)
output_df = combined_df.copy()
if anomaly_df is not None:
output_df['anomaly'] = anomaly_df['anomaly']
return output_df, plot_path, pdf_path, amc_message
def generate_usage_plot(df):
"""
Generate a bar plot of usage_count by equipment and status.
Returns the path to the saved plot.
"""
logging.info("Generating usage plot...")
try:
plt.figure(figsize=(10, 6))
for status in df['status'].unique():
subset = df[df['status'] == status]
plt.bar(subset['equipment'] + f" ({status})", subset['usage_count'], label=status)
plt.xlabel("Equipment (Status)")
plt.ylabel("Usage Count")
plt.title("Usage Count by Equipment and Status")
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
# Save plot to temporary file
with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmp:
plt.savefig(tmp.name, format='png')
plot_path = tmp.name
plt.close()
logging.info("Usage plot generated successfully.")
return plot_path
except Exception as e:
logging.error(f"Failed to generate usage plot: {str(e)}")
return None
def detect_anomalies(df):
"""
Detect anomalies in usage_count using Isolation Forest.
Returns a dataframe with an 'anomaly' column (-1 for anomalies, 1 for normal).
"""
logging.info("Detecting anomalies...")
try:
model = IsolationForest(contamination=0.1, random_state=42)
anomalies = model.fit_predict(df[['usage_count']].values)
anomaly_df = df.copy()
anomaly_df['anomaly'] = anomalies
logging.info(f"Detected {sum(anomalies == -1)} anomalies.")
return anomaly_df
except Exception as e:
logging.error(f"Failed to detect anomalies: {str(e)}")
return None
def process_amc_expiries(df):
"""
Identify devices with AMC expiries within 7 days from 2025-06-05.
Returns a message and a dataframe of devices with upcoming expiries.
"""
logging.info("Processing AMC expiries...")
try:
current_date = datetime(2025, 6, 5)
threshold = current_date + timedelta(days=7)
df['amc_expiry'] = pd.to_datetime(df['amc_expiry'])
upcoming_expiries = df[df['amc_expiry'] <= threshold]
unique_devices = upcoming_expiries['equipment'].unique()
message = f"Found {len(unique_devices)} devices with upcoming AMC expiries: {', '.join(unique_devices)}"
logging.info(message)
return message, upcoming_expiries
except Exception as e:
logging.error(f"Failed to process AMC expiries: {str(e)}")
return f"Error processing AMC expiries: {str(e)}", None
def generate_pdf_report(original_df, anomaly_df, amc_df):
"""
Generate a PDF report with data summary, anomalies, and AMC expiries.
Returns the path to the saved PDF.
"""
logging.info("Generating PDF report...")
try:
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
c = canvas.Canvas(tmp.name, pagesize=letter)
c.drawString(100, 750, "Equipment Log Analysis Report")
y = 700
# Summary
c.drawString(100, y, f"Total Records: {len(original_df)}")
c.drawString(100, y-20, f"Devices: {', '.join(original_df['equipment'].unique())}")
y -= 40
# Anomalies
if anomaly_df is not None:
num_anomalies = sum(anomaly_df['anomaly'] == -1)
c.drawString(100, y, f"Anomalies Detected: {num_anomalies}")
if num_anomalies > 0:
anomaly_equipment = anomaly_df[anomaly_df['anomaly'] == -1]['equipment'].unique()
c.drawString(100, y-20, f"Anomalous Devices: {', '.join(anomaly_equipment)}")
y -= 40
else:
c.drawString(100, y, "Anomaly detection failed.")
y -= 20
# AMC Expiries
if amc_df is not None:
c.drawString(100, y, f"Devices with Upcoming AMC Expiries: {len(amc_df['equipment'].unique())}")
for _, row in amc_df.iterrows():
c.drawString(100, y-20, f"{row['equipment']}: {row['amc_expiry'].strftime('%Y-%m-%d')}")
y -= 20
else:
c.drawString(100, y, "No AMC expiry data available.")
y -= 20
c.showPage()
c.save()
pdf_path = tmp.name
logging.info("PDF report generated successfully.")
return pdf_path
except Exception as e:
logging.error(f"Failed to generate PDF report: {str(e)}")
return None
# Gradio interface
with gr.Blocks() as demo:
gr.Markdown("# Equipment Log Analysis")
with gr.Row():
file_input = gr.File(file_count="multiple", label="Upload CSV Files")
process_button = gr.Button("Process Files")
with gr.Row():
output_df = gr.Dataframe(label="Processed Data")
output_plot = gr.Image(label="Usage Plot")
with gr.Row():
output_message = gr.Textbox(label="AMC Expiry Status")
output_pdf = gr.File(label="Download PDF Report")
process_button.click(
fn=process_files,
inputs=[file_input],
outputs=[output_df, output_plot, output_pdf, output_message]
)
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860) |