KafkaReplyTemplate抛出:KafkaTemplate需要支持回复?



这是kafkcontroller需要一个类型为'

现在我得到了java.lang.IllegalStateException: a KafkaTemplate is required to support replies。我的设置现在看起来像这样。

@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingKafkaTemplate(ProducerFactory<Object, KafkaExampleRecord> producerFactory,
                                 ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc) {
return new ReplyingKafkaTemplate<>(producerFactory, rc);
}
@Bean
public ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer(
ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> containerFactory) {
ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc =
containerFactory.createContainer("mytopic");
rc.setAutoStartup(false);
return rc;
}
}

我的REST控制器,我现在有这个。

@RestController
public class TestController {
@Autowired
private KafkaTemplate<Object, KafkaExampleRecord> template;
@Autowired
private ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingTemplate;
@PostMapping("/test/produce")
public void produceToTopic(@RequestBody KafkaExampleRecord record) {
ListenableFuture<SendResult<Object, KafkaExampleRecord>> future = template.send("mytopic", record);
}
@PostMapping("/test/request")
public void requestReply(@RequestBody KafkaExampleRecord record) throws ExecutionException, InterruptedException, TimeoutException {
ProducerRecord<Object, KafkaExampleRecord> producerRecord = new ProducerRecord<>("mytopic", record);
RequestReplyFuture<Object, KafkaExampleRecord, KafkaExampleRecord> replyFuture = replyingTemplate.sendAndReceive(producerRecord);
SendResult<Object, KafkaExampleRecord> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);

}

SpringApplication.run(MyClass.class, args)处抛出异常。

从https://docs.spring.io/spring-kafka/reference/html/#replying-template和https://www.techgalery.com/2021/08/spring-kafka-how-to-use.html看起来我现在有我需要的一切。还缺少什么?

参见服务器端receive-n-reply语义的文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-send-to

为了支持@SendTo,侦听器容器工厂必须提供KafkaTemplate(在其replyTemplate属性中),用于发送应答。

由于replyingKafkaTemplate是KafkaTemplate的扩展,Boot将不再配置默认的。你需要定义一个单独的kafkatemplateobject, Object>请看看我的另一个问题,在那里我分别维护kafkatemplate和replyingKafkaTemplate以及setReplyTemplate的侦听器工厂。

ExecutionException:Due to: org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out using ReplyingKafkaTemplate

最新更新