Kafka Streams 只提交 KGroupedTable 的最新消息



我有Kafka Streams应用程序如下:

static KafkaStreams build(AppConfig appConfig, SerdesHelper serdes) {
  final KStreamBuilder builder = new KStreamBuilder();
  builder
      .table(serdes.sourceKeySerde, serdes.sourceValueSerde, appConfig.sourceTopic)
      .groupBy(StreamBuilder::groupByMapper, serdes.intSerde, serdes.longSerde)
      .aggregate(
          StreamBuilder::initialize,
          StreamBuilder::add,
          StreamBuilder::subtract,
          serdes.sinkValueSerde)
      .to(serdes.intSerde, serdes.sinkValueSerde, appConfig.sinkTopic);
  return new KafkaStreams(builder, appConfig.streamConfig);
}

我的具体示例将记录分组如下

((k, v)) -> ((k), v[])

在用只有两个唯一键的 3.000.000 条消息的虚拟数据运行它时,我最终在不到一分钟的时间内sinkTopic了大约 10.000 条消息,我希望得到 4/6(基于我设法停止应用程序的那一刻(。

如何确保只有具有最新分组值的键才会提交回 Kafka,而不是每个中间消息?

它是流处理,而不是批处理。没有"最新的分组值"——输入是无限的,因此,输出是无限的......

您只能通过以下方式减少中间体的数量

  1. 增加 KTable 缓存大小(但这似乎不是您的情况问题,因为您只有 2 个唯一键,因此如果您没有禁用缓存或
  2. 增加提交间隔

相关内容

  • 没有找到相关文章

最新更新