Spring Cloud Stream函数Router输出尝试绑定到Kafka主题



我正在尝试迁移到Spring Cloud Stream的新功能编程模型,替换像这样的条件StreamListener注释

@StreamListener("app-input", condition = "headers['eventName']=='Funded'")

@Bean
fun router() = MessageRoutingCallback {
when (it.headers["eventName"]) {
"Funded" -> "funded"
else -> "ignored"
}
}
@Bean
fun funded() = Consumer { message: Message<Funded> ->
...
}
@Bean
fun ignored() = Consumer { message: Message<*> ->
...
}

具有将频道链接到主题的相关属性

spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=MyTopic

我需要这种级别的间接性,因为有多种Avro消息类型都到达MyTopic,需要取消序列化并以不同的路由

这一切都很顺利,我可以按预期消费和路由消息。然而,以这种方式使用functionRouter有一个意想不到的副作用,那就是当没有可用的主题时,它也试图将functionRouter-out-0绑定到Kafka,因此应用程序每30秒就试图附加到名为"的代理上的主题;函数Router-out-0";并且由于授权错误而失败,正如您所期望的那样。

2021-05-06 12:57:55.654 WARN  [screening]                            o.s.c.s.b.k.p.KafkaTopicProvisioner      : No partitions have been retrieved for the topic (functionRouter-out-0). This will affect the health check.
2021-05-06 12:57:56.198 WARN  [screening]                            org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-3] Error while fetching metadata with correlation id 4 : {functionRouter-out-0=TOPIC_AUTHORIZATION_FAILED}
2021-05-06 12:57:56.199 ERROR [screening]                            org.apache.kafka.clients.Metadata        : [Producer clientId=producer-3] Topic authorization failed for topics [functionRouter-out-0]
2021-05-06 12:57:56.199 ERROR [screening]                            o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [functionRouter-out-0]

因此,问题是a(我如何阻止functionRouter-out-0通道试图绑定到Kafka,或者b(我如何在不需要中间通道的情况下实现这一点?

Spring Cloud Stream事件路由功能自动创建新主题是类似的,但从未收到答案。

我认为这是一个错误。如果你想关注它,我打开了一个问题:

https://github.com/spring-cloud/spring-cloud-stream/issues/2168

作为变通方案,只需将其指向同一目的地

spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-out-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-in-0.group=so67419839

由于代表都是Consumer,我们将永远不会发送任何内容。

最新更新