Flink 消耗的内存比预期的要多



我正在使用Flink 1.4.1来处理事务事件,并使用HDFS来存储检查点信息以实现容错。

创建了一个作业来聚合有关客户端、星期几和一天中的小时的信息,从而创建配置文件,如下面的代码所示。

val stream = env.addSource(consumer)
val result = stream
.map(openTransaction => {
val transactionDate = openTransaction.get("transactionDate")
val date = if (transactionDate.isTextual)
LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
else
transactionDate.asLong
(openTransaction.get("clientId").asLong, openTransaction.get("amount").asDouble, new Timestamp(date))
})
.keyBy(0)
.window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
.sum(1)

在上面的代码中,流有三个字段:"交易日期"、"客户端 ID"和"金额"。我们通过 clientId 和一个滑动窗口对金额求和进行键控流。我们的数据库中大约有 100.000 个唯一的活跃客户端 ID。

运行一段时间后,作业使用的总 RAM 稳定在36 GB,但 HDFS 中存储的检查点仅使用3 GB。有没有办法减少作业的 RAM 使用量,也许通过配置 Flink 的复制因子或使用 RocksDB?

对于此状态大小,绝对应该考虑使用 RocksDB,并且根据使用模式,可以有更小的检查点,因为它通过仅复制新的或更新的 SST 来增量执行它。

请记住一些需要了解的事情:

  • 每个有状态算子并行子任务都有自己的 RocksDB 实例。
  • 如果您确实切换到 RocksDB 进行检查点,并且它 开始运行的速度比您需要的慢,请确保 您使用的序列化尽可能高效。
  • Flink 根据您的后备文件系统提供了一些预定义的选项,请确保您选择得当
  • 如果预定义的选项不适合您,您可以覆盖 RocksDB 后端的 OptionsFactory 并微调各个 RocksDB 选项

关于 Flink 中具有键控时间窗口的内存使用情况,需要注意的另一件事是,如果您要进入数十万或数百万,"计时器"可能会占用大量内存。Flink 计时器是基于堆的(截至撰写本文时(,并且独立于您的状态后端进行同步检查点。

相关内容

  • 没有找到相关文章

最新更新