我正试图使用RepliyngKafkaTemplate
,就像我设法在REST控制器中使用KafkaTemplate
一样。
@RestController
public class TestController {
@Autowired
private ReplyingKafkaTemplate<Object, KafkaExampleRecord, KafkaExampleRecord> replyingTemplate;
@PostMapping("/test/request")
public void requestReply(@RequestBody KafkaExampleRecord record) throws ExecutionException, InterruptedException, TimeoutException {
ProducerRecord<Object, KafkaExampleRecord> producerRecord = new ProducerRecord<>("mytopic", record);
RequestReplyFuture<Object, KafkaExampleRecord, KafkaExampleRecord> replyFuture = replyingTemplate.sendAndReceive(producerRecord);
SendResult<Object, KafkaExampleRecord> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<Object, KafkaExampleRecord> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
}
}
然而,我得到以下异常:
Field replyingTemplate in com.blah.KafkaController required a bean of type 'org.springframework.kafka.requestreply.ReplyingKafkaTemplate' that could not be found.
我启用了这样的自动配置。
@Configuration
@EnableKafka
public class KafkaConfig {
}
所有Kafka设置都在我的application.yml
中。
我还需要什么?我真的需要定义bean吗?似乎是不必要的。
我真的必须定义bean吗?似乎是不必要的。
是的,你必须为回复模板(包括回复容器)声明一个bean;Spring Boot只自动配置一个简单的KafkaTemplate
。
您可以检查一下,您是否正在正确扫描baseppackages。有时候,如果你没有正确扫描包,你可能会遇到这个问题,我在Spring Boot应用程序中遇到过很多次。
@ComponentScan(
basePackages = {
"x.x.x.x"
}
)