RecordFilterStrategy with int-kafka:消息驱动的通道适配器



是否有办法注入一个监听器与自定义recordfilterstrategy ?

<int-kafka:message-driven-channel-adapter
id="kafkaConsumer"
listener-container="listenerContainer"
channel="consumerChannel"
message-converter="messageConverter"
payload-type="java.lang.String"
/>

我已经试着做了以下事情:

<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics">
<list>
<value>someTopic</value>
</list>
</constructor-arg>
<property name="errorHandler" ref="listenerErrorHandler"/>
<property name="messageListener" ref="filteringMessageListener"/>
</bean>
<bean id="recordFilterStrategy" class="com.some.path"/>
<bean id="filteringMessageListener" class="org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter">
<constructor-arg>
<bean factory-bean="containerProperties" factory-method="getMessageListener"/>
</constructor-arg>
<constructor-arg ref="recordFilterStrategy"/>
</bean>

但是我得到了一个错误:构造函数抛出异常;嵌套异常是java.lang.IllegalArgumentException:容器不能已经有监听器

这是XML配置中的一个错误。

KafkaMessageDrivenChannelAdapter具有相应的属性:

/**
* Specify a {@link RecordFilterStrategy} to wrap
* {@link KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener} into
* {@link FilteringMessageListenerAdapter}.
* @param recordFilterStrategy the {@link RecordFilterStrategy} to use.
*/
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {

我们只是错过了为XML配置公开它。

请随意就此事提出GH问题!

作为一个解决方案,考虑为KafkaMessageDrivenChannelAdapter声明一个常规的<bean>,而不是那个XML配置。或者干脆放弃XML配置,转而使用Java DSL: https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound-adapter-configuration

相关内容

  • 没有找到相关文章

最新更新