Alpakkakafka消费者抵消



我在scala中使用Alpakkakafka来消费kafka主题。这是我的代码:

val kafkaConsumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(kafkaConfig.server)
.withGroupId(kafkaConfig.group)
.withProperties(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG       -> "100",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG      -> "earliest",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
)
Consumer
.plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
.runWith(Sink.foreach(println))

但是,使用者仅从主题中的第一条未提交消息开始轮询。我希望始终从偏移量0开始,无论提交的消息是什么。对于Alpakka消费者,如何手动指定偏移?

我认为您需要添加几个配置条目:

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> False,因此您的作业永远不会保存任何偏移

  2. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",所以你的工作从头开始。

如果您的作业在过去已经提交了偏移量,则可能必须将其偏移量重置为最早。