Spring与Kafka集成,不发送消息



我正在使用Java DSL开发Spring-Kafka,我发现消息没有生成/发送到Kafka主题。

我一直在使用的代码是:

@Bean
public IntegrationFlow sendToKafkaFlow() {
return IntegrationFlows.from(kafkaPublishChannel)
.handle(kafkaMessageHandler())
.get();
}
private KafkaProducerMessageHandlerSpec<String, Object, ?> kafkaMessageHandler() {
return Kafka
.outboundChannelAdapter(_kafkaProducerFactory.getKafkaTemplate().getProducerFactory())
.messageKey(m -> m
.getHeaders()
.getId())
//.headerMapper(mapper())
.topic(_topicConfiguration.getCheProgressUpdateTopic())
.configureKafkaTemplate(t -> t.getTemplate());
}
@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}

我使用的生产者配置是:

private ProducerFactory<String, Object> producerFactory() {
final Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerConfiguration.getKafkaServerProducerHost());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerConfiguration.isKafkaProducerIdempotentEnabled());
producerProps.put(ProducerConfig.ACKS_CONFIG, kafkaProducerConfiguration.getKafkaProducerAcks());
producerProps.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerConfiguration.getKafkaProducerRetries());
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerConfiguration.getKafkaProducerBatchSize());
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfiguration.getKafkaProducerLingerMs());
return new DefaultKafkaProducerFactory<>(producerProps);
}

不知道为什么我没有看到卡夫卡主题中的消息。你能帮我一下吗?

尝试在该Kafka.outboundChannelAdapter()上使用sync(true)。我认为如果你在发送过程中没有看到任何进展,应该会有一些错误。您还可以考虑使用org.springframework.integration类别的DEBUG日志记录级别来查看您的消息是如何在集成流中传播的。

最新更新