KAFKA流:使用相同的“应用程序.id”从多个主题中消费



我有一个需要收听多个不同主题的应用程序;每个主题都有有关如何处理消息的单独逻辑。我曾经想到为每个kafkastreams实例使用相同的kafka属性,但是我遇到了下面的错误。

错误

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic

代码(kotlin)

class KafkaSetup() {
    companion object {
        private val LOG = LoggerFactory.getLogger(this::class.java)
    }
    fun getProperties(): Properties {
        val properties = Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
        return properties
    }
    private fun listenOnMyTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")
        kStream.foreach { key, value -> LOG.info("do stuff") }
        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }
    private fun listenOnMyOtherTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")
        kStream.foreach { key, value -> LOG.info("do other stuff") }
        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }
}

我找到了此参考,建议您无法将application.id用于多个主题,但是我发现很难找到参考文档来支持这一点。application.id状态的文档:

流处理应用程序的标识符。在Kafka群集中必须是唯一的。它用作1)默认客户端ID前缀,2)成员管理组ID,3)ChangElog主题前缀。

问题

  1. 此错误是什么意思,以及导致它的原因。
  2. 鉴于您可以使用具有相同ID的应用程序的多个实例来从多个主题分区消耗,因此"在kafka cluster中必须是唯一的"
  3. 您可以使用相同的KAFKA流application.id来启动两个在不同主题上列出的KafkaStreams吗?如果是这样,如何?

详细信息: kafka 0.11.0.2

kafka通过分区而不是主题流量表。因此,如果您使用相同的application.id启动多个应用程序,则对于他们订阅的输入主题及其处理逻辑,它们必须相同。该应用程序使用application.id作为group.id形成消费者组,因此将输入主题的不同分区分配给不同的实例。

如果您对相同逻辑有不同的主题,则可以立即订阅 ALL 在每个实例中,您开始)。扩展仍然基于分区。(这基本上是您输入主题的"合并"。)

如果您想通过主题扩展和/或具有不同的处理逻辑,则必须使用不同的application.id用于不同的Kafka流应用程序。

相关内容

  • 没有找到相关文章

最新更新