Akka 流 kafka 提交偏移量后过滤



我正在尝试对 akka 流中的偏移量采用至少一次提交策略,但我无法理解我在流上使用过滤器的情况的预期模式是什么。

我的期望是,没有一个过滤的消息会提交它们的偏移量,因此它们最终将进入无限的处理循环。

阐明这一点的一个荒谬的例子是像这样过滤所有消息:

Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl()) 
.runWith(Sink.ignore)

我只能看到将过滤器包装在流中的解决方案,该流检查在这种情况下逻辑是否会过滤掉并提交,但这似乎并不优雅,并且降低了具有过滤器形状的价值。

过滤不是一件罕见的事情,但我看不到任何优雅的偏移方式? 对我来说,框架没有办法做到这一点似乎很奇怪,所以我错过了什么?

我无法在当前的 akka 实现中找到解决方案来更智能地提交索引,因此我将责任委托给 kafka 在 kafka 级别设置自动提交,并将其与应用程序的优雅关闭策略相结合,因此当蓝/绿部署发生时,所有消息都在应用程序关闭之前处理。

  • 自动提交为 true:
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
  • 正常关机:
val actorMaterializer = ActorMaterializer(
  ActorMaterializerSettings(system)
scala.sys.addShutdownHook {
        actorMaterializer.system.terminate()
        Await.result(actorMaterializer.system.whenTerminated, 30.seconds)
}

相关内容

  • 没有找到相关文章

最新更新