import gradio as gr
from huggingface_hub import from_pretrained_keras
import pandas as pd
import numpy as np
import json
from matplotlib import pyplot as plt

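# scaler.json is produced at training time; this app assumes it holds the training
# mean, standard deviation, and anomaly threshold, e.g. (illustrative values only):
# {"mean": 42.0, "std": 28.0, "threshold": 0.65}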
with open("scaler.json") as f:
    scaler = json.load(f)

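# Each model input is a window of 288 consecutive readings; in the original Keras
# tutorial data this corresponds to one day of 5-minute samples.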
TIME_STEPS = 288

# Build overlapping sliding-window sequences (the same windowing used at training time).
def create_sequences(values, time_steps=TIME_STEPS):
    output = []
    for i in range(len(values) - time_steps + 1):
        output.append(values[i : (i + time_steps)])
    return np.stack(output)
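# Example (illustrative): a single-column input of length 300 yields windows of shape
# (300 - 288 + 1, 288, 1) = (13, 288, 1).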


def normalize_data(data):
    # Standardize with the mean and standard deviation recorded at training time.
    df_test_value = (data - scaler["mean"]) / scaler["std"]
    return df_test_value

def plot_test_data(df_test_value):
    fig, ax = plt.subplots(figsize=(12, 6))
    df_test_value.plot(legend=False, ax=ax)
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")
    ax.set_title("Input Test Data")
    return fig

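# The detector reconstructs each window with the pretrained
# keras-io/timeseries-anomaly-detection autoencoder from the Hugging Face Hub and
# flags windows whose mean absolute reconstruction error exceeds the stored threshold.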
def get_anomalies(df_test_value):
    # Create sequences from test values.
    x_test = create_sequences(df_test_value.values)
    model = from_pretrained_keras("keras-io/timeseries-anomaly-detection")

    # Get test MAE loss.
    x_test_pred = model.predict(x_test)
    test_mae_loss = np.mean(np.abs(x_test_pred - x_test), axis=1)
    test_mae_loss = test_mae_loss.reshape((-1))

    # Flag sequences whose reconstruction error exceeds the threshold
    # learned from the training data.
    anomalies = test_mae_loss > scaler["threshold"]
    return anomalies

def plot_anomalies(df_test_value, data, anomalies):
    # Data point i is considered anomalous only when all of the overlapping
    # sequences that cover it are flagged as anomalies.
    anomalous_data_indices = []
    for data_idx in range(TIME_STEPS - 1, len(df_test_value) - TIME_STEPS + 1):
        if np.all(anomalies[data_idx - TIME_STEPS + 1 : data_idx]):
            anomalous_data_indices.append(data_idx)
    df_subset = data.iloc[anomalous_data_indices]
    fig, ax = plt.subplots(figsize=(12, 6))
    data.plot(legend=False, ax=ax)
    if not df_subset.empty:
        # Overlay the anomalous data points in red; skip when nothing was flagged,
        # since plotting an empty frame raises an error.
        df_subset.plot(legend=False, ax=ax, color="r")
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")
    ax.set_title("Anomalous Data Points")
    return fig

def format_output(fig, anomalous_data_indices_str):
    # Annotate the figure with the indices of the flagged sequences.
    ax = fig.axes[0]
    ax.text(
        0,
        -0.1,
        "Anomalous Data Indices: " + anomalous_data_indices_str,
        transform=ax.transAxes,
    )
    return fig

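# The uploaded CSV is expected to follow the art_daily_jumpsup.csv layout:
# a "timestamp" column used as the index plus a single value column,
# e.g. (illustrative rows):
# timestamp,value
# 2014-04-01 00:00:00,18.32
# 2014-04-01 00:05:00,21.41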
def master(file):
    # Read the uploaded CSV; depending on the Gradio version, `file` is either a
    # file path string or a tempfile-like object with a .name attribute.
    path = file if isinstance(file, str) else file.name
    data = pd.read_csv(path, parse_dates=True, index_col="timestamp")
    df_test_value = normalize_data(data)
    # plot input test data
    plot1 = plot_test_data(df_test_value)
    # predict
    anomalies = get_anomalies(df_test_value)
    # plot anomalous data points
    plot2 = plot_anomalies(df_test_value, data, anomalies)
    # format output: annotate the anomaly plot with the flagged sequence indices
    anomalous_data_indices_str = ", ".join(map(str, np.where(anomalies)[0]))
    # Return both figures: the raw input plot and the annotated anomaly plot.
    return plot1, format_output(plot2, anomalous_data_indices_str)

# gr.Plot renders the returned matplotlib figures; the older gr.inputs/gr.outputs
# namespaces have been removed in recent Gradio releases.
iface = gr.Interface(
    fn=master,
    inputs=gr.File(label="CSV File"),
    outputs=[gr.Plot(label="Input Test Data"), gr.Plot(label="Anomalous Data Points")],
    examples=["art_daily_jumpsup.csv"],
    title="Timeseries Anomaly Detection Using an Autoencoder",
    description="Anomaly detection of timeseries data with a reconstruction autoencoder.",
)

iface.launch()