KafkaStreams 交互式查询与 Spring-Kafka



我正在用Spring Kafka编写一个Spring Boot应用程序。由于我的应用程序专注于 Kafka 流,我需要使用交互式查询并查询我的状态存储,我想知道:是否有任何特殊方法可以使用 Spring Kafka 访问 kafka 流状态存储

据我所知,Spring Cloud Stream Binder Kafka Streams中有一些用于交互式查询的支持,但我在Spring Kafka中找不到有关它们的任何信息。我错过了什么还是在 Spring-Kafka 中没有支持它?

如果是这样 - 在创建自己的org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService版本时,我应该特别考虑什么吗?

当我只使用Kafka和Kafka Streams时,包含Spring Cloud Streams似乎有点太多了,但是如果提供的交互式查询支持将很难自己实现,也许建议无论如何都包含它?我将不胜感激任何建议。

如果你使用Spring-Kafka和Kafka-Streams,我建议你把你的KafkaStreams定义为@Bean这样Spring就会照顾对象的生命周期,有类似的东西。

@Configuration
public class KafkaConfig{
  @Bean
  KafkaStreams ingestAndAggregateStream(KafkaProperties kafkaProps){
  Topology t....
  KafkaStreams stream = new KafkaStreams(t,kafkaProps.buildStreamsProperties())
  stream.start();
  return stream;
  }
}

完成此操作后,您可以在控制器中自动连接流,并使用它来检索和查询本地KVStore

@RestController
public class StreamingController {
  private final KafkaStreams ingestAndAggregateStream;
  public StreamingController(KafkaStreams ingestAndAggregateStream){
    this.ingestAndAggregateStream = ingestAndAggregateStream;
  }
  @RequestMapping(value = "/queryStore", method = RequestMethod.GET)
  @ResponseBody
  public Value getKVStoreValue(@RequestParam String key){
    ReadOnlyKeyValueStore<String, Value> store = ingestAndAggregateStream
        .store("KV_STORE_NAME",
            QueryableStoreTypes.<String, Value>keyValueStore());
    return store.get(key);
  }
}

如果您的应用程序有@EnableKafkaStreams,那么您可以自动连线StreamsBuilderFactoryBean .您可以注入StreamsBuilderFactoryBean,然后使用.getKafkaStreams()从中获取KafkaStreams对象:

@RestController
public class StreamingController {
  private final KafkaStreams ingestAndAggregateStream;
  public StreamingController(KafkaStreams ingestAndAggregateStream){
    this.ingestAndAggregateStream = ingestAndAggregateStream;
  }
  @RequestMapping(value = "/queryStore", method = RequestMethod.GET)
  @ResponseBody
  public Value getKVStoreValue(@RequestParam String key){
    ReadOnlyKeyValueStore<String, Value> store = ingestAndAggregateStream
        .store("KV_STORE_NAME",
            QueryableStoreTypes.<String, Value>keyValueStore());
    return store.get(key);
  }
}

相关内容

  • 没有找到相关文章

最新更新