>问题当我重新启动/完成/停止流时,旧的消费者不会死亡/关闭:
[INFO ] a.a.RepointableActorRef -
Message [akka.kafka.KafkaConsumerActor$Internal$Stop$]
from Actor[akka://ufo-sightings/deadLetters]
to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594]
was not delivered. [1] dead letters encountered.
描述我正在构建一个服务,该服务从 Kafka 主题接收消息并通过 HTTP 请求将消息发送到外部服务。
与外部服务的连接可能会中断,我的服务需要重试该请求。
此外,如果流中出现错误,则需要重新启动整个流。
最后,有时我不需要流及其相应的 Kafka 消费者,我想关闭整个流
所以我有一个流:
Consumer.committableSource(customizedSettings, subscriptions)
.flatMapConcat(sourceFunction)
.toMat(Sink.ignore)
.run
HTTP 请求以sourceFunction
发送
我遵循了新文档中新的 Kafka 消费者重启说明
RestartSource.withBackoff(
minBackoff = 20.seconds,
maxBackoff = 5.minutes,
randomFactor = 0.2 ) { () =>
Consumer.committableSource(customizedSettings, subscriptions)
.watchTermination() {
case (consumerControl, streamComplete) =>
logger.info(s" Started Watching Kafka consumer id = ${consumer.id} termination: is shutdown: ${consumerControl.isShutdown}, is f completed: ${streamComplete.isCompleted}")
consumerControl.isShutdown.map(_ => logger.info(s"Shutdown of consumer finally happened id = ${consumer.id} at ${DateTime.now}"))
streamComplete
.flatMap { _ =>
consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} GRACEFULLY:CLOSED FROM UPSTREAM"))
}
.recoverWith {
case _ =>
consumerControl.shutdown().map(_ -> logger.info(s"3.consumer id = ${consumer.id} SHUTDOWN at ${DateTime.now} ERROR:CLOSED FROM UPSTREAM"))
}
}
.flatMapConcat(sourceFunction)
}
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.ignore)(Keep.left)
.run
有一个问题打开了,在复杂的 Akka 流中讨论了这个非终止消费者,但还没有解决方案。
是否有强制终止 Kafka 消费者的解决方法
如何将使用者包装在 Actor 中并注册 KillSwitch,请参阅:https://doc.akka.io/docs/akka/2.5/stream/stream-dynamic.html#dynamic-stream-handling
然后在 Actor postStop 方法中,您可以终止流。 通过将 Actor 包装在退避主管中,您可以获得指数退避。
示例演员:https://github.com/tradecloud/kafka-akka-extension/blob/master/src/main/scala/nl/tradecloud/kafka/KafkaSubscriberActor.scala#L27