避免在 Beam Python SDK 中重新计算所有云存储文件的大小



我正在研究一个从Google Cloud Storage(GCS)目录中读取~500万个文件的管道。我已将其配置为在Google Cloud Dataflow上运行。

问题是,当我启动管道时,需要花费数小时"计算所有文件的大小":

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 5549.38778591156 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 5480000 files
INFO:apache_beam.io.gcp.gcsio:Finished listing 5483720 files in 7563.196493148804 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished computing size of: 10000 files
[...]

如您所见,计算大约 5.5M 文件的大小需要一个半小时(5549 秒),然后从头开始!又花了 2 个小时运行第二遍,然后它开始了第三次!截至撰写本文时,该作业在数据流控制台中仍然不可用,这让我相信这一切都发生在我的本地机器上,并且没有利用任何分布式计算。

当我使用较小的输入数据集(2 个文件)测试管道时,它会重复大小估计 4 次:

INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.33771586418151855 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.1244659423828125 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.13422417640686035 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:apache_beam.io.gcp.gcsio:Finished listing 2 files in 0.14139890670776367 seconds.

按照这个速度,大约需要 8 个小时才能对所有 5.5M 文件执行 GCS 大小估计 4 次,所有这些都在数据流作业开始之前完成。

我的管道配置了--runner=DataflowRunner选项,因此它应该在数据流中运行:

python bigquery_import.py --runner=DataflowRunner #other options...

管道从 GCS 读取的内容如下所示:

parser = argparse.ArgumentParser()
parser.add_argument(
'--input',
required=True,
help='Input Cloud Storage directory to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
files = p | beam.io.ReadFromText('gs://project/dir/*.har.gz')

有关完整代码,请参阅 GitHub 上的bigquery_import.py。

我很困惑为什么这个繁琐的过程发生在数据流环境之外,以及为什么需要多次完成。我是否正确地从GCS读取了文件,或者是否有更有效的方法?

感谢您报告此问题。光束有两个用于读取文本的变换。ReadFromTextReadAllFromText.ReadFromText会遇到这个问题,但ReadAllFromText不应该。

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py#L438

ReadAllFromText的缺点是它不会执行动态工作重新平衡,但在读取大量文件时,这应该不是问题。

创建 https://issues.apache.org/jira/browse/BEAM-9620,用于跟踪 ReadFromText(以及一般基于文件的源)的问题。

最新更新