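# Gradio demo: timeseries anomaly detection with the pretrained
# keras-io/timeseries-anomaly-detection autoencoder from the Hugging Face Hub.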
import gradio as gr
from huggingface_hub import from_pretrained_keras
import pandas as pd
import numpy as np
import json
from matplotlib import pyplot as plt

with open("scaler.json") as f:
    scaler = json.load(f)
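# scaler.json provides the statistics saved at training time: "mean" and "std"
# for normalization, and "threshold", the reconstruction-error cutoff above
# which a sequence is flagged as anomalous.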

TIME_STEPS = 288
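# 288 steps correspond to one day of readings at 5-minute intervals, the window
# length used when the keras-io/timeseries-anomaly-detection model was trained.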

# Generated training sequences for use in the model.
def create_sequences(values, time_steps=TIME_STEPS):
    output = []
    for i in range(len(values) - time_steps + 1):
        output.append(values[i : (i + time_steps)])
    return np.stack(output)
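# For example, a single-column series of length 1000 yields 1000 - 288 + 1 = 713
# overlapping windows, i.e. an array of shape (713, 288, 1).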


def normalize_data(data):
    df_test_value = (data - scaler["mean"]) / scaler["std"]
    return df_test_value
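# Normalization reuses the training mean/std from scaler.json so the test data
# is on the same scale the autoencoder saw during training.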

def plot_test_data(df_test_value):
    fig, ax = plt.subplots(figsize=(12, 6))
    df_test_value.plot(legend=False, ax=ax)
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")
    ax.set_title("Input Test Data")
    return fig

def get_anomalies(df_test_value):
    # Create sequences from test values.
    x_test = create_sequences(df_test_value.values)
    model = from_pretrained_keras("keras-io/timeseries-anomaly-detection")

    # Get test MAE loss.
    x_test_pred = model.predict(x_test)
    test_mae_loss = np.mean(np.abs(x_test_pred - x_test), axis=1)
    test_mae_loss = test_mae_loss.reshape((-1))

    # Detect all the samples which are anomalies.
    anomalies = test_mae_loss > scaler["threshold"]
    return anomalies
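# `anomalies` contains one boolean per sequence: True where the mean
# reconstruction error of that window exceeds the stored threshold.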

def plot_anomalies(df_test_value, data, anomalies):
    # Data point i is flagged as anomalous only if all sequences covering
    # samples (i - TIME_STEPS + 1) through i are anomalous.
    anomalous_data_indices = []
    for data_idx in range(TIME_STEPS - 1, len(df_test_value) - TIME_STEPS + 1):
        if np.all(anomalies[data_idx - TIME_STEPS + 1 : data_idx]):
            anomalous_data_indices.append(data_idx)
    df_subset = data.iloc[anomalous_data_indices]
    fig, ax = plt.subplots(figsize=(12, 6))
    data.plot(legend=False, ax=ax)
    df_subset.plot(legend=False, ax=ax, color="r")
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")
    ax.set_title("Anomalous Data Points")
    return fig, anomalous_data_indices
                                      
def master(file):
    # Read the uploaded file. Gradio may pass a file path or an object with a
    # .name attribute depending on the version; resolve it to a path first.
    path = file if isinstance(file, str) else file.name
    data = pd.read_csv(path, parse_dates=True, index_col="timestamp")
    df_test_value = normalize_data(data)
    # plot input test data
    plot1 = plot_test_data(df_test_value)
    # predict
    anomalies = get_anomalies(df_test_value)
    # plot anomalous data points
    plot2, anomalous_data_indices = plot_anomalies(df_test_value, data, anomalies)
    # convert indices to string
    anomalous_data_indices_str = [str(idx) for idx in anomalous_data_indices]
    return format_output(plot2, anomalous_data_indices_str)

def format_output(plot, indices):
    # Annotate the anomaly figure (returned by plot_anomalies) with the
    # indices of the anomalous data points.
    ax = plot.axes[0]
    ax.text(0.5, -0.2, f"Anomalous Data Indices: {', '.join(indices)}",
            transform=ax.transAxes, fontsize=12, ha="center")
    return plot

# gr.inputs / gr.outputs are deprecated in newer Gradio releases; use the
# top-level components instead. gr.Plot renders the Matplotlib figure returned
# by master.
outputs = gr.Plot()

iface = gr.Interface(
    fn=master,
    inputs=gr.File(label="CSV File"),
    outputs=outputs,
    examples=["art_daily_jumpsup.csv"],
    title="Timeseries Anomaly Detection Using an Autoencoder",
    description="Anomaly detection of timeseries data.",
)
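# The uploaded CSV is expected to have a "timestamp" column (used as the index)
# and a single numeric value column, as in the bundled art_daily_jumpsup.csv.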

iface.launch()