以编程方式停止 Alpakka Kafka 流的正确方法



我们尝试将 Akka Streams 与 Alpakka Kafka 一起使用,以使用服务中的事件流。为了处理事件处理错误,我们使用 Kafka 自动提交和多个队列。例如,如果我们有 主题user_created,我们想从产品服务中使用,我们还创建user_created_for_products_faileduser_created_for_products_dead_letter.这两个额外的主题耦合到特定的 Kafka 使用者组。如果一个事件处理失败,它会进入失败的队列,我们尝试在五分钟内再次消费 - 如果它再次失败,它将变成死信。

在部署时,我们希望确保不会丢失事件。因此,我们尝试在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但所有这些"飞行"的事件尚未处理。流和应用程序停止后,我们可以部署新代码并重新启动应用程序。

阅读文档后,我们已经看到了KillSwitch功能。我们在其中看到的问题是shutdown方法返回Unit而不是像我们预期的那样Future[Unit]。我们不确定使用它不会丢失事件,因为在测试中,它看起来运行得太快而无法正常工作。

作为一种解决方法,我们为每个流创建一个ActorSystem并使用terminate方法(返回Future[Terminate])。此解决方案的问题在于,我们认为为每个流创建ActorSystem不会很好地扩展,并且terminate需要花费大量时间才能解决(在测试中,关闭最多需要一分钟)。

你遇到过这样的问题吗?有没有更快的方法(与ActorSystem.terminate相比)停止流并确保Source发出的所有事件都已处理?

从文档(强调我的):

当使用外部偏移存储时,调用Consumer.Control.shutdown()就足以完成Source,这将开始完成流。

val (consumerControl, streamComplete) =
Consumer
.plainSource(consumerSettings,
Subscriptions.assignmentWithOffset(
new TopicPartition(topic, 0) -> offset
))
.via(businessFlow)
.toMat(Sink.ignore)(Keep.both)
.run()
consumerControl.shutdown()

Consumer.control.shutdown()返回一个Future[Done]。从它的Scaladoc描述:

关闭使用者Source。它将等待未完成的偏移提交请求完成,然后再关闭。

或者,如果您在Kafka 中使用偏移存储,请使用Consumer.Control.drainAndShutdown,这也返回一个Future。再次来自文档(其中包含有关drainAndShutdown幕下所做的事情的更多信息):

val drainingControl =
Consumer
.committableSource(consumerSettings.withStopTimeout(Duration.Zero), Subscriptions.topics(topic))
.mapAsync(1) { msg =>
business(msg.record).map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
val streamComplete = drainingControl.drainAndShutdown()

Scaladoc 对drainAndShutdown的描述:

停止从Source生成消息,等待流完成并关闭使用者Source以便所有消耗的消息到达流的末尾。流完成失败将被传播,源无论如何都会被关闭。

最新更新