我正在尝试对 akka 流中的偏移量采用至少一次提交策略,但我无法理解我在流上使用过滤器的情况的预期模式是什么。
我的期望是,没有一个过滤的消息会提交它们的偏移量,因此它们最终将进入无限的处理循环。
阐明这一点的一个荒谬的例子是像这样过滤所有消息:
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.filter(_ => false)
.mapAsync(3)(_.committableOffset.commitScaladsl())
.runWith(Sink.ignore)
我只能看到将过滤器包装在流中的解决方案,该流检查在这种情况下逻辑是否会过滤掉并提交,但这似乎并不优雅,并且降低了具有过滤器形状的价值。
过滤不是一件罕见的事情,但我看不到任何优雅的偏移方式? 对我来说,框架没有办法做到这一点似乎很奇怪,所以我错过了什么?
我无法在当前的 akka 实现中找到解决方案来更智能地提交索引,因此我将责任委托给 kafka 在 kafka 级别设置自动提交,并将其与应用程序的优雅关闭策略相结合,因此当蓝/绿部署发生时,所有消息都在应用程序关闭之前处理。
- 自动提交为 true:
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
- 正常关机:
val actorMaterializer = ActorMaterializer(
ActorMaterializerSettings(system)
scala.sys.addShutdownHook {
actorMaterializer.system.terminate()
Await.result(actorMaterializer.system.whenTerminated, 30.seconds)
}