我已经将spring集成kafka用于读-写过程的简单场景。消息驱动的端点bean的配置如下:
<kafka:message-driven-channel-adapter listener-container="listenerContainer"
channel="processChannel"
mode="batch"/>
<bean id="listenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" parent="kafkaMessageListenerContainerAbstract">
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="test"/>
<property name="transactionManager" ref="kafkaTransactionManager"/>
<property name="eosMode" value="BETA"/>
</bean>
</constructor-arg>
</bean>
当KafkaMessageListenerContainer轮询一批记录(例如10条记录(时,将启动kafka事务,但所有10条记录将由IntegrationBatchMessageListener集成到单个消息中
message = toMessagingMessage(records, acknowledgment, consumer);
是否有任何解决方案可以在单个kafka事务中处理一个批,但在消息驱动的端点中单独处理每个记录,然后提交事务?
如果可能的话,我不想使用手动ack。
我见过BatchToRecordAdapter类,但认为它在消息驱动的端点中不可用。
否;但是您可以添加一个<splitter input-channel="processChannel" output-channel="..."/>
来将列表分解为单独的消息。