Dataflow with Python: MatchContinuously not working as expected



I am trying to use the Dataflow Python SDK to read data from a bucket in a streaming fashion, using the recently added MatchContinuously transform linked below.

https://beam.apache.org/releases/pydoc/2.33.0/apache_beam.io.fileio.html?highlight=matchall apache_beam.io.fileio.MatchContinuously

Below is the code snippet I am using to poll the bucket periodically.

(pipeline
| 'Match Files' >> fileio.MatchContinuously(file_pattern="gs://xyz/abc/*.txt", interval=10.0, has_deduplication=True)
| 'Read Matches' >> fileio.ReadMatches()
......

After running for a while, this code fails. Can someone help me understand what I am missing? The stack trace is below.

<PCollection[Read Matches/ParDo(_ReadMatchesFn).None] at 0x7fad0ccb7e80>
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
Traceback (most recent call last):
File "<stdin>", line 3, in <module>
File "/usr/local/Caskroom/miniconda/base/lib/python3.8/site-packages/apache_beam/pipeline.py", line 586, in __exit__
self.result = self.run()
File "/usr/local/Caskroom/miniconda/base/lib/python3.8/site-packages/apache_beam/pipeline.py", line 565, in run
return self.runner.run_pipeline(self, self._options)
File "/usr/local/Caskroom/miniconda/base/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/usr/local/Caskroom/miniconda/base/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 195, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "/usr/local/Caskroom/miniconda/base/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 206, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/usr/local/Caskroom/miniconda/base/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 384, in run_stages
stage_results = self._run_stage(
File "/usr/local/Caskroom/miniconda/base/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 663, in _run_stage
assert (runner_execution_context.watermark_manager.get_stage_node(
AssertionError: wrong timestamp for StageNode<inputs=['ref_PCollection_PCollection_3_split'],side_inputs=[].

Looking at the logs, your code seems to be failing in Read Matches/ParDo(_ReadMatchesFn) rather than in MatchContinuously.

This is probably because you are using the DirectRunner, which, if I'm not mistaken, does not yet fully support stateful DoFns (see the capability matrix). RemoveDuplicates is a stateful DoFn.
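
If you want to confirm that hypothesis on the DirectRunner, one quick diagnostic (my own sketch, not a fix) is to switch deduplication off so the stateful RemoveDuplicates step is never inserted. Note that without dedup the same file may be matched more than once:

import logging
import apache_beam as beam
from apache_beam.io import fileio

# Diagnostic only: has_deduplication=False removes the stateful dedup step,
# which is the part suspected of not working on the DirectRunner.
with beam.Pipeline() as pipeline:
    (pipeline
     | 'Match Files' >> fileio.MatchContinuously(
         file_pattern="gs://xyz/abc/*.txt",
         interval=10.0,
         has_deduplication=False)
     | 'Read Matches' >> fileio.ReadMatches()
     | 'Log Paths' >> beam.Map(
         lambda readable_file: logging.info(readable_file.metadata.path)))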

I tested your code in Dataflow and it works fine without issues (on both 2.32 and 2.33).

(p | MatchContinuously(f"gs://{bucket}/match/*", interval=10.0, has_deduplication=True)
| ReadMatches()
)
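
For completeness, this is roughly how I would wire it up as a streaming Dataflow job; the project, region, and bucket values below are placeholders you would replace with your own:

import apache_beam as beam
from apache_beam.io.fileio import MatchContinuously, ReadMatches
from apache_beam.options.pipeline_options import PipelineOptions

bucket = "my-bucket"  # placeholder

# MatchContinuously produces an unbounded PCollection, so the job has to run
# in streaming mode on Dataflow.
options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",      # placeholder
    region="us-central1",      # placeholder
    temp_location=f"gs://{bucket}/tmp",
    streaming=True,
)

with beam.Pipeline(options=options) as p:
    (p
     | MatchContinuously(f"gs://{bucket}/match/*", interval=10.0, has_deduplication=True)
     | ReadMatches())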

Also, you don't really need ReadMatches; you can read the text files like this instead. Maybe you can test it and see if it works in the DirectRunner:
(p | MatchContinuously(f"gs://{bucket}/match/*", interval=10.0, has_deduplication=True)
| Map(lambda x: x.path)
| ReadAllFromText()
| Map(lambda x: logging.info(x))
)

Both of them work fine in Dataflow.
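
As a self-contained sketch of that second pipeline (the bucket name is a placeholder): MatchContinuously emits FileMetadata records, which is why the Map step extracts .path before handing the paths to ReadAllFromText:

import logging

import apache_beam as beam
from apache_beam.io.fileio import MatchContinuously
from apache_beam.io.textio import ReadAllFromText

logging.getLogger().setLevel(logging.INFO)
bucket = "my-bucket"  # placeholder

with beam.Pipeline() as p:
    (p
     | MatchContinuously(f"gs://{bucket}/match/*", interval=10.0, has_deduplication=True)
     | beam.Map(lambda file_metadata: file_metadata.path)  # FileMetadata -> path string
     | ReadAllFromText()
     | beam.Map(lambda line: logging.info(line)))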
