云数据流全局窗口触发器被忽略



使用 AfterPane.elementCountAtMinimum 触发器在使用数据流运行器运行时不起作用,但在本地运行时可以正常工作。 在数据流上运行时,它仅生成一个窗格。

目标是从云SQL中提取数据,转换并写入云存储。 但是,内存中要保留的数据太多,因此需要将其拆分并分块写入云存储。 这就是我希望它能做到的。

完整的代码是:

val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
.applyTransform(ParDo.of(new Translator()))
.map(row => row.mkString("|"))
// produce one global window with one pane per ~500 records
.withGlobalWindow(WindowOptions(
trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(500)),
accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
))
val out = TextIO
.write()
.to("gs://test-bucket/staging")
.withSuffix(".txt")
.withNumShards(1)
.withShardNameTemplate("-P-S")
.withWindowedWrites() // gets us one file per window & pane
pipe.saveAsCustomOutput("writer",out)

我认为问题的根源可能是JdbcIO类是作为PTransform<PBegin,PCollection>实现的,并且对 processElement 的单个调用会输出整个 SQL 查询结果:

public void processElement(ProcessContext context) throws Exception {
try (PreparedStatement statement =
connection.prepareStatement(
query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
statement.setFetchSize(fetchSize);
parameterSetter.setParameters(context.element(), statement);
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
context.output(rowMapper.mapRow(resultSet));
}
}
}
}

最后,我有两个问题需要解决: 1. 进程将耗尽内存,以及 2.数据已写入单个文件。

Beam的JdbcIO和Cloud SQL没有办法解决问题1,因为它使用MySQL驱动程序的方式。 驱动程序本身在对executeStatement的单个调用中加载整个结果。 有一种方法可以让驱动程序流式传输结果,但我必须实现自己的代码才能做到这一点。 具体来说,我为 JDBC 实现了一个 BoundedSource。

对于第二个问题,我使用行号来设置每个元素的时间戳。 这允许我使用FixedWindows显式控制每个窗口中的行数。

elementCountAtMinimum是一个下限,因此只创建一个窗格是运行器可以执行的有效选项。

为批处理管道执行此操作时,有几个选项:

  1. 允许运行器决定文件的大小以及写入的分片数:
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
.applyTransform(ParDo.of(new Translator()))
.map(row => row.mkString("|"))
val out = TextIO
.write()
.to("gs://test-bucket/staging")
.withSuffix(".txt")
pipe.saveAsCustomOutput("writer",out)

TextIO具有GroupByKey或支持在其前面拆分的源时,这通常是最快的选项。据我所知,JDBC 不支持拆分,因此最好的选择是在jdbcSelect之后添加 Reshuffle,这将在从数据库中读取数据后启用处理的并行化。

  1. 使用 GroupIntoBatches 转换手动分组到批次中。
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
.applyTransform(ParDo.of(new Translator()))
.map(row => row.mkString("|"))
.apply(GroupIntoBatches.ofSize(500))
val out = TextIO
.write()
.to("gs://test-bucket/staging")
.withSuffix(".txt")
.withNumShards(1)
pipe.saveAsCustomOutput("writer",out)

通常,这将比选项 #1 慢,但它确实允许您选择每个文件写入的记录数。

还有其他几种方法可以做到这一点,它们的优缺点,但上述两种可能是最接近您想要的。如果你在你的问题中添加更多细节,我可能会进一步修改这个问题。

相关内容

  • 没有找到相关文章

最新更新