是否有办法注入一个监听器与自定义recordfilterstrategy ?
<int-kafka:message-driven-channel-adapter
id="kafkaConsumer"
listener-container="listenerContainer"
channel="consumerChannel"
message-converter="messageConverter"
payload-type="java.lang.String"
/>
我已经试着做了以下事情:
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics">
<list>
<value>someTopic</value>
</list>
</constructor-arg>
<property name="errorHandler" ref="listenerErrorHandler"/>
<property name="messageListener" ref="filteringMessageListener"/>
</bean>
<bean id="recordFilterStrategy" class="com.some.path"/>
<bean id="filteringMessageListener" class="org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter">
<constructor-arg>
<bean factory-bean="containerProperties" factory-method="getMessageListener"/>
</constructor-arg>
<constructor-arg ref="recordFilterStrategy"/>
</bean>
但是我得到了一个错误:构造函数抛出异常;嵌套异常是java.lang.IllegalArgumentException:容器不能已经有监听器
这是XML配置中的一个错误。
KafkaMessageDrivenChannelAdapter
具有相应的属性:
/**
* Specify a {@link RecordFilterStrategy} to wrap
* {@link KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener} into
* {@link FilteringMessageListenerAdapter}.
* @param recordFilterStrategy the {@link RecordFilterStrategy} to use.
*/
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
我们只是错过了为XML配置公开它。
请随意就此事提出GH问题!
作为一个解决方案,考虑为KafkaMessageDrivenChannelAdapter
声明一个常规的<bean>
,而不是那个XML配置。或者干脆放弃XML配置,转而使用Java DSL: https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound-adapter-configuration