我正在用Spring Kafka编写一个Spring Boot应用程序。由于我的应用程序专注于 Kafka 流,我需要使用交互式查询并查询我的状态存储,我想知道:是否有任何特殊方法可以使用 Spring Kafka 访问 kafka 流状态存储?
据我所知,Spring Cloud Stream Binder Kafka Streams中有一些用于交互式查询的支持,但我在Spring Kafka中找不到有关它们的任何信息。我错过了什么还是在 Spring-Kafka 中没有支持它?
如果是这样 - 在创建自己的org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService
版本时,我应该特别考虑什么吗?
当我只使用Kafka和Kafka Streams时,包含Spring Cloud Streams似乎有点太多了,但是如果提供的交互式查询支持将很难自己实现,也许建议无论如何都包含它?我将不胜感激任何建议。
如果你使用Spring-Kafka和Kafka-Streams,我建议你把你的KafkaStreams定义为@Bean这样Spring就会照顾对象的生命周期,有类似的东西。
@Configuration
public class KafkaConfig{
@Bean
KafkaStreams ingestAndAggregateStream(KafkaProperties kafkaProps){
Topology t....
KafkaStreams stream = new KafkaStreams(t,kafkaProps.buildStreamsProperties())
stream.start();
return stream;
}
}
完成此操作后,您可以在控制器中自动连接流,并使用它来检索和查询本地KVStore
@RestController
public class StreamingController {
private final KafkaStreams ingestAndAggregateStream;
public StreamingController(KafkaStreams ingestAndAggregateStream){
this.ingestAndAggregateStream = ingestAndAggregateStream;
}
@RequestMapping(value = "/queryStore", method = RequestMethod.GET)
@ResponseBody
public Value getKVStoreValue(@RequestParam String key){
ReadOnlyKeyValueStore<String, Value> store = ingestAndAggregateStream
.store("KV_STORE_NAME",
QueryableStoreTypes.<String, Value>keyValueStore());
return store.get(key);
}
}
如果您的应用程序有@EnableKafkaStreams
,那么您可以自动连线StreamsBuilderFactoryBean
.您可以注入StreamsBuilderFactoryBean
,然后使用.getKafkaStreams()
从中获取KafkaStreams
对象:
@RestController
public class StreamingController {
private final KafkaStreams ingestAndAggregateStream;
public StreamingController(KafkaStreams ingestAndAggregateStream){
this.ingestAndAggregateStream = ingestAndAggregateStream;
}
@RequestMapping(value = "/queryStore", method = RequestMethod.GET)
@ResponseBody
public Value getKVStoreValue(@RequestParam String key){
ReadOnlyKeyValueStore<String, Value> store = ingestAndAggregateStream
.store("KV_STORE_NAME",
QueryableStoreTypes.<String, Value>keyValueStore());
return store.get(key);
}
}