File size: 3,353 Bytes
246c955
60610b0
246c955
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f8c0662
246c955
f8c0662
 
 
246c955
 
 
 
 
60610b0
246c955
 
 
 
 
 
 
 
 
 
 
 
 
 
60610b0
246c955
 
f8c0662
246c955
55f60c2
f8c0662
 
 
0d98167
 
e577e06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0d98167
246c955
60610b0
246c955
 
60610b0
246c955
60610b0
246c955
bab25bc
0d98167
 
 
4c5678a
246c955
4c5678a
246c955
f8c0662
 
641c113
 
01b125e
c00334e
 
2d1141c
246c955
4c2a482
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import gradio as gr
from huggingface_hub import from_pretrained_keras
import pandas as pd
import numpy as np
import json
from matplotlib import pyplot as plt

f = open('scaler.json')
scaler = json.load(f)

TIME_STEPS = 288

# Generated training sequences for use in the model.
def create_sequences(values, time_steps=TIME_STEPS):
    output = []
    for i in range(len(values) - time_steps + 1):
        output.append(values[i : (i + time_steps)])
    return np.stack(output)


def normalize_data(data):
    df_test_value = (data - scaler["mean"]) / scaler["std"]
    return df_test_value

def plot_test_data(df_test_value):
    fig, ax = plt.subplots(figsize=(12, 6))
    df_test_value.plot(legend=False, ax=ax)
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")
    ax.set_title("Input Test Data")
    return fig

def get_anomalies(df_test_value):
    # Create sequences from test values.
    x_test = create_sequences(df_test_value.values)
    model = from_pretrained_keras("keras-io/timeseries-anomaly-detection")

    # Get test MAE loss.
    x_test_pred = model.predict(x_test)
    test_mae_loss = np.mean(np.abs(x_test_pred - x_test), axis=1)
    test_mae_loss = test_mae_loss.reshape((-1))

    # Detect all the samples which are anomalies.
    anomalies = test_mae_loss > scaler["threshold"]
    return anomalies

def plot_anomalies(df_test_value, data, anomalies):
    # data i is an anomaly if samples [(i - timesteps + 1) to (i)] are anomalies
    anomalous_data_indices = []
    for data_idx in range(TIME_STEPS - 1, len(df_test_value) - TIME_STEPS + 1):
        if np.all(anomalies[data_idx - TIME_STEPS + 1 : data_idx]):
            anomalous_data_indices.append(data_idx)
    df_subset = data.iloc[anomalous_data_indices]
    fig, ax = plt.subplots(figsize=(12, 6))
    data.plot(legend=False, ax=ax)
    df_subset.plot(legend=False, ax=ax, color="r")
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")
    ax.set_title("Anomalous Data Points")
    return fig

def format_output(plot, indices):
    # Create a new figure and axis for the combined output
    fig_combined, (ax_plot, ax_text) = plt.subplots(nrows=2, figsize=(12, 8), gridspec_kw={'height_ratios': [6, 1]})

    # Add the plot to the top axis
    fig, ax = plot
    ax_plot.imshow(fig.canvas.renderer.buffer_rgba())
    ax_plot.axis('off')

    # Add the text to the bottom axis
    ax_text.text(0.5, 0, f"Anomalous Data Indices: {', '.join(indices)}", fontsize=12, ha='center')
    ax_text.axis('off')

    return fig_combined, None


def master(file):
    # read file
    data = pd.read_csv(file, parse_dates=True, index_col="timestamp")
    df_test_value = normalize_data(data)
    # plot input test data
    plot1 = plot_test_data(df_test_value)
    # predict
    anomalies = get_anomalies(df_test_value)
    # plot anomalous data points
    plot2 = plot_anomalies(df_test_value, data, anomalies)
    # format output
    anomalous_data_indices_str = ", ".join(map(str, np.where(anomalies)[0]))
    return format_output(plot2, anomalous_data_indices_str)

outputs = gr.outputs.Image()

iface = gr.Interface(
    fn=master,
    inputs=gr.inputs.File(label="CSV File"),
    outputs=outputs,
    examples=["art_daily_jumpsup.csv"],
    title="Timeseries Anomaly Detection Using an Autoencoder",
    description="Anomaly detection of timeseries data."
)

iface.launch()