如何与张量流保存的模型预测器并行进行推理?



Tensorflow 版本:1.14

我们当前的设置是使用张量流估计器来执行实时 NER,即一次执行一个文档的推理。我们有 30 个不同的字段要提取,每个字段运行一个模型,因此总共有 30 个模型。

我们当前的设置使用 python 多处理来并行进行推理。(推理是在 CPU 上完成的。此方法在每次进行预测时重新加载模型权重。

使用此处提到的方法,我们将估计器模型导出为tf.saved_model。这按预期工作,因为它不会为每个请求重新加载权重。它也适用于一个进程中的单个字段推理,但不适用于多处理。当我们进行预测函数(predict_fn在链接的帖子中(调用时,所有进程都会挂起。

这篇文章是相关的,但不确定如何适应保存的模型。

为每个预测变量单独导入张量流也不起作用:

class SavedModelPredictor():
def __init__(self, model_path):
import tensorflow as tf
self.predictor_fn = tf.contrib.predictor.from_saved_model(model_path)
def predictor_fn(self, input_dict):
return self.predictor_fn(input_dict)

如何使tf.saved_model与多处理一起工作?

RayServe是Ray的模型服务解决方案,也支持离线批处理。您可以将模型包装在 Ray Serve 的后端,并将其缩放到所需的副本数量。

from ray import serve
client = serve.start()
class MyTFModel:
def __init__(self, model_path):
self.model = ... # load model
@serve.accept_batch
def __call__(self, input_batch):
assert isinstance(input_batch, list)
# forward pass
self.model([item.data for item in input_batch])
# return a list of response
return [...]
client.create_backend("tf", MyTFModel, 
# configure resources
ray_actor_options={"num_cpus": 2, "num_gpus": 1},
# configure replicas
config={
"num_replicas": 2, 
"max_batch_size": 24,
"batch_wait_timeout": 0.5
}
)
client.create_endpoint("tf", backend="tf")
handle = serve.get_handle("tf")
# perform inference on a list of input
futures = [handle.remote(data) for data in fields]
result = ray.get(futures)

尝试使用夜间轮子,这是教程:https://docs.ray.io/en/master/serve/tutorials/batch.html

编辑:更新了 Ray 1.0 的代码示例

好的,所以这个答案中概述的ray方法有效。

构建了这样的类,它在 init 上加载模型并公开一个函数run来执行预测:

import tensorflow as tf
import ray
ray.init()
@ray.remote
class MyModel(object):
def __init__(self, field, saved_model_path):
self.field = field
# load the model once in the constructor
self.predictor_fn = tf.contrib.predictor.from_saved_model(saved_model_path)
def run(self, df_feature, *args):
# ...
# code to perform prediction using self.predictor_fn
# ...
return self.field, list_pred_string, list_pred_proba

然后在主模块中使用上述作为:

# form a dictionary with key 'field' and value MyModel
model_dict = {}
for field in fields:
export_dir = f"saved_model/{field}"
subdirs = [x for x in Path(export_dir).iterdir()
if x.is_dir() and 'temp' not in str(x)]
latest = str(sorted(subdirs)[-1])
model_dict[field] = MyModel.remote(field, latest)

然后使用上面的模型字典做了这样的预测:

results = ray.get([model_dict[field].run.remote(df_feature) for field in fields])

更新:

虽然这种方法有效,但发现并行运行估计器与多处理比并行运行预测器与射线更快。对于大型文档尤其如此。看起来预测器方法可能适用于少量维度以及输入数据不大的情况。也许像这里提到的方法可能对我们的用例更好。

相关内容

  • 没有找到相关文章

最新更新