-
Notifications
You must be signed in to change notification settings - Fork 23
/
detect_anomalies.py
369 lines (311 loc) · 17.4 KB
/
detect_anomalies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# Standard modules
import os
import progressbar
import pandas as pd
from pandas import datetime
import numpy as np
from matplotlib import pyplot
# Custom modules
import nonparametric_dynamic_thresholding as ndt
__author__ = 'Shawn Polson'
__contact__ = '[email protected]'
def parser(x):
    """Parse a timestamp string from the dataset CSVs into a datetime.

    Inputs:
        x [str]: A timestamp formatted as 'YYYY-MM-DD HH:MM:SS', optionally followed by '.<fraction>',
                 or as a bare date 'YYYY-MM-DD'.

    Outputs:
        [datetime]: The parsed timestamp. Anything after the first '.' is discarded.

    Raises:
        ValueError: If the text matches neither supported format.
    """
    # Imported locally so this function does not depend on the 'pandas.datetime' alias,
    # which newer pandas releases no longer provide.
    from datetime import datetime

    new_time = x.split('.')[0]  # remove microseconds from time data
    try:
        # for bus voltage, battery temp, wheel temp, and wheel rpm data
        return datetime.strptime(new_time, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        # for total bus current data
        return datetime.strptime(new_time, '%Y-%m-%d')
def detect_anomalies(ts, normal_model, ds_name, var_name, alg_name, outlier_def='std', num_stds=2, ndt_errors=None,
                     plot_save_path=None, data_save_path=None):
    """Detect outliers in the time series data by comparing points against a "normal" model.

    Inputs:
        ts [pd Series]:           A pandas Series with a DatetimeIndex and a column for numerical values.
        normal_model [pd Series]: A pandas Series with a DatetimeIndex and a column for numerical values.
        ds_name [str]:            The name of the time series dataset.
        var_name [str]:           The name of the dependent variable in the time series.
        alg_name [str]:           The name of the algorithm used to create 'normal_model'.

    Optional Inputs:
        outlier_def [str]:    {'std', 'errors', 'dynamic'} The definition of an outlier to be used.
                              'std':     a point is an outlier when |model - observation| exceeds
                                         num_stds * (population standard deviation of ts).
                              'errors':  a point is an outlier when |model - observation| exceeds
                                         mean(errors) + num_stds * std(errors).
                              'dynamic': outliers are the index ranges returned by nonparametric dynamic thresholding.
                              Any other value labels no points.
                              Default is 'std'.
        num_stds [float]:     The number of standard deviations used by the 'std' and 'errors' definitions.
                              Default is 2.
        ndt_errors [list]:    Optionally skip nonparametric dynamic thresholding's 'get_errors()' and use these values instead.
        plot_save_path [str]: The file path (ending in file name *.png) for saving plots of outliers.
        data_save_path [str]: The file path (ending in file name *.csv) for saving CSVs with outliers.

    Outputs:
        time_series_with_outliers [pd DataFrame]: A pandas DataFrame with a DatetimeIndex, two columns for numerical values,
                                                  and an Outlier column holding the strings 'True' or 'False'.

    Optional Outputs:
        None

    Raises:
        IndexError: For 'std' and 'errors', when normal_model has fewer points than ts.

    Example:
        time_series_with_outliers = detect_anomalies(time_series, model, 'BatteryTemperature', 'Temperature (C)',
                                                     'ARIMA', outlier_def='dynamic',
                                                     plot_save_path=plot_path, data_save_path=data_path)
    """
    X = ts.values
    Y = normal_model.values

    time_series_with_outliers = pd.DataFrame({var_name: ts, alg_name: normal_model})
    time_series_with_outliers['Outlier'] = 'False'
    column_names = [var_name, alg_name, 'Outlier']  # column order
    time_series_with_outliers = time_series_with_outliers.reindex(columns=column_names)  # sort columns in specified order

    # Start a progress bar
    widgets = [progressbar.Percentage(), progressbar.Bar(), progressbar.Timer(), ' ', progressbar.AdaptiveETA()]
    progress_bar_sliding_window = progressbar.ProgressBar(
        widgets=[progressbar.FormatLabel('Outliers (' + ds_name + ')')] + widgets,
        maxval=int(len(X))).start()

    # Integer positions (into ts) of every point that gets labelled as an outlier
    outlier_positions = np.array([], dtype=int)

    if outlier_def in ('std', 'errors'):
        if len(Y) < len(X):
            # Checked up front so a short model fails loudly instead of being broadcast by numpy
            raise IndexError('normal_model has fewer points (' + str(len(Y)) + ') than ts (' + str(len(X)) + ')')
        abs_errors = np.abs(Y[:len(X)] - X)  # distance of every observation from the model

        if outlier_def == 'std':
            # Threshold is a multiple of the data's own standard deviation
            threshold = float(X.std(ddof=0)) * num_stds
        else:
            # Threshold is a number of standard deviations above the mean of the errors
            mean_of_errors = float(abs_errors.mean())
            std_of_errors = float(abs_errors.std(ddof=0))
            threshold = mean_of_errors + (std_of_errors * num_stds)

        outlier_positions = np.flatnonzero(abs_errors > threshold)

    # Define outliers using JPL's nonparametric dynamic thresholding technique
    elif outlier_def == 'dynamic':
        progress_bar_sliding_window.update(len(X) // 2)  # start progress bar timer

        if ndt_errors is not None:
            smoothed_errors = ndt_errors
        else:
            smoothed_errors = ndt.get_errors(X, Y)

        # These are the results of the nonparametric dynamic thresholding
        E_seq, _ = ndt.process_errors(X, smoothed_errors)
        progress_bar_sliding_window.update(len(X) - 1)  # advance progress bar timer

        # Convert sets of outlier start/end indices (end inclusive) into outlier points
        positions = []
        for anom in E_seq:
            positions.extend(range(anom[0], anom[1] + 1))
        outlier_positions = np.array(positions, dtype=int)

    # Label the outliers and collect them into their own series for plotting
    outlier_labels = ts.index[outlier_positions]
    is_outlier_row = time_series_with_outliers.index.isin(outlier_labels)
    time_series_with_outliers.loc[is_outlier_row, 'Outlier'] = 'True'
    outliers = pd.Series(X[outlier_positions], index=outlier_labels)
    progress_bar_sliding_window.finish()

    # Plot anomalies
    ax = ts.plot(color='#192C87', title=ds_name + ' with ' + alg_name + ' Outliers', label=var_name, figsize=(14, 6))
    normal_model.plot(color='#0CCADC', label=alg_name, linewidth=1.5)
    if len(outliers) > 0:
        print('Detected outliers (' + ds_name + '): ' + str(len(outliers)))
        outliers.plot(color='red', style='.', label='Outliers')
    ax.set(xlabel='Time', ylabel=var_name)
    pyplot.legend(loc='best')

    # Save plot
    if plot_save_path is not None:
        plot_dir = os.path.dirname(plot_save_path)
        # A bare file name has no directory part; there is nothing to create in that case
        if plot_dir and not os.path.exists(plot_dir):
            os.makedirs(plot_dir)
        pyplot.savefig(plot_save_path, dpi=500)

    pyplot.show()
    pyplot.clf()

    # Save data
    if data_save_path is not None:
        data_dir = os.path.dirname(data_save_path)
        if data_dir and not os.path.exists(data_dir):
            os.makedirs(data_dir)
        time_series_with_outliers.to_csv(data_save_path)

    return time_series_with_outliers
def detect_anomalies_with_many_stds(ts, normal_model, ds_name, var_name, alg_name, outlier_def='std', stds=(2, 4, 8),
                                    plot_save_path=None, data_save_path=None):
    """Detect outliers in the time series data by comparing points against a "normal" model, using a set of three standard deviations as thresholds.

    Inputs:
        ts [pd Series]:           A pandas Series with a DatetimeIndex and a column for numerical values.
        normal_model [pd Series]: A pandas Series with a DatetimeIndex and a column for numerical values.
        ds_name [str]:            The name of the time series dataset.
        var_name [str]:           The name of the dependent variable in the time series.
        alg_name [str]:           The name of the algorithm used to create 'normal_model'.

    Optional Inputs:
        outlier_def [str]:    {'std', 'errors'} The definition of an outlier to be used.
                              'std':    a point is an outlier when |model - observation| exceeds
                                        n * (population standard deviation of ts).
                              'errors': a point is an outlier when |model - observation| exceeds
                                        mean(errors) + n * std(errors).
                              Any other value labels no points.
                              Default is 'std'.
        stds [3 floats]:      3 numbers which will be successively used as 'n', the number of standard deviations
                              that constitutes an outlier. Only the first three entries are used.
                              Plots will show outliers with all 3 stds and 3 CSVs will be saved.
                              Default is (2, 4, 8).
        plot_save_path [str]: The file path (ending in file name *.png) for saving plots of outliers.
        data_save_path [str]: The file path (ending in file name *.csv) for saving. One CSV is saved per std used,
                              each with the std appended to the name.

    Outputs:
        time_series_with_outliers1 [pd DataFrame]: A pandas DataFrame with a DatetimeIndex, two columns for numerical values,
                                                   and an Outlier column holding the strings 'True' or 'False'.
                                                   Uses the first number in stds.
        time_series_with_outliers2 [pd DataFrame]: Same layout. Uses the second number in stds.
        time_series_with_outliers3 [pd DataFrame]: Same layout. Uses the third number in stds.

    Optional Outputs:
        None

    Raises:
        IndexError: For 'std' and 'errors', when normal_model has fewer points than ts.

    Example:
        df1, df2, df3 = detect_anomalies_with_many_stds(time_series, model, 'BatteryTemperature', 'Temperature (C)',
                                                        'ARIMA', outlier_def='errors', stds=[2, 4, 6],
                                                        plot_save_path=plot_path, data_save_path=data_path)
    """
    X = ts.values
    Y = normal_model.values

    time_series_with_outliers = pd.DataFrame({var_name: ts, alg_name: normal_model})
    time_series_with_outliers['Outlier'] = 'False'
    column_names = [var_name, alg_name, 'Outlier']  # column order

    # Three independent result frames and three (initially empty) outlier series, one per std
    frames = [time_series_with_outliers.reindex(columns=column_names) for _ in range(3)]
    outlier_sets = [pd.Series(X[:0], index=ts.index[:0]) for _ in range(3)]

    # Start a progress bar
    widgets = [progressbar.Percentage(), progressbar.Bar(), progressbar.Timer(), ' ', progressbar.AdaptiveETA()]
    progress_bar_sliding_window = progressbar.ProgressBar(
        widgets=[progressbar.FormatLabel('Outliers (' + ds_name + ')')] + widgets,
        maxval=int(len(X) * len(stds))).start()

    if outlier_def in ('std', 'errors'):
        if len(Y) < len(X):
            # Checked up front so a short model fails loudly instead of being broadcast by numpy
            raise IndexError('normal_model has fewer points (' + str(len(Y)) + ') than ts (' + str(len(X)) + ')')

        # The errors and their statistics do not depend on the std being tried, so compute them once
        abs_errors = np.abs(Y[:len(X)] - X)
        if outlier_def == 'std':
            std = float(X.std(ddof=0))
        else:
            mean_of_errors = float(abs_errors.mean())
            std_of_errors = float(abs_errors.std(ddof=0))

        for slot, num_stds in enumerate(stds[:3]):
            if outlier_def == 'std':
                threshold = std * num_stds
            else:
                threshold = mean_of_errors + (std_of_errors * num_stds)

            outlier_positions = np.flatnonzero(abs_errors > threshold)
            outlier_labels = ts.index[outlier_positions]

            frame = frames[slot]
            frame.loc[frame.index.isin(outlier_labels), 'Outlier'] = 'True'
            outlier_sets[slot] = pd.Series(X[outlier_positions], index=outlier_labels)

            progress_bar_sliding_window.update(len(X) * (slot + 1))  # advance progress bar

    progress_bar_sliding_window.finish()

    # Plot anomalies
    ax = ts.plot(color='#192C87', title=ds_name + ' with ' + alg_name + ' Outliers', label=var_name, figsize=(14, 6))
    normal_model.plot(color='#0CCADC', label=alg_name, linewidth=1.5)
    colors = ('orange', 'orangered', 'crimson')
    for found, num_stds, color in zip(outlier_sets, stds, colors):
        if len(found) > 0:
            print('Detected outliers (' + ds_name + ', ' + str(num_stds) + ' stds): ' + str(len(found)))
            # Raw string: '\s' is not a valid escape sequence in a normal string literal
            found.plot(color=color, style='.', label='Outliers (' + str(num_stds) + r'$\sigma$)')
    ax.set(xlabel='Time', ylabel=var_name)
    pyplot.legend(loc='best')

    # Save plot
    if plot_save_path is not None:
        plot_dir = os.path.dirname(plot_save_path)
        # A bare file name has no directory part; there is nothing to create in that case
        if plot_dir and not os.path.exists(plot_dir):
            os.makedirs(plot_dir)
        pyplot.savefig(plot_save_path, dpi=500)

    pyplot.show()
    pyplot.clf()

    # Save data
    if data_save_path is not None:
        data_dir = os.path.dirname(data_save_path)
        if data_dir and not os.path.exists(data_dir):
            os.makedirs(data_dir)

        # Strip everything from the last '.csv' on; keep the whole path if it has no '.csv'
        extension_at = data_save_path.rfind('.csv')
        stem = data_save_path[:extension_at] if extension_at != -1 else data_save_path
        for frame, num_stds in zip(frames, stds):
            frame.to_csv(stem + '_' + str(num_stds) + '_stds.csv')

    return frames[0], frames[1], frames[2]
if __name__ == "__main__":
    # Script entry point: for each dataset, load the CSV holding the series and its model column,
    # then label outliers with dynamic thresholding and save the plot and the labelled data.
    datasets = ['Data/WheelRPM.csv']

    for dataset_path in datasets:
        ds_name = dataset_path[5:-4]  # drop 'Data/' and '.csv'

        scores_csv = 'save/datasets/' + ds_name + '/rrcf/data/' + ds_name + '_with_rrcf_scores.csv'
        ts_with_model = pd.read_csv(scores_csv, header=0, parse_dates=[0], index_col=0, date_parser=parser)

        # The very last anomaly scores are null, so fill them with 0s
        ts_with_model.fillna(value=0, inplace=True)

        # First column is the observed variable, second is the model's output
        var_name = ts_with_model.columns[0]
        alg_name = ts_with_model.columns[1]
        observed = ts_with_model[var_name]
        modelled = ts_with_model[alg_name]

        output_root = './test_dir/datasets/' + ds_name + '/rolling mean/'
        output_stem = ds_name + '_rolling_mean_outliers_from_dynamic_thresholding'
        plot_file = output_root + 'plots/' + output_stem + '.png'
        data_file = output_root + 'data/' + output_stem + '.csv'

        labelled = detect_anomalies(observed, modelled, ds_name, var_name, alg_name, outlier_def='dynamic',
                                    plot_save_path=plot_file, data_save_path=data_file)