张量流TFX管道中的图像处理



我正在尝试使用MNIST数据集启动和运行Tensorflow TFX管道。

# Imports
import pandas as pd
import numpy as np
from keras.datasets import mnist
import tensorflow as tf
from tfx import v1 as tfx
import os
from tfx.components import ImportExampleGen
from platform import python_version
python_version() #'3.8.8'
# Load the data - 60,000 training examples and 10,000 testing examples
(train_x, train_y), (test_x, test_y) = mnist.load_data()

设置管道路径

_pipeline_root = './pipeline'
_data_root = './data'
if not os.path.isdir(_pipeline_root) and not os.path.isdir(_data_root):
!mkdir {_pipeline_root}
!mkdir {_data_root}

将数据写入TF。记录格式并保存在eval和train文件中。请注意,MNIST数据以一个28x28的numpy数组开始,并被转换为一个字节字符串,以使其能够作为Tf.record的一部分进行编码。


def _bytes_feature(value):
"""Returns a bytes_list from a string / byte."""
if isinstance(value, type(tf.constant(0))): # if value ist tensor
value = value.numpy() # get value of tensor
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def _int64_feature(value):
"""Returns an int64_list from a bool / enum / int / uint."""
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def serialize_array(array):
array = tf.io.serialize_tensor(array)
return array
def image_label_to_tf_train(image, label):
image_shape = np.shape(image)
#define the dictionary -- the structure -- of our single example
data = {
'height': _int64_feature(image_shape[0]),
'width': _int64_feature(image_shape[1]),
'raw_image' : _bytes_feature(serialize_array(image)),
'label' : _int64_feature(label)
}
#create an Example, wrapping the single features
return tf.train.Example(features=tf.train.Features(feature=data))
def write_images_to_tfr_short(images, labels, filename:str="images", folder = ""):
if not os.path.isdir(folder):
!mkdir {folder}
filename= folder + "/" + filename+".tfrecords"
writer = tf.io.TFRecordWriter(filename) #create a writer that'll store our data to disk
count = 0
for index in range(len(images)):
#get the data we want to write
current_image = images[index]
current_label = labels[index]
out = image_label_to_tf_train(image=current_image, label=current_label)
writer.write(out.SerializeToString())
count += 1
writer.close()
print(f"Wrote {count} elements to TFRecord")
return count

下一个阶段是调用使用preprocessing_fn的转换组件。这个函数应该处理所有的数据,例如,将图像数组除以255是一个标准的特征处理。但图像仍然是一个字节串,我不能为我的生活弄清楚如何把它变成一个数组。以下是我尝试过的。

def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""

# Initialize outputs dictionary
outputs = {}


raw_image_dataset = inputs[_IMAGE_KEY]


img = tf.io.decode_raw(raw_image_dataset, tf.int64)


outputs[_IMAGE_KEY] = img



outputs[_LABEL_KEY] = tf.cast(inputs[_LABEL_KEY], tf.int64)

return outputs

我得到以下错误:

WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1636   if fn_takes_side_inputs(fn):
-> 1637     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1638   else:
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
662   saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663   impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
664                                              input_signature, base_temp_dir,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
893       analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894     metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
895                                        preprocessing_fn, base_temp_dir,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
805   return dataset_metadata.DatasetMetadata(
--> 806       schema=schema_inference.infer_feature_schema_v2(
807           concrete_transform_fn.structured_outputs,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
255         metadata)
--> 256   return _infer_feature_schema_common(
257       features,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
300           min=min_value, max=max_value, is_categorical=True)
--> 301   feature_spec = _feature_spec_from_batched_tensors(features,
302                                                     is_evaluation_complete)
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
128           dim is None for dim in shape.as_list()[1:]):
--> 129         raise ValueError(
130             'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size
During handling of the above exception, another exception occurred:
ValueError                                Traceback (most recent call last)
<ipython-input-37-7beafa4fe436> in <module>
3     schema=schema_gen.outputs['schema'],
4     module_file=os.path.abspath(_mnist_transform_module))
----> 5 context.run(transform, enable_cache=False)
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args, **kwargs)
61       # __IPYTHON__ variable is set by IPython, see
62       # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython.
---> 63       return fn(*args, **kwargs)
64     else:
65       absl.logging.warning(
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self, component, enable_cache, beam_pipeline_args)
181         telemetry_utils.LABEL_TFX_RUNNER: runner_label,
182     }):
--> 183       execution_id = launcher.launch().execution_id
184 
185     return execution_result.ExecutionResult(
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self)
198       # be immutable in this context.
199       # output_dict can still be changed, specifically properties.
--> 200       self._run_executor(execution_decision.execution_id,
201                          copy.deepcopy(execution_decision.input_dict),
202                          execution_decision.output_dict,
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self, execution_id, input_dict, output_dict, exec_properties)
71     # be immutable in this context.
72     # output_dict can still be changed, specifically properties.
---> 73     executor.Do(
74         copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Do(self, input_dict, output_dict, exec_properties)
581     # remove the `_pip_dependencies` attribute.
582     with udf_utils.TempPipInstallContext(self._pip_dependencies):
--> 583       TransformProcessor().Transform(label_inputs, label_outputs, status_file)
584     logging.debug('Cleaning up temp path %s on executor success', temp_path)
585     io_utils.delete_dir(temp_path)
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Transform(***failed resolving arguments***)
1114     materialization_format = (
1115         transform_paths_file_formats[-1] if materialize_output_paths else None)
-> 1116     self._RunBeamImpl(analyze_data_list, transform_data_list, preprocessing_fn,
1117                       stats_options_updater_fn, force_tf_compat_v1,
1118                       input_dataset_metadata, transform_output_path,
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in _RunBeamImpl(self, analyze_data_list, transform_data_list, preprocessing_fn, stats_options_updater_fn, force_tf_compat_v1, input_dataset_metadata, transform_output_path, raw_examples_data_format, temp_path, input_cache_dir, output_cache_dir, disable_statistics, per_set_stats_output_paths, materialization_format, analyze_paths_count, stats_output_paths, make_beam_pipeline_fn)
1496             for dataset in transform_data_list:
1497               infix = 'TransformIndex{}'.format(dataset.index)
-> 1498               (dataset.transformed
1499                | 'EncodeAndSerialize[{}]'.format(infix) >> beam.ParDo(
1500                    self._RecordBatchToExamplesFn(transformed_schema_proto))
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
594     try:
595       if not exc_type:
--> 596         self.result = self.run()
597         self.result.wait_until_finish()
598     finally:
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
571         finally:
572           shutil.rmtree(tmpdir)
--> 573       return self.runner.run_pipeline(self, self._options)
574     finally:
575       if not is_in_ipython():
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
129       runner = BundleBasedDirectRunner()
130 
--> 131     return runner.run_pipeline(pipeline, options)
132 
133 
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
197         options.view_as(pipeline_options.ProfilingOptions))
198 
--> 199     self._latest_run_result = self.run_via_runner_api(
200         pipeline.to_runner_api(default_environment=self._default_environment))
201     return self._latest_run_result
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
208     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
209     #   the teststream (if any), and all the stages).
--> 210     return self.run_stages(stage_context, stages)
211 
212   @contextlib.contextmanager
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
393           )
394 
--> 395           stage_results = self._run_stage(
396               runner_execution_context, bundle_context_manager)
397 
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
658     while True:
659       last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 660           self._run_bundle(
661               runner_execution_context,
662               bundle_context_manager,
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
781         expected_timer_output)
782 
--> 783     result, splits = bundle_manager.process_bundle(
784         data_input, data_output, input_timers, expected_timer_output)
785     # Now we collect all the deferred inputs remaining from bundle execution.
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
1092             process_bundle_descriptor.id,
1093             cache_tokens=[next(self._cache_token_generator)]))
-> 1094     result_future = self._worker_handler.control_conn.push(process_bundle_req)
1095 
1096     split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
376       self._uid_counter += 1
377       request.instruction_id = 'control_%s' % self._uid_counter
--> 378     response = self.worker.do_instruction(request)
379     return ControlFuture(request.instruction_id, response)
380 
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
578     if request_type:
579       # E.g. if register is set, this will call self.register(request.register))
--> 580       return getattr(self, request_type)(
581           getattr(request, request_type), request.instruction_id)
582     else:
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
616         with self.maybe_profile(instruction_id):
617           delayed_applications, requests_finalization = (
--> 618               bundle_processor.process_bundle(instruction_id))
619           monitoring_infos = bundle_processor.monitoring_infos()
620           monitoring_infos.extend(self.state_cache_metrics_fn())
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
993                   element.timer_family_id, timer_data)
994           elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 995             input_op_by_transform_id[element.transform_id].process_encoded(
996                 element.data)
997 
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
219       decoded_value = self.windowed_coder_impl.decode_from_stream(
220           input_stream, True)
--> 221       self.output(decoded_value)
222 
223   def monitoring_infos(self, transform_id, tag_to_pcollection_id):
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1635   from apache_beam.transforms.util import fn_takes_side_inputs
1636   if fn_takes_side_inputs(fn):
-> 1637     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1638   else:
1639     wrapper = lambda x: [fn(x)]
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
661   """
662   saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663   impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
664                                              input_signature, base_temp_dir,
665                                              baseline_analyzers_fingerprint,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
892   if not concrete_transform_fn.graph.get_collection(
893       analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894     metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
895                                        preprocessing_fn, base_temp_dir,
896                                        tensor_replacement_map)
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
804       evaluate_schema_overrides=True)
805   return dataset_metadata.DatasetMetadata(
--> 806       schema=schema_inference.infer_feature_schema_v2(
807           concrete_transform_fn.structured_outputs,
808           concrete_metadata_fn,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
254     tensor_annotations, global_annotations = _get_schema_annotations_v2(
255         metadata)
--> 256   return _infer_feature_schema_common(
257       features,
258       tensor_ranges,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
299       domains[name] = schema_pb2.IntDomain(
300           min=min_value, max=max_value, is_categorical=True)
--> 301   feature_spec = _feature_spec_from_batched_tensors(features,
302                                                     is_evaluation_complete)
303 
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
127       if is_evaluation_complete and any(
128           dim is None for dim in shape.as_list()[1:]):
--> 129         raise ValueError(
130             'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
131             'from the batch dimension, all dimensions must have known size'
ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size [while running 'Analyze/CreateSavedModel[tf_v2_only]/CreateSavedModel']

我知道标签功能正在工作,因为我可以调用下面的代码并获得打印结果....

transform = tfx.components.Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath(_mnist_transform_module))
context.run(transform, enable_cache=False)
# Get the URI of the output artifact representing the transformed examples
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Decode the first record and print output
for tfrecord in dataset.take(1):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
print(example)

如果我删除行:

img = tf.io.decode_raw(raw_image_dataset, tf.int64)


outputs[_IMAGE_KEY] = img

输出

features {
feature {
key: "label"
value {
int64_list {
value: 5
}
}
}
}

这显示了我对标签功能所做的工作,但我真的不知道如何转换图像字节。问题的一部分是我不完全确定格式是什么,因为它只是一个张量,非常不透明。似乎给定标签操作,我在一列数据上有效地操作,但再次,无法找出正确的操作或语法

对于任何未来的观众,这是有效的

raw_image_dataset = tf.map_fn(fn = lambda x : tf.io.parse_tensor(x[0], tf.uint8, name=None), elems = raw_image_dataset, fn_output_signature = tf.TensorSpec((28,28),dtype=tf.uint8,    name=None), infer_shape = True)
raw_image_dataset = tf.cast(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = raw_image_dataset

我想我用

解决了这个问题
raw_image_dataset = inputs[_IMAGE_KEY]

raw_image_dataset = tf.map_fn(fn = lambda x : tf.io.decode_image(x[0]) , elems = raw_image_dataset, dtype=tf.uint8)

有一些关于数据作为批处理进入,所以需要映射它,也使用结果张量的正确分量"x[0]",我仍然不是100%确定为什么会这样,但它似乎运行。

现在我正在与TFX作斗争,因为它不会让我输出与进入的内容不同的功能…

相关内容

  • 没有找到相关文章

最新更新