[SageMaker 批量转换] 内部服务器错误:500



我正试图对S3存储桶中的训练数据集进行批量转换。我参照了此链接中的示例:https://github.com/aws-samples/quicksight-sagemaker-integration-blog

正在应用转换的训练数据约为35MB。我得到这些错误:

  1. 从算法接收到错误的HTTP状态:500
  2. 服务器遇到内部错误,无法完成您的请求。服务器过载或应用程序中存在错误

随后的过程:

# 1. Point the training channel at the raw CSV data in S3.
#    `bucket`, `prefix` and `role` are defined earlier in the notebook.
s3_input_train = sagemaker.TrainingInput(
    s3_data='s3://{}/{}/rawtrain/'.format(bucket, prefix),
    content_type='csv')

# 2. Fit the preprocessing script with the SKLearn estimator.
from sagemaker.sklearn.estimator import SKLearn

sagemaker_session = sagemaker.Session()
script_path = 'preprocessing.py'
sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    # `sagemaker.TrainingInput` only exists in SDK v2, where the estimator
    # keyword is `instance_type` (the v1 name was `train_instance_type`).
    instance_type="ml.c4.xlarge",
    framework_version='0.20.0',
    py_version='py3',
    sagemaker_session=sagemaker_session)
sklearn_preprocessor.fit({'train': s3_input_train})

# 3. Build a batch transformer from the fitted preprocessor.
transform_train_output_path = 's3://{}/{}/{}/'.format(
    bucket, prefix, 'transformtrain-train-output')
# TRANSFORM_MODE is read by preprocessing.py to select the feature-transform path.
scikit_learn_inferencee_model = sklearn_preprocessor.create_model(
    env={'TRANSFORM_MODE': 'feature-transform'})
transformer_train = scikit_learn_inferencee_model.transformer(
    instance_count=1,
    assemble_with='Line',
    output_path=transform_train_output_path,
    accept='text/csv',
    strategy="MultiRecord",
    max_payload=40,
    instance_type='ml.m4.xlarge')

# 4. Preprocess training input
transformer_train.transform(
    s3_input_train.config['DataSource']['S3DataSource']['S3Uri'],
    content_type='text/csv',
    split_type="Line")
print('Waiting for transform job: ' + transformer_train.latest_transform_job.job_name)
transformer_train.wait()
preprocessed_train_path = (
    transformer_train.output_path + transformer_train.latest_transform_job.job_name)
preprocessing.py
from __future__ import print_function
import time
import sys
from io import StringIO
import os
import shutil
import argparse
import csv
import json
import numpy as np
import pandas as pd
import logging
from sklearn.compose import ColumnTransformer
from sklearn.externals import joblib
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Binarizer, StandardScaler, OneHotEncoder
from sagemaker_containers.beta.framework import (
content_types, encoders, env, modules, transformer, worker)
# Raw input schema: feature column names in CSV column order.
feature_columns_names = list('ABCDEFGHIJK')

# Name of the label column, which follows the features in labelled rows.
label_column = 'ab'

# Dtype used when reading each feature column: B, C, E and H are read as
# float64, every other feature is read as a string.
feature_columns_dtype = {
    name: (np.float64 if name in ('B', 'C', 'E', 'H') else str)
    for name in feature_columns_names
}

# Dtype used when reading the label column.
label_column_dtype = {label_column: np.int32}
def merge_two_dicts(x, y):
    """Return a new mapping holding the entries of ``x`` updated with ``y``.

    Neither argument is modified. When a key is present in both, the value
    from ``y`` wins.

    Args:
        x: Base mapping; it is shallow-copied via ``x.copy()``.
        y: Mapping whose entries are applied on top of the copy.

    Returns:
        The updated copy of ``x``.
    """
    merged = x.copy()
    # update() mutates in place and returns None, so return the copy itself.
    merged.update(y)
    return merged
def _is_inverse_label_transform():
    """Return True if running in inverse label transform mode.

    The mode is selected by the ``TRANSFORM_MODE`` environment variable.
    """
    return os.getenv('TRANSFORM_MODE') == 'inverse-label-transform'
def _is_feature_transform():
    """Return True if running in feature transform mode.

    The mode is selected by the ``TRANSFORM_MODE`` environment variable.
    """
    return os.getenv('TRANSFORM_MODE') == 'feature-transform'

if __name__ == '__main__':
    # Training entry point: read every CSV file in the train channel, fit the
    # preprocessing ColumnTransformer on it and save it to the model directory.
    parser = argparse.ArgumentParser()

    # SageMaker-specific arguments. Defaults are read from environment variables.
    parser.add_argument('--output-data-dir', type=str, default=os.environ['SM_OUTPUT_DATA_DIR'])
    parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
    parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])

    args = parser.parse_args()

    input_files = [os.path.join(args.train, file) for file in os.listdir(args.train)]
    if not input_files:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(args.train, "train"))

    # Files are read without a header row; column names and dtypes come from
    # the module-level schema (features followed by the label column).
    column_dtypes = merge_two_dicts(feature_columns_dtype, label_column_dtype)
    raw_data = [
        pd.read_csv(
            file,
            header=None,
            names=feature_columns_names + [label_column],
            dtype=column_dtypes)
        for file in input_files
    ]
    concat_data = pd.concat(raw_data)

    # Numeric columns: median imputation followed by standard scaling.
    numeric_features = ['B', 'C', 'E', 'H']
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())])

    # Categorical columns: constant-fill imputation followed by one-hot
    # encoding that ignores categories not seen during fit.
    categorical_features = ['A', 'D', 'F', 'G', 'I', 'J', 'K']
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    # Columns not listed above (including the label) are dropped.
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)],
        remainder="drop")

    preprocessor.fit(concat_data)

    # model_fn() loads the fitted preprocessor back from this file name.
    joblib.dump(preprocessor, os.path.join(args.model_dir, "model.joblib"))
    print("saved model!")


def input_fn(input_data, request_content_type):
    """Parse the request payload into a DataFrame.

    Only CSV input is supported. Because both labelled and unlabelled data
    must be handled, the presence of the label column is decided from the
    number of columns in the payload.

    Args:
        input_data: Request body, either ``str`` or a bytes-like object that
            is decoded as UTF-8.
        request_content_type: Content type of the request; parameters such as
            ``; charset=utf-8`` are ignored. A falsy value is treated as
            ``text/csv``.

    Returns:
        The parsed DataFrame, or ``None`` when neither transform mode is
        active.

    Raises:
        ValueError: If a transform mode is active and the content type is not
            ``text/csv``.
    """
    content_type = request_content_type.lower() if request_content_type else "text/csv"
    # Drop parameters such as "; charset=utf-8" so only the media type is compared.
    content_type = content_type.split(";")[0].strip()

    if isinstance(input_data, str):
        str_buffer = input_data
    else:
        str_buffer = str(input_data, 'utf-8')

    if _is_feature_transform() or _is_inverse_label_transform():
        if content_type != 'text/csv':
            raise ValueError("{} not supported by script!".format(content_type))
        # Always parse the decoded text: wrapping the raw payload in StringIO
        # raises TypeError when the payload arrives as bytes.
        return _read_csv_payload(str_buffer)

    return None


def _read_csv_payload(csv_text):
    """Read headerless CSV text and name the columns when the width is recognised."""
    df = pd.read_csv(StringIO(csv_text), header=None)
    feature_count = len(feature_columns_names)
    if len(df.columns) == feature_count + 1:
        # This is a labelled example: features followed by the label.
        df.columns = feature_columns_names + [label_column]
    elif len(df.columns) == feature_count:
        # This is an unlabelled example.
        df.columns = feature_columns_names
    # Any other width keeps pandas' default integer column names; the inverse
    # label transform path in predict_fn indexes by position, so do not raise.
    return df


def output_fn(prediction, accept):
    """Format the prediction output as an HTTP response.

    The response mimetype is set to the same value as the accept type so the
    next consumer can read the payload correctly.

    Args:
        prediction: A numpy array, or an object exposing ``toarray()``, which
            is converted to an array first.
        accept: Requested response content type. NOTE(review): this value is
            currently ignored because the function forces ``text/csv`` below.

    Returns:
        A ``worker.Response`` carrying the encoded prediction.

    Raises:
        RuntimeError: If the accept type is neither ``application/json`` nor
            ``text/csv``.
    """
    # NOTE(review): the caller's accept type is overridden, so the JSON branch
    # below is unreachable as written. Kept to preserve existing behaviour;
    # remove this line to honour the request's accept header.
    accept = 'text/csv'

    if not isinstance(prediction, np.ndarray):
        prediction = prediction.toarray()

    if accept == "application/json":
        instances = [{"features": row} for row in prediction.tolist()]
        json_output = {"instances": instances}
        return worker.Response(json.dumps(json_output), mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        # RuntimeException is not a Python builtin and would raise NameError.
        raise RuntimeError("{} accept type is not supported by this script.".format(accept))

def predict_fn(input_data, model):
    """Preprocess input data with the fitted transformer.

    This is implemented because the default predict_fn uses ``.predict()``,
    whereas the model here is a preprocessor, so ``.transform()`` is used.

    In feature transform mode the output columns are: the label first (only
    when the input contains the label column), then the transformed features.
    In inverse label transform mode the first input column is thresholded at
    0.5.

    Args:
        input_data: DataFrame produced by ``input_fn``.
        model: Fitted preprocessor returned by ``model_fn``.

    Returns:
        The transformed features (with the label prepended when present), a
        boolean array in inverse label transform mode, or ``None`` when
        neither mode is active.
    """
    if _is_feature_transform():
        features = model.transform(input_data)

        if label_column in input_data:
            # transform() may return a sparse matrix or an already dense
            # array; only the former has toarray().
            if hasattr(features, "toarray"):
                dense_features = features.toarray()
            else:
                dense_features = np.asarray(features)

            labels = input_data[label_column]
            dummies = pd.get_dummies(labels)
            # Labels spelled 'True.'/'False.' are mapped to an indicator
            # column; any other labels (e.g. integers) are inserted unchanged
            # instead of failing with KeyError on the missing 'True.' column.
            label_values = dummies['True.'] if 'True.' in dummies else labels

            # Return the label (as the first column) and the set of features.
            return np.insert(dense_features, 0, label_values, axis=1)

        # Return only the set of features.
        return features

    if _is_inverse_label_transform():
        return (input_data.iloc[:, 0] > 0.5).values

    return None

def model_fn(model_dir):
    """Deserialize the fitted preprocessor.

    Args:
        model_dir: Directory containing ``model.joblib``.

    Returns:
        The preprocessor loaded from ``model.joblib`` in feature transform
        mode; ``None`` in any other mode.
    """
    if _is_feature_transform():
        return joblib.load(os.path.join(model_dir, "model.joblib"))
    # No model is loaded outside feature transform mode.
    return None

请帮忙。

正如我所看到的,你指的是一篇相当古老的帖子,在Github上有关于输入源和配置的问题。

我鼓励你在这里查看最新的例子,这些例子展示了如何使用Amazon QuickSight可视化Amazon SageMaker机器学习预测。

此外,如果问题仍然存在,请使用作业ARN向AWS支持部门提交服务请求,以便进一步调查该内部服务器错误。

相关内容

最新更新