Apache Beam pipeline fails when writing TF Records - AttributeError: 'str' object has no attribute 'iteritems'



This issue started appearing over the weekend. For some reason, it feels like a Dataflow problem.

Previously, I was able to run the script and write TF Records. Now, however, I can't even initialize the computation graph to process the data.

The traceback is:

Traceback (most recent call last):
  File "my_script.py", line 1492, in <module>
    MyBeamClass()
  File "my_script.py", line 402, in __init__
    self.run()
  File "my_script.py", line 514, in run
    transform_fn_io.WriteTransformFn(path=self.JOB_DIR + '/transform/'))
  File "/anaconda3/envs/ml27/lib/python2.7/site-packages/apache_beam/pipeline.py", line 426, in __exit__
    self.run().wait_until_finish()
  File "/anaconda3/envs/ml27/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1238, in wait_until_finish
    (self.state, getattr(self._runner, 'last_error_msg', None)), self)
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 649, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 176, in execute
    op.start()
  File "apache_beam/runners/worker/operations.py", line 531, in apache_beam.runners.worker.operations.DoOperation.start
    def start(self):
  File "apache_beam/runners/worker/operations.py", line 532, in apache_beam.runners.worker.operations.DoOperation.start
    with self.scoped_start_state:
  File "apache_beam/runners/worker/operations.py", line 533, in apache_beam.runners.worker.operations.DoOperation.start
    super(DoOperation, self).start()
  File "apache_beam/runners/worker/operations.py", line 202, in apache_beam.runners.worker.operations.Operation.start
    def start(self):
  File "apache_beam/runners/worker/operations.py", line 206, in apache_beam.runners.worker.operations.Operation.start
    self.setup()
  File "apache_beam/runners/worker/operations.py", line 480, in apache_beam.runners.worker.operations.DoOperation.setup
    with self.scoped_start_state:
  File "apache_beam/runners/worker/operations.py", line 485, in apache_beam.runners.worker.operations.DoOperation.setup
    pickler.loads(self.spec.serialized_fn))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 247, in loads
    return dill.loads(s)
  File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 317, in loads
    return load(file, ignore)
  File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 305, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1232, in load_build
    for k, v in state.iteritems():
AttributeError: 'str' object has no attribute 'iteritems'

I am using tensorflow==1.13.1, tensorflow-transform==0.9.0, and apache_beam==2.7.0.
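
To confirm the exact versions installed in the local environment that submits the job, I print them with a quick sanity-check sketch like this:

import pkg_resources

# Versions installed locally (the submitting environment); these should match
# what the Dataflow workers end up running.
for pkg in ('tensorflow', 'tensorflow-transform', 'apache-beam'):
    print('%s==%s' % (pkg, pkg_resources.get_distribution(pkg).version))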

import apache_beam as beam
import tensorflow_transform.beam.impl as beam_impl
from tensorflow_transform.beam.tft_beam_io import transform_fn_io

with beam.Pipeline(options=self.pipe_opt) as p:
    with beam_impl.Context(temp_dir=self.google_cloud_options.temp_location):
        # rest of the script
        _ = (
            transform_fn
            | 'WriteTransformFn' >>
            transform_fn_io.WriteTransformFn(path=self.JOB_DIR + '/transform/'))
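
self.pipe_opt and self.google_cloud_options are built earlier in the script, roughly like the sketch below (the project and bucket values here are placeholders, not the real ones):

from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

# Hypothetical reconstruction of the options used above -- project and
# temp_location are placeholders.
self.pipe_opt = PipelineOptions(
    runner='DataflowRunner',
    project='my-gcp-project',
    temp_location='gs://my-bucket/tmp')
self.google_cloud_options = self.pipe_opt.view_as(GoogleCloudOptions)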

I ran into the same error.

It seems to be triggered by a mismatch between the tensorflow-transform version on your local (or host) machine and the one the workers run (as specified in the setup.py file).

In my case, I was running tensorflow-transform==0.13 on my local machine, while the workers were running 0.8.

Downgrading the local version to 0.8 fixed the problem.
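
Concretely, the version the workers install comes from the setup.py that is shipped with the job, so pinning tensorflow-transform there and keeping the local install in sync is what resolves the mismatch. A minimal sketch, with an illustrative package name and pin:

# setup.py -- shipped to the Dataflow workers via the setup_file pipeline option.
# The name and version pin below are illustrative.
import setuptools

setuptools.setup(
    name='my-tft-pipeline',
    version='0.1.0',
    packages=setuptools.find_packages(),
    install_requires=[
        'tensorflow-transform==0.8.0',  # keep in sync with the local environment
    ],
)

The job then needs to be submitted with --setup_file ./setup.py (or by setting pipe_opt.view_as(SetupOptions).setup_file = './setup.py') so the workers actually pick up the pinned dependencies.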
