I want to access a GlobalKTable from a context outside of the Kafka Streams context.
As an example, I have this sample application:
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
with this GlobalKTable configured:
@Component
public class Table {
@Bean("myTable")
public GlobalKTable<String, String> buildTopo(@Qualifier("ksConfig") StreamsBuilder builder) {
return builder.globalTable("my-topic",
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
"my-state-store" /* table/store name */)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
);
}
}
and this Kafka Streams configuration:
@Configuration
public class KafkaConfiguration {
@Bean("ksConfig")
public StreamsBuilderFactoryBean kafkaStreams(KafkaProperties kafkaProperties,
@Value("${spring.application.name}") String appName) {
var props = new HashMap<String, Object>(kafkaProperties.getProperties());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
var stateDir = kafkaProperties.getStreams().getStateDir() != null ?
kafkaProperties.getStreams().getStateDir() : System.getProperty("java.io.tmpdir");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
var config = new KafkaStreamsConfiguration(props);
return new StreamsBuilderFactoryBean(config);
}
}
Now suppose I have a web endpoint:
@Path("/api")
@Service
public class Web {
@GET
@Path("/test")
@Produces(MediaType.APPLICATION_JSON)
public Response test(@QueryParam("key") String key) {
String value = // get value from global ktable with Key
return Response.status(200).entity(value).build();
}
}
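How can I get at the GlobalKTable from inside this Web::test(...) method?
I tried autowiring some likely Kafka Streams classes into Web, such as KafkaStreams and ProcessorContext. Spring throws an error saying it cannot find those beans.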
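InteractiveQueryService is really only needed when using spring-cloud, because there you can have multiple kafka-streams instances in one application. In your case, you can access the KafkaStreams object through the StreamsBuilderFactoryBean and get the store from there: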
@Path("/api")
@Service
public class Web {
@Autowired
@Qualifier("ksConfig")
private StreamsBuilderFactoryBean streamsBuilderFactoryBean;
@GET
@Path("/test")
@Produces(MediaType.APPLICATION_JSON)
public Response test(@QueryParam("key") String key) {
String value = getValue(key);
return Response.status(200).entity(value).build();
}
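private String getValue(String key) {
// Describe the kind of store to query: a read-only key-value view.
QueryableStoreType<ReadOnlyKeyValueStore<String, String>> storeType = QueryableStoreTypes.keyValueStore();
// "my-state-store" is the name passed to Materialized.as(...) when the table was built.
// Note: getKafkaStreams() returns null until the factory bean has been started.
ReadOnlyKeyValueStore<String, String> store = streamsBuilderFactoryBean.getKafkaStreams().store(StoreQueryParameters.fromNameAndType("my-state-store", storeType));
// Returns null when the key is not present in the table.
return store.get(key);
}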
}
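One caveat with the lookup above: a request that arrives while Kafka Streams is still starting or restoring can fail, because the store is not queryable yet. The following is a minimal sketch of one way to cope with that, assuming a short bounded retry is acceptable for the endpoint. StoreRetry and callWithRetry are hypothetical names made up for this illustration and are not part of Kafka or Spring. The helper uses only the JDK, so the exception type to retry on is passed in by the caller.

```java
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka or Spring): retries a lookup a
// bounded number of times when it fails with a given exception type.
final class StoreRetry {
    private StoreRetry() {}

    /**
     * Calls lookup up to maxAttempts times, sleeping backoffMillis between
     * attempts. Only exceptions that are instances of retryOn are retried;
     * anything else is rethrown immediately.
     */
    static <T> T callWithRetry(Supplier<T> lookup,
                               Class<? extends RuntimeException> retryOn,
                               int maxAttempts,
                               long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                if (!retryOn.isInstance(e)) {
                    throw e; // not a "store not ready" style failure
                }
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMillis);
                    } catch (InterruptedException ie) {
                        // Preserve the interrupt flag and give up early.
                        Thread.currentThread().interrupt();
                        throw e;
                    }
                }
            }
        }
        throw last; // all attempts failed with a retryable exception
    }
}
```

In the Web class, the call could then look like StoreRetry.callWithRetry(() -> getValue(key), InvalidStateStoreException.class, 5, 200L), where InvalidStateStoreException comes from org.apache.kafka.streams.errors and the attempt count and backoff are arbitrary example values. A null from getKafkaStreams() would surface as a NullPointerException and is deliberately not retried here; checking for it explicitly and returning a 503 may be the clearer choice.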