我在scikit-learn中创建了一个机器学习模型,需要在生产中使用实时数据进行部署。功能如下所示,例如:
date event_id user_id feature1 feature2 featureX...
2017-01-27 100 5555 1.23 2 2.99
2017-01-27 100 4444 2.55 5 3.16
2017-01-27 100 3333 0.45 3 1.69
2017-01-27 105 1212 3.96 4 0.0
2017-01-27 105 2424 1.55 2 5.56
2017-01-27 105 3636 0.87 4 10.28
所以,每天都有不同的活动。在事件开始之前,我基本上通过从数据库中提取它们来将其存储在数据帧中,并使用pickle scikit模型计算预测,如下所示:
df_X = df.drop(['date', 'event_id', 'user_id'], axis=1)
loaded_model = joblib.load("model.joblib.dat")
prediction = loaded_model.predict_proba(df_X)
然后,我将预测匹配回df,并根据需要将其作为输出发送到API或文件。
当事件启动时,featureX
会不断更新,这是我从API获得的。为了进行更新,我使用的循环遍历每个event_id
和user_id
,并用新的featureX
值更新df
,重新计算并再次发送到输出。
为此,我正在做这样的事情:
# get list of unique event ids
events = set(df['event_id'].tolist())
try:
while True:
start = time.time()
for event in events:
featureX = request.get(API_URL + event)
featureX_json = featureX.json()
for user in featureX_json['users']:
df.loc[df.user_id == user['user_id'],
'featureX'] = user['featureX']
df_X = df.drop(['date', 'event_id', 'user_id'], axis=1)
df['prediction'] = loaded_model.predict_proba(df_X)
# send to API or write to file
end = time.time()
print('recalculation time {} secs'.format(end - start))
except KeyboardInterrupt:
print('exiting !')
这对我来说很好,但整个预测更新在服务器上大约需要4秒,我需要它在1秒以下。我正试图弄清楚我可以在while loop
中更改什么来获得我需要的加速?
在请求event_id = 100
时添加了json的示例URLhttp://myapi/api/event_users/<event_id>
:
{
"count": 3,
"users": [
{
"user_id": 4444,
"featureY": 34,
"featureX": 4.49,
"created": "2017-01-17T13:00:09.065498Z"
},
{
"user_id": 3333,
"featureY": 22,
"featureX": 1.09,
"created": "2017-01-17T13:00:09.065498Z"
},
{
"user_id": 5555,
"featureY": 58,
"featureX": 9.54,
"created": "2017-01-17T13:00:09.065498Z"
}
]
}
# get list of unique event ids
events = df['event_id'].unique().tolist()
try:
while True: # i don't understand why do you need this loop...
start = time.time()
for event in events:
featureX = request.get(API_URL + event)
tmp = pd.DataFrame(featureX.json()['users'])
df.loc[(df.event_id == event), 'featureX'] =
df.loc[df.event_id == event, 'user_id']
.map(tmp.set_index('user_id').featureX)
df_X = df.drop(['date', 'event_id', 'user_id'], axis=1)
df['prediction'] = loaded_model.predict_proba(df_X)
# send to API or write to file
end = time.time()
print('recalculation time {} secs'.format(end - start))
except KeyboardInterrupt:
print('exiting !')
演示:用于event_id == 100
首先让我们从JSON对象创建一个DF:
tmp = pd.DataFrame(featureX_json['users'])
In [33]: tmp
Out[33]:
created featureX featureY user_id
0 2017-01-17T13:00:09.065498Z 4.49 34 4444
1 2017-01-17T13:00:09.065498Z 1.09 22 3333
2 2017-01-17T13:00:09.065498Z 9.54 58 5555
现在我们可以摆脱for user in featureX_json['users']:
循环:
In [29]: df.loc[df.event_id == 100, 'featureX'] =
df.loc[df.event_id == 100, 'user_id'].map(tmp.set_index('user_id').featureX)
In [30]: df
Out[30]:
date event_id user_id feature1 feature2 featureX
0 2017-01-27 100 5555 1.23 2 9.54 # 2.99 -> 9.54
1 2017-01-27 100 4444 2.55 5 4.49 # 3.16 -> 4.49
2 2017-01-27 100 3333 0.45 3 1.09 # 1.69 -> 1.09
3 2017-01-27 105 1212 3.96 4 0.00
4 2017-01-27 105 2424 1.55 2 5.56
5 2017-01-27 105 3636 0.87 4 10.28
最好订阅某种消息队列,比如Kafka。然后,每当FeatureX
更新时,您就可以使用它,而不是在循环中无休止地进行批处理API调用,然后在整个数据源中迭代,等等
关于预测,利用一种更具可扩展性的方法可能是有意义的。您可以将数据帧分割成块,并向可扩展的高吞吐量预测API发出异步请求。使用这种方法,您只受网络延迟和可以同时发出的请求数量的限制。如果预测API可以处理每秒数千个/10千个/100千个请求,那么您的预测时间可以减少到不到一秒(可能只有几百毫秒)。
我的服务mlrequest是一个低延迟、高吞吐量、高可用性的机器学习API,非常适合这种问题。我们可以处理并扩展到每秒成千上万的预测。Scikit Learn模型和Pandas数据帧将在下一个版本(即将发布)中得到支持。下面是一个训练和预测的简单例子。您可以获得一个免费的api密钥,每月可以获得50000个模型事务。
安装mlrequest Python客户端
$pip install mlrequest
培训一个模型并将其部署到世界各地的5个数据中心非常简单:
from mlrequest import Classifier
classifier = Classifier('my-api-key')
features = {'feature1': 'val1','feature2': 100}
training_data = [{'features': features, 'label': 1}, ...]
r = classifier.learn(training_data=training_data, model_name='my-model', class_count=2)
预测
features = [{'feature1': 'val1', 'feature2': 77}, ...]
r = classifier.predict(features=features, model_name='my-model', class_count=2)
r.predict_result
您可以尝试使用算法的加速实现,例如scikit-learn-intelex-https://github.com/intel/scikit-learn-intelex.这是一款免费软件AI加速器,可在各种应用程序中带来超过10-100X的加速度。
这个库将为训练和预测提供巨大的性能改进。
可以实现的加速示例
首次安装包
pip install scikit-learn-intelex
然后添加您的python脚本
from sklearnex import patch_sklearn
patch_sklearn()