使用 AfterPane.elementCountAtMinimum 触发器在使用数据流运行器运行时不起作用,但在本地运行时可以正常工作。 在数据流上运行时,它仅生成一个窗格。
目标是从云SQL中提取数据,转换并写入云存储。 但是,内存中要保留的数据太多,因此需要将其拆分并分块写入云存储。 这就是我希望它能做到的。
完整的代码是:
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
.applyTransform(ParDo.of(new Translator()))
.map(row => row.mkString("|"))
// produce one global window with one pane per ~500 records
.withGlobalWindow(WindowOptions(
trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(500)),
accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
))
val out = TextIO
.write()
.to("gs://test-bucket/staging")
.withSuffix(".txt")
.withNumShards(1)
.withShardNameTemplate("-P-S")
.withWindowedWrites() // gets us one file per window & pane
pipe.saveAsCustomOutput("writer",out)
我认为问题的根源可能是JdbcIO
类是作为PTransform<PBegin,PCollection>
实现的,并且对 processElement 的单个调用会输出整个 SQL 查询结果:
public void processElement(ProcessContext context) throws Exception {
try (PreparedStatement statement =
connection.prepareStatement(
query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
statement.setFetchSize(fetchSize);
parameterSetter.setParameters(context.element(), statement);
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
context.output(rowMapper.mapRow(resultSet));
}
}
}
}
最后,我有两个问题需要解决: 1. 进程将耗尽内存,以及 2.数据已写入单个文件。
Beam的JdbcIO和Cloud SQL没有办法解决问题1,因为它使用MySQL驱动程序的方式。 驱动程序本身在对executeStatement
的单个调用中加载整个结果。 有一种方法可以让驱动程序流式传输结果,但我必须实现自己的代码才能做到这一点。 具体来说,我为 JDBC 实现了一个 BoundedSource。
对于第二个问题,我使用行号来设置每个元素的时间戳。 这允许我使用FixedWindows
显式控制每个窗口中的行数。
elementCountAtMinimum是一个下限,因此只创建一个窗格是运行器可以执行的有效选项。
为批处理管道执行此操作时,有几个选项:
- 允许运行器决定文件的大小以及写入的分片数:
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
.applyTransform(ParDo.of(new Translator()))
.map(row => row.mkString("|"))
val out = TextIO
.write()
.to("gs://test-bucket/staging")
.withSuffix(".txt")
pipe.saveAsCustomOutput("writer",out)
当TextIO具有GroupByKey或支持在其前面拆分的源时,这通常是最快的选项。据我所知,JDBC 不支持拆分,因此最好的选择是在jdbcSelect之后添加 Reshuffle,这将在从数据库中读取数据后启用处理的并行化。
- 使用 GroupIntoBatches 转换手动分组到批次中。
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
.applyTransform(ParDo.of(new Translator()))
.map(row => row.mkString("|"))
.apply(GroupIntoBatches.ofSize(500))
val out = TextIO
.write()
.to("gs://test-bucket/staging")
.withSuffix(".txt")
.withNumShards(1)
pipe.saveAsCustomOutput("writer",out)
通常,这将比选项 #1 慢,但它确实允许您选择每个文件写入的记录数。
还有其他几种方法可以做到这一点,它们的优缺点,但上述两种可能是最接近您想要的。如果你在你的问题中添加更多细节,我可能会进一步修改这个问题。