应用TensorFlow变换来变换/缩放生产中的特征



概述

我按照以下指南编写了TF Records,在那里我使用tf.Transform来预处理我的特性。现在,我想部署我的模型,为此,我需要在真实的实时数据上应用这个预处理功能。

我的方法

首先,假设我有两个功能:

features = ['amount', 'age']

我有来自Apache Beam的transform_fn,位于working_dir=gs://path-to-transform-fn/

然后我使用加载转换函数

tf_transform_output = tft.TFTransformOutput(working_dir)

我认为在生产中提供服务的最简单方法是获得一个处理过的数据的numpy数组,并调用model.predict()(我使用的是Keras模型)。

为此,我认为transform_raw_features()方法正是我所需要的。

然而,似乎在构建了模式之后:

raw_features = {}
for k in features:
raw_features.update({k: tf.constant(1)})
print(tf_transform_output.transform_raw_features(raw_features))

我得到:

AttributeError: 'Tensor' object has no attribute 'indices'

现在,我假设发生这种情况是因为我在preprocessing_fn中定义模式时使用了tf.VarLenFeature()

def preprocessing_fn(inputs):
outputs = inputs.copy()
for _ in features:
outputs[_] = tft.scale_to_z_score(outputs[_])

我使用构建元数据

RAW_DATA_FEATURE_SPEC = {}
for _ in features:
RAW_DATA_FEATURE_SPEC[_] = tf.VarLenFeature(dtype=tf.float32)
RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC))

简而言之,给定一本字典:

d = {'amount': [50], 'age': [32]},我想应用这个transform_fn,并适当地缩放这些值以输入到我的模型中进行预测。在pre_processing()函数处理数据之前,这个字典正是我的PCollection的格式。

管道结构:

class BeamProccess():
def __init__(self):
# init 
self.run()

def run(self):
def preprocessing_fn(inputs):
# outputs = { 'id' : [list], 'amount': [list], 'age': [list] }
return outputs
with beam.Pipeline(options=self.pipe_opt) as p:
with beam_impl.Context(temp_dir=self.google_cloud_options.temp_location):
data = p | "read_table" >> beam.io.Read(table_bq) 
| "create_data" >> beam.ParDo(ProcessFn())
transformed_dataset, transform_fn = (
(train, RAW_DATA_METADATA) | beam_impl.AnalyzeAndTransformDataset(
preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset
transformed_data | "WriteTrainTFRecords" >> tfrecordio.WriteToTFRecord(
file_path_prefix=self.JOB_DIR + '/train/data',
file_name_suffix='.tfrecord',
coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))
_ = (
transform_fn
| 'WriteTransformFn' >>
transform_fn_io.WriteTransformFn(path=self.JOB_DIR + '/transform/'))

最后ParDo()是:

class ProcessFn(beam.DoFn):
def process(self, element):
yield { 'id' : [list], 'amount': [list], 'age': [list] }

问题出在代码段上

raw_features = {}
for k in features:
raw_features.update({k: tf.constant(1)})
print(tf_transform_output.transform_raw_features(raw_features))

在这段代码中,您构建了一个字典,其中的值是张量。就像你说的,这对VarLenFeature不起作用。尝试对FixedLenFeature使用tf.placeholder,对VarLenFeature使用tf.sparse_placeholder,而不是使用tf.constant

最新更新