I am using the SageMaker Python SDK for inference and following this guide. I am triggering my SageMaker inference job from Airflow with the following Python callable:
from sagemaker.tensorflow import TensorFlowModel

def transform(sage_role, inference_file_local_path, **kwargs):
    """
    Python callable to execute a SageMaker SDK batch transform job. It takes
    infer_batch_output, infer_batch_input, model_artifact, instance_type and
    infer_file_name as run-time parameters.
    :param sage_role: SageMaker execution role.
    :param inference_file_local_path: Local entry_point path for the inference file.
    """
    # infer_file_name, model_artifact, instance_type, batch_input and
    # batch_output are resolved at run time from the job parameters.
    model = TensorFlowModel(entry_point=infer_file_name,
                            source_dir=inference_file_local_path,
                            model_data=model_artifact,
                            role=sage_role,
                            framework_version="2.5.1")
    tensorflow_serving_transformer = model.transformer(
        instance_count=1,
        instance_type=instance_type,
        accept="text/csv",
        strategy="SingleRecord",
        max_payload=10,
        max_concurrent_transforms=10,
        output_path=batch_output)
    return tensorflow_serving_transformer.transform(data=batch_input, content_type='text/csv')
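(For reference, a minimal sketch of how this callable could be wired into an Airflow DAG; the DAG id, task id, role ARN and source path below are placeholders:)

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(dag_id="sagemaker_batch_inference", start_date=datetime(2022, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    PythonOperator(
        task_id="sagemaker_transform",
        python_callable=transform,
        op_kwargs={
            "sage_role": "arn:aws:iam::123456789012:role/my-sagemaker-role",  # placeholder
            "inference_file_local_path": "/opt/airflow/dags/inference_src",   # placeholder
        },
    )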
and my simple inference.py looks like:
import json

import numpy as np
import pandas as pd

# config_exists, get_s3_json_file, model_bucket and config_path are this
# project's own S3 helpers/constants, defined elsewhere.

def input_handler(data, context):
    """Pre-process request input before it is sent to TensorFlow Serving REST API.
    Args:
        data (obj): the request data, in format of dict or string
        context (Context): an object containing request and configuration details
    Returns:
        (dict): a JSON-serializable dict that contains request body and headers
    """
    if context.request_content_type == 'application/x-npy':
        # very simple numpy handler
        payload = np.load(data.read().decode('utf-8'))
        x_user_feature = np.asarray(payload.item().get('test').get('feature_a_list'))
        x_channel_feature = np.asarray(payload.item().get('test').get('feature_b_list'))
        examples = []
        for index, elem in enumerate(x_user_feature):
            examples.append({'feature_a_list': elem, 'feature_b_list': x_channel_feature[index]})
        return json.dumps({'instances': examples})

    if context.request_content_type == 'text/csv':
        payload = pd.read_csv(data)
        model_name = context.model_name
        print("Model name is {}".format(model_name))
        examples = []
        row_ch = []
        if config_exists(model_bucket, "{}{}".format(config_path, model_name)):
            config_keys = get_s3_json_file(model_bucket, "{}{}".format(config_path, model_name))
            feature_b_list = config_keys["feature_b_list"].split(",")
            row_ch = [float(ch_feature_str) for ch_feature_str in feature_b_list]
            if "column_names" in config_keys.keys():
                cols = config_keys["column_names"].split(",")
                payload.columns = cols
        for index, row in payload.iterrows():
            row_user = row['feature_a_list'].replace('[', '').replace(']', '').split()
            row_user = [float(x) for x in row_user]
            if not row_ch:
                row_ch = row['feature_b_list'].replace('[', '').replace(']', '').split()
                row_ch = [float(x) for x in row_ch]
            example = {'feature_a_list': row_user, 'feature_b_list': row_ch}
            examples.append(example)
        return json.dumps({'instances': examples})

    raise ValueError('{{"error": "unsupported content type {}"}}'.format(
        context.request_content_type or "unknown"))
def output_handler(data, context):
    """Post-process TensorFlow Serving output before it is returned to the client.
    Args:
        data (obj): the TensorFlow serving response
        context (Context): an object containing request and configuration details
    Returns:
        (bytes, string): data to return to client, response content type
    """
    if data.status_code != 200:
        raise ValueError(data.content.decode('utf-8'))
    response_content_type = context.accept_header
    prediction = data.content
    return prediction, response_content_type
It works fine, but I want to pass custom arguments to inference.py so that I can modify the input data according to the requirement. I wanted to use one config file per requirement and download it from S3 based on the model name, but since I pass model.tar.gz via model_data at run time, context.model_name is always None.

Is there a way I can pass run-time parameters to inference.py that I can use for customization? In the documentation I see that SageMaker provides custom_attributes, but I don't see any example of how to use it and access it in inference.py:
custom_attributes (string): content of ‘X-Amzn-SageMaker-Custom-Attributes’ header from the original request. For example, ‘tfs-model-name=half_plus_three,tfs-method=predict’
CustomAttributes is currently supported in the InvokeEndpoint API call when using a real-time endpoint.
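For a real-time endpoint, a minimal sketch of passing the attribute with boto3 and reading it back inside inference.py (the endpoint name and attribute string below are placeholders) would be:

import boto3

runtime = boto3.client("sagemaker-runtime")
response = runtime.invoke_endpoint(
    EndpointName="my-endpoint",                    # placeholder endpoint name
    ContentType="text/csv",
    Body="1,2,3,4",
    # sent as the X-Amzn-SageMaker-Custom-Attributes header
    CustomAttributes="config_name=my_config.json",
)

# Inside inference.py's handlers the same string is then available as:
#     context.custom_attributes  ->  "config_name=my_config.json"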
As an example, you can pass JSON Lines as input to your Transform job, where each record contains the input payload along with some custom arguments that you can consume in your inference.py file. For example:
{"input": "1,2,3,4", "custom_args": "my_custom_arg"}
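A minimal sketch of an input_handler that unpacks such records (the "input" and "custom_args" keys follow the example above) could look like:

import json

def input_handler(data, context):
    if context.request_content_type == "application/jsonlines":
        instances = []
        for line in data.read().decode("utf-8").splitlines():
            record = json.loads(line)
            custom_args = record["custom_args"]  # your per-record runtime parameter
            features = [float(x) for x in record["input"].split(",")]
            # ...use custom_args to adjust the pre-processing as needed...
            instances.append(features)
        return json.dumps({"instances": instances})
    raise ValueError('{{"error": "unsupported content type {}"}}'.format(
        context.request_content_type or "unknown"))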