|
|
|
|
|
import os |
|
import zipfile |
|
|
|
|
|
# Replacement sources for the LabOps Streamlit project, keyed by the file's
# path relative to the output directory. Each value is the full text of one
# module; the leading/trailing newline of every literal is padding only.
#
# NOTE(review): the "π" prefixes in the dashboard labels look like mis-decoded
# emoji; they are kept exactly as found — confirm the intended characters.
# NOTE(review): detect_anomalies and upcoming_amc_devices both convert a column
# of the caller's DataFrame in place before filtering.
updated_files = {
    # Flags days whose log count deviates from the mean daily count by more
    # than `threshold` standard deviations.
    "LabOps_HuggingFace_Space/models/anomaly.py": """
import pandas as pd
import numpy as np

def detect_anomalies(df, threshold=2.0):
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    usage = df.groupby(df['timestamp'].dt.date).size()
    mean = usage.mean()
    std = usage.std()

    anomalies = usage[abs(usage - mean) > threshold * std]
    return anomalies.reset_index().rename(columns={0: 'log_count'})
""",
    # Selects rows whose AMC expiry falls on or before today + `days_ahead`.
    "LabOps_HuggingFace_Space/utils/amc.py": """
import pandas as pd
from datetime import datetime, timedelta

def upcoming_amc_devices(df, days_ahead=14):
    df['amc_expiry'] = pd.to_datetime(df['amc_expiry'])
    target_date = datetime.today() + timedelta(days=days_ahead)
    upcoming = df[df['amc_expiry'] <= target_date]
    return upcoming[['device_id', 'amc_expiry']]
""",
    # Streamlit entry point: upload logs, chart usage, list anomalies and
    # upcoming AMC expiries, and trigger PDF generation.
    "LabOps_HuggingFace_Space/app.py": """
import streamlit as st
import pandas as pd
from utils.load_data import load_logs
from utils.visualize import plot_usage
from utils.report import generate_pdf
from models.anomaly import detect_anomalies
from utils.amc import upcoming_amc_devices

st.set_page_config(page_title="LabOps Dashboard", layout="wide")
st.title("π Multi-Device LabOps Dashboard")

uploaded_files = st.file_uploader("Upload Device Logs (CSV)", accept_multiple_files=True)

if uploaded_files:
    df = load_logs(uploaded_files)
    st.subheader("π Uploaded Logs")
    st.dataframe(df.head())

    st.subheader("π Daily Usage Chart")
    st.pyplot(plot_usage(df))

    st.subheader("π¨ Detected Anomalies")
    anomalies = detect_anomalies(df)
    st.dataframe(anomalies)

    st.subheader("π Upcoming AMC Devices")
    if "amc_expiry" in df.columns:
        amc_df = upcoming_amc_devices(df)
        st.dataframe(amc_df)

    if st.button("π Generate PDF Report"):
        generate_pdf(df)
        st.success("PDF report generated successfully.")
""",
}
|
|
|
|
|
# Write every replacement module under /mnt/data, creating any missing
# parent folders first. Existing files at the same path are overwritten.
for path, content in updated_files.items():
    full_path = f"/mnt/data/{path}"
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    # Explicit UTF-8: the app source contains non-ASCII label characters, and
    # the platform default encoding may not be able to represent them.
    with open(full_path, "w", encoding="utf-8") as f:
        # strip() removes the newline padding around each triple-quoted source.
        f.write(content.strip())
|
|
|
|
|
# Bundle the whole project folder (updated files plus anything already
# present in it) into a single archive next to the project directory.
project_root = "/mnt/data/LabOps_HuggingFace_Space"
zip_path_updated = "/mnt/data/LabOps_HuggingFace_Space_Updated.zip"

# ZIP_DEFLATED actually compresses the entries; the ZipFile default
# (ZIP_STORED) would only concatenate them uncompressed.
with zipfile.ZipFile(zip_path_updated, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
    for folder, _, filenames in os.walk(project_root):
        for filename in filenames:
            full_path = os.path.join(folder, filename)
            # Store entries relative to the project root so the archive
            # unpacks without the /mnt/data/... prefix.
            arcname = os.path.relpath(full_path, project_root)
            zipf.write(full_path, arcname)

# Bare expression: evaluates to the archive path, which an interactive
# session or notebook shows as the result of the cell.
zip_path_updated
|
|