Add Ceph Upstream Prediction Models #1

293 changes: 291 additions & 2 deletions device_prediction/model.py
@@ -1,16 +1,282 @@
#!/usr/bin/env python3
# vim: ts=4 sw=4 expandtab
"""Machine learning model for disk failure prediction.

The classes defined here provide the disk failure prediction module.
RHDiskFailurePredictor uses the models developed at the AICoE in the
Office of the CTO at Red Hat. These models were built using the open
source Backblaze SMART metrics dataset.
PSDiskFailurePredictor uses the models developed by ProphetStor as an

Owner:
Can also remove this ProphetStor reference

example.

An instance of the predictor is initialized by providing the path to trained
models. Then, to predict hard drive health and deduce time to failure, the
predict function is called with 6 days worth of SMART data from the hard drive.
It will return a string indicating disk failure status: "Good", "Warning",
"Bad", or "Unknown".

Example usage:

>>> model = disk_failure_predictor.RHDiskFailurePredictor()
>>> status = model.initialize("./models")
>>> if status is None:
...     model.predict(disk_days)
'Bad'
"""
import os
import sys
import json
import pickle
import logging
import argparse
from enum import Enum

import numpy as np


s_Reallocated_Sector_Count = '5'
s_Reported_Uncorrectable_Errors = '187'
s_Command_Timeout = '188'
s_Current_Pending_Sector_Count = '197'
s_Offline_Uncorrectable = '198'
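# (SMART attributes 5, 187, 188, 197 and 198 are the stats most strongly
# correlated with drive failure in Backblaze's published drive data.)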


def get_diskfailurepredictor_path():
    path = os.path.abspath(__file__)
    dir_path = os.path.dirname(path)
    return dir_path


class RHDiskFailurePredictor(object):
    """Disk failure prediction module developed at Red Hat

    This class implements a disk failure prediction module.
    """

    # json with manufacturer names as keys
    # and features used for prediction as values
    CONFIG_FILE = "config.json"
    PREDICTION_CLASSES = {-1: "Unknown", 0: "Good", 1: "Warning", 2: "Bad"}

    # model name prefixes to identify vendor
    MANUFACTURER_MODELNAME_PREFIXES = {
        "WD": "WDC",           # for cases like "WDxxx"
        "WDC": "WDC",
        "Toshiba": "Toshiba",  # for cases like "Toshiba xxx"
        "TOSHIBA": "Toshiba",  # for cases like "TOSHIBA xxx"
        "toshiba": "Toshiba",  # for cases like "toshiba xxx"
        "ST": "Seagate",       # for cases like "STxxxx"
        "Seagate": "Seagate",  # for cases like "Seagate BarraCuda ZAxxx"
        "ZA": "Seagate",       # for cases like "ZAxxxx"
        "Hitachi": "Hitachi",
        "HGST": "HGST",
    }

    LOGGER = logging.getLogger()

    def __init__(self):
        """
        This function may raise an exception due to a failed file operation.
        """
        self.model_dirpath = ""
        self.model_context = {}

    def initialize(self, model_dirpath):
        """Initialize all models. Saves the paths of all trained model files.

        Arguments:
            model_dirpath {str} -- path to directory of trained models

        Returns:
            str -- error message; None if all goes well
        """
        # read config file as json, if it exists
        config_path = os.path.join(model_dirpath, self.CONFIG_FILE)
        if not os.path.isfile(config_path):
            return "Missing config file: " + config_path
        with open(config_path) as f_conf:
            self.model_context = json.load(f_conf)

        # ensure all manufacturers whose context is defined in config file
        # have models and scalers saved inside model_dirpath
        for manufacturer in self.model_context:
            scaler_path = os.path.join(model_dirpath, manufacturer + "_scaler.pkl")
            if not os.path.isfile(scaler_path):
                return "Missing scaler file: {}".format(scaler_path)
            model_path = os.path.join(model_dirpath, manufacturer + "_predictor.pkl")
            if not os.path.isfile(model_path):
                return "Missing model file: {}".format(model_path)

        self.model_dirpath = model_dirpath
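
        # Expected layout of model_dirpath, given the checks above and the
        # files added in this PR (the "seagate"/"hgst" keys come from
        # models/redhat/config.json):
        #
        #   models/redhat/
        #       config.json
        #       hgst_scaler.pkl
        #       hgst_predictor.pkl
        #       seagate_scaler.pkl
        #       seagate_predictor.pkl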

    def __preprocess(self, disk_days, manufacturer):
        """Scales and transforms the input data to feed it to the prediction model

        Arguments:
            disk_days {dict} -- dictionary with a 'capacity_bytes' key and a
                                'smart_data' key that maps each day to its SMART
                                attributes, e.g.
                                {'capacity_bytes': 512,
                                 'smart_data': {'<day>': {'attr':
                                 {'1': {'val_raw': 0, 'val_norm': 100}, ...}}, ...}}
            manufacturer {str} -- manufacturer of the hard drive

        Returns:
            numpy.ndarray -- (n, d) shaped array of n days worth of data and d
                             features, scaled
        """
        # get the attributes that were used to train model for current manufacturer
        try:
            model_smart_attr = self.model_context[manufacturer]
        except KeyError:
            RHDiskFailurePredictor.LOGGER.debug(
                "No context (SMART attributes on which model has been trained) "
                "found for manufacturer: {}".format(manufacturer)
            )
            return None

        # from the input json construct a list such that each element is
        # a tuple representing SMART data from one day e.g. (60, 100, 500.2)
        days = sorted(disk_days['smart_data'].keys())
        values = []
        for day in days:
            curr_day_values = []
            for attr in model_smart_attr:
                if 'raw' in attr:
                    curr_day_values.append(
                        disk_days['smart_data'][day]['attr'][attr.split('_')[1]]['val_raw']
                    )
                elif 'normalized' in attr:
                    curr_day_values.append(
                        disk_days['smart_data'][day]['attr'][attr.split('_')[1]]['val_norm']
                    )
                elif 'user_capacity' in attr:
                    curr_day_values.append(
                        disk_days['capacity_bytes']
                    )
                else:
                    raise ValueError('Unknown attribute found in config')
            # use a tuple, not a list -- numpy structured arrays are built
            # from sequences of tuples
            values.append(tuple(curr_day_values))

        # create a numpy structured array i.e. include "column names" and
        # "column dtypes" for the input matrix `values`
        try:
            struc_dtypes = [(attr, np.float64) for attr in model_smart_attr]
            disk_days_sa = np.array(values, dtype=struc_dtypes)
        except KeyError:
            RHDiskFailurePredictor.LOGGER.debug(
                "Mismatch in SMART attributes used to train model and SMART attributes available"
            )
            return None

        # view structured array as 2d array for applying rolling window transforms
        # do not include capacity_bytes in this. only use smart_attrs
        disk_days_attrs = disk_days_sa[[attr for attr in model_smart_attr if 'smart_' in attr]]\
            .view(np.float64).reshape(disk_days_sa.shape + (-1,))

        # featurize n (6 to 12) days data - mean, std, coefficient of variation
        # current model is trained on 6 days of data because that is what will be
        # available at runtime

        # rolling time window interval size in days
        roll_window_size = 6

        # rolling means (pass a list, not a generator, to np.vstack --
        # stacking generators is deprecated as of NumPy 1.16)
        means = np.vstack([
            disk_days_attrs[i: i + roll_window_size, ...].mean(axis=0)
            for i in range(0, disk_days_attrs.shape[0] - roll_window_size + 1)
        ])

        # rolling standard deviations
        stds = np.vstack([
            disk_days_attrs[i: i + roll_window_size, ...].std(axis=0, ddof=1)
            for i in range(0, disk_days_attrs.shape[0] - roll_window_size + 1)
        ])

        # coefficient of variation; a zero mean divides to NaN/inf, so silence
        # the warning here and replace the resulting NaNs with 0 below
        with np.errstate(divide='ignore', invalid='ignore'):
            cvs = stds / means
        cvs[np.isnan(cvs)] = 0

        # combine all extracted features
        featurized = np.hstack((
            means,
            stds,
            cvs,
            disk_days_sa['user_capacity'][: disk_days_attrs.shape[0] - roll_window_size + 1].reshape(-1, 1)
        ))

        # scale features
        scaler_path = os.path.join(self.model_dirpath, manufacturer + "_scaler.pkl")
        with open(scaler_path, 'rb') as f:
            scaler = pickle.load(f)
        featurized = scaler.transform(featurized)
        return featurized
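
    # Featurization shape sketch (derived from __preprocess above): for n input
    # days and d SMART attributes, the rolling window yields n - 5 rows, each
    # holding mean (d) + std (d) + cv (d) columns plus 1 capacity column, so
    # the scaler expects exactly 3 * d + 1 features per row.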

    @staticmethod
    def __get_manufacturer(model_name):
        """Returns the manufacturer name for a given hard drive model name

        Arguments:
            model_name {str} -- hard drive model name

        Returns:
            str -- manufacturer name
        """
        for prefix, manufacturer in RHDiskFailurePredictor.MANUFACTURER_MODELNAME_PREFIXES.items():
            if model_name.startswith(prefix):
                return manufacturer
        # log an error message
        RHDiskFailurePredictor.LOGGER.debug(
            "Could not infer manufacturer from model name {}".format(model_name)
        )
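
        # e.g., with the prefix table above (hypothetical model names):
        #   "ST4000DM000"  -> "Seagate" (prefix "ST")
        #   "WDC WD40EFRX" -> "WDC"     (prefix "WDC")
        #   an unmatched name falls through, returning None implicitly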

    def predict(self, disk_days):
        # get manufacturer preferably as a smartctl attribute
        manufacturer = disk_days.get("vendor", None)

        # if not available then try to infer using model name
        if manufacturer is None:
            RHDiskFailurePredictor.LOGGER.debug(
                '"vendor" field not found in smartctl output. Will try to infer manufacturer from model name.'
            )
            if "model" not in disk_days.keys():
                RHDiskFailurePredictor.LOGGER.debug(
                    '"model" field not found in smartctl output.'
                )
            else:
                manufacturer = RHDiskFailurePredictor.__get_manufacturer(
                    disk_days["model"]
                )

        # log an error message and return Unknown
        if manufacturer is None:
            RHDiskFailurePredictor.LOGGER.debug(
                "Manufacturer could not be determined. This may be because "
                "DiskPredictor has never encountered this manufacturer before, "
                "or the model name does not follow the manufacturer's "
                "naming conventions known to DiskPredictor"
            )
            return RHDiskFailurePredictor.PREDICTION_CLASSES[-1]
        else:
            manufacturer = manufacturer.lower()

        # preprocess for feeding to model
        preprocessed_data = self.__preprocess(disk_days, manufacturer)
        if preprocessed_data is None:
            return RHDiskFailurePredictor.PREDICTION_CLASSES[-1]

        # get model for current manufacturer
        model_path = os.path.join(
            self.model_dirpath, manufacturer + "_predictor.pkl"
        )
        with open(model_path, 'rb') as f:
            model = pickle.load(f)

        # use prediction for latest day as the output
        pred_class_id = model.predict(preprocessed_data)[-1]
        return RHDiskFailurePredictor.PREDICTION_CLASSES[pred_class_id]
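
    # Minimal usage sketch (assumes the pickled models added in this PR live
    # under models/redhat and disk_days follows the schema described in
    # __preprocess):
    #
    #   predictor = RHDiskFailurePredictor()
    #   err = predictor.initialize("models/redhat")
    #   if err is None:
    #       health = predictor.predict(disk_days)   # e.g. "Good"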


class PreditionResult(Enum):
    GOOD = 0
    FAIL = 1
@@ -26,9 +292,32 @@ def simple_prediction(device_data):
    return PreditionResult.GOOD

def main():
    # args to decide which model to use for prediction
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--predictor-model', '--pm',
        type=str,
        choices=['redhat', 'prophetstor', 'dummy'],
        default='redhat',
    )
    args = parser.parse_args()

    # read the device's SMART data (json) from stdin
    inp_json = sys.stdin.read()
    device_data = json.loads(inp_json)

    if args.predictor_model == 'dummy':
        prediction_result = simple_prediction(device_data)
    elif args.predictor_model == 'redhat':
        # init model
        predictor = RHDiskFailurePredictor()
        err = predictor.initialize(
            "{}/models/{}".format(get_diskfailurepredictor_path(), args.predictor_model)
        )
        # initialize() returns an error message on failure, None on success
        if err is not None:
            sys.exit(err)

        # make prediction
        prediction_result = predictor.predict(device_data)

Owner:
'device_data' is not defined.

I think you meant to add after line 305
device_data = json.load(inp_json)
?

Author:
Oops, looks like I accidentally deleted that after testing 😅
I'll update it to include device_data = json.loads(inp_json) (since IIUC inp_json will be a string, loads should be used instead of load)

Owner:

Thanks, @chauhankaranraj!

Yes, json.loads().

I still see several warnings and another error:

$ ./predict_device.py 1669 redhat
./model.py:189: FutureWarning: arrays to stack must be passed as a "sequence" type such as list or tuple. Support for non-sequence iterables such as generators is deprecated as of NumPy 1.16 and will raise an error in the future.
  means = np.vstack(gen)
./model.py:194: FutureWarning: arrays to stack must be passed as a "sequence" type such as list or tuple. Support for non-sequence iterables such as generators is deprecated as of NumPy 1.16 and will raise an error in the future.
  stds = np.vstack(gen)
./model.py:197: RuntimeWarning: invalid value encountered in true_divide
  cvs = stds / means
/usr/local/lib64/python3.6/site-packages/sklearn/utils/deprecation.py:143: FutureWarning: The sklearn.preprocessing.data module is  deprecated in version 0.22 and will be removed in version 0.24. The corresponding classes / functions should instead be imported from sklearn.preprocessing. Anything that cannot be imported from sklearn.preprocessing is now part of the private API.
  warnings.warn(message, FutureWarning)
/usr/local/lib64/python3.6/site-packages/sklearn/base.py:334: UserWarning: Trying to unpickle estimator RobustScaler from version 0.19.2 when using version 0.23.2. This might lead to breaking code or invalid results. Use at your own risk.
  UserWarning)
Traceback (most recent call last):
  File "./model.py", line 326, in <module>
    sys.exit(main())
  File "./model.py", line 319, in main
    prediction_result = predictor.predict(device_data)
  File "./model.py", line 264, in predict
    preprocessed_data = self.__preprocess(disk_days, manufacturer)
  File "./model.py", line 212, in __preprocess
    featurized = scaler.transform(featurized)
  File "/usr/local/lib64/python3.6/site-packages/sklearn/preprocessing/_data.py", line 1259, in transform
    X -= self.center_
ValueError: operands could not be broadcast together with shapes (161,70) (67,) (161,70)

Author:

@yaarith I think most of the warnings are expected future deprecation warnings. But the last UserWarning is something I haven't seen before - are the packages installed according to the requirements? As per requirements.txt, scikit-learn needs to be 0.19.2, but from the output it seems like 0.23.2 is being used?

Could you please lmk how I can replicate the last ValueError? I tried running ./predict_device.py 1669 redhat but I don't have the grafana.dsn file so I get an error. So then I ran ./model.py --predictor-model redhat but that didn't throw this error.

Note: for testing, I had commented out the stdin.read() and loaded the sample input as follows:

# inp_json = sys.stdin.read()
# device_data = json.loads(inp_json)
fname = 'input_samples/predict_1669.json'
with open(fname, 'rb') as f:
    device_data = json.load(f)

Owner:

> I think most of the warnings are expected future deprecation warnings

These warnings will become errors soon, and I'd rather have this code stable for a while :-)

> As per requirements.txt, scikit-learn needs to be 0.19.2, but from the output it seems like 0.23.2 is being used?

Indeed, 0.23.2 is used. I think this issue relates to the issues Ceph users are having with diskprediction local module. Users who still have version 0.19.2 will eventually upgrade scikit-learn and the module will be broken. Also, the latest version in Ceph is 0.23.2, so we want to keep it on par with that. Is it possible to make it work with the latest scikit-learn version?

> Could you please lmk how I can replicate the last ValueError?

You can run:
$ cat ./input_samples/predict_1669.json | ./model.py --predictor-model redhat
which saves you from changing the code between the testing and production versions (no need to add the open-file code block).
When I ran the code like this I see the exact same error.

Author:

> In this case I prefer we move on to refining the time and confidence granularity of the existing model /
> training the new model. So we can use what we have now, then replace it with an improved model later.

Yep, that's exactly what I had in mind.

> Regarding ProphetStor: since we will not retrain the model it should not be a problem.

Sounds good!

> 1. Please remove any logic from model.py which is not related to 'redhat' model; we will add different models in separate files later.

Cool, shall I remove the placeholder function simple_prediction as well then?

Owner:

> Yep, that's exactly what I had in mind.

Can we make sure we train using the latest versions of numpy, scikit-learn, etc.? These models serve Ceph upstream and we want to move forward to the latest releases.

> Cool, shall I remove the placeholder function simple_prediction as well then?

Yes, you can remove anything in this file which is not related to 'redhat' model (so anything which was part of the original placeholder model). The idea is that each model will be in a separate file.

> The output should go to stdout, please remove the comment from print(prediction_result) (line 303 in model.py).

Oh, I meant to uncomment the line :-)

Author:

> Can we make sure we train using the latest versions of numpy, scikit-learn, etc.? These models serve Ceph upstream and we want to move forward to the latest releases.

Definitely. IIRC when we trained these models last year, the constraint sklearn==0.19 already existed, so we didn't want to break the existing models by updating it. But since that constraint is not relevant here, we'll use the latest and greatest now :)

> Yes, you can remove anything in this file which is not related to 'redhat' model (so anything which was part of the original placeholder model). The idea is that each model will be in a separate file.

> Oh, I meant to uncomment the line :-)

Pushed these changes 👍

Owner:

Thanks!

> But since that constraint is not relevant here, we'll use the latest and greatest now :)

Sounds good. We do need to understand how we integrate them with future Ceph releases, so the diskprediction module uses them and does not break.

Does model.py need the SMART reports to be consecutive?
So if we have 6 reports with SMART data, but there's a gap between them - will it affect the results in any way?

Author:

> Does model.py need the SMART reports to be consecutive?
> So if we have 6 reports with SMART data, but there's a gap between them - will it affect the results in any way?

It won't break, but the output may not be accurate in this case. The model has been trained to predict health from exactly the last 6 days of device behavior, so if the input is data from days other than the last 6, we can't be sure the result will be meaningful.
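
A minimal sketch of that windowing behavior (synthetic data, numpy only; not part of the PR): the featurizer sees only the ordered values, never their dates, so six reports with gaps between them are treated exactly like six consecutive days:

import numpy as np

disk_days_attrs = np.random.rand(8, 3)   # e.g. 8 daily reports, 3 SMART features
roll_window_size = 6
n_windows = disk_days_attrs.shape[0] - roll_window_size + 1
means = np.vstack([disk_days_attrs[i:i + roll_window_size].mean(axis=0)
                   for i in range(n_windows)])
print(means.shape)   # (3, 3): one row per window; predict() keeps only the last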

    elif args.predictor_model == 'prophetstor':
        raise NotImplementedError("ProphetStor sample model has not been updated for use on Grafana dashboards")

    print(prediction_result)

4 changes: 4 additions & 0 deletions device_prediction/models/redhat/config.json
@@ -0,0 +1,4 @@
{
"seagate": ["user_capacity", "smart_1_raw", "smart_5_raw", "smart_7_raw", "smart_10_raw", "smart_187_raw", "smart_188_raw", "smart_190_raw", "smart_193_raw", "smart_197_raw", "smart_198_raw", "smart_241_raw", "smart_1_normalized", "smart_5_normalized", "smart_7_normalized", "smart_10_normalized", "smart_187_normalized", "smart_188_normalized", "smart_190_normalized", "smart_193_normalized", "smart_197_normalized", "smart_198_normalized", "smart_241_normalized"],
"hgst": ["user_capacity", "smart_1_normalized", "smart_1_raw", "smart_2_normalized", "smart_2_raw", "smart_3_normalized", "smart_3_raw", "smart_4_raw", "smart_5_normalized", "smart_5_raw", "smart_7_normalized", "smart_7_raw", "smart_8_normalized", "smart_8_raw", "smart_9_normalized", "smart_9_raw", "smart_10_normalized", "smart_10_raw", "smart_12_raw", "smart_192_normalized", "smart_192_raw", "smart_193_normalized", "smart_193_raw", "smart_194_normalized", "smart_194_raw", "smart_196_normalized", "smart_196_raw", "smart_197_normalized", "smart_197_raw", "smart_198_raw", "smart_199_raw"]
}
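
For reference, a small sketch of how this config is consumed (mirrors initialize() and __preprocess() in model.py; the path assumes the repo root as working directory):

import json

with open("device_prediction/models/redhat/config.json") as f:
    model_context = json.load(f)

# keys select the per-vendor scaler/predictor files; values list, in order,
# the features each vendor's model was trained on
for manufacturer, attrs in model_context.items():
    print(manufacturer, len(attrs))   # seagate 23, hgst 31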
Binary file not shown.
Binary file added device_prediction/models/redhat/hgst_scaler.pkl
Binary file not shown.
Binary file not shown.
Binary file not shown.
2 changes: 2 additions & 0 deletions server/ceph_telemetry/requirements.txt
@@ -2,3 +2,5 @@ requests
flask
flask_restful
psycopg2
numpy==1.15.1
scikit-learn==0.19.2