这是kafkcontroller需要一个类型为'
现在我得到了java.lang.IllegalStateException: a KafkaTemplate is required to support replies
。我的设置现在看起来像这样。
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingKafkaTemplate(ProducerFactory<Object, KafkaExampleRecord> producerFactory,
ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc) {
return new ReplyingKafkaTemplate<>(producerFactory, rc);
}
@Bean
public ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> repliesContainer(
ConcurrentKafkaListenerContainerFactory<Object, KafkaExampleRecord> containerFactory) {
ConcurrentMessageListenerContainer<Object, KafkaExampleRecord> rc =
containerFactory.createContainer("mytopic");
rc.setAutoStartup(false);
return rc;
}
}
我的REST控制器,我现在有这个。
@RestController
public class TestController {
@Autowired
private KafkaTemplate<Object, KafkaExampleRecord> template;
@Autowired
private ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingTemplate;
@PostMapping("/test/produce")
public void produceToTopic(@RequestBody KafkaExampleRecord record) {
ListenableFuture<SendResult<Object, KafkaExampleRecord>> future = template.send("mytopic", record);
}
@PostMapping("/test/request")
public void requestReply(@RequestBody KafkaExampleRecord record) throws ExecutionException, InterruptedException, TimeoutException {
ProducerRecord<Object, KafkaExampleRecord> producerRecord = new ProducerRecord<>("mytopic", record);
RequestReplyFuture<Object, KafkaExampleRecord, KafkaExampleRecord> replyFuture = replyingTemplate.sendAndReceive(producerRecord);
SendResult<Object, KafkaExampleRecord> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
}
在SpringApplication.run(MyClass.class, args)
处抛出异常。
从https://docs.spring.io/spring-kafka/reference/html/#replying-template和https://www.techgalery.com/2021/08/spring-kafka-how-to-use.html看起来我现在有我需要的一切。还缺少什么?
参见服务器端receive-n-reply语义的文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-send-to
为了支持
@SendTo
,侦听器容器工厂必须提供KafkaTemplate
(在其replyTemplate
属性中),用于发送应答。
由于replyingKafkaTemplate是KafkaTemplate的扩展,Boot将不再配置默认的。你需要定义一个单独的kafkatemplateobject, Object>请看看我的另一个问题,在那里我分别维护kafkatemplate和replyingKafkaTemplate以及setReplyTemplate的侦听器工厂。
ExecutionException:Due to: org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out using ReplyingKafkaTemplate