我们使用的是spring cloud stream Hoxton RC7项目中包含的Kafka流(因此使用提供的Kaf卡流和Kafka客户端版本[2.3.1](
ext {
set('springCloudVersion', 'Hoxton.SR7')
}
...
dependencies {
// spring cloud stream
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation("org.springframework.cloud:spring-cloud-stream")
// redis
implementation 'io.lettuce:lettuce-core'
implementation 'org.springframework.data:spring-data-redis'
testCompile 'it.ozimov:embedded-redis:0.7.2'
...
我们已经实现了一个kstreams应用程序
@Bean
public Consumer<KStream<String, IncomingEvent>> process() {
return input -> {
我们在其中进行一些聚合,如:
.aggregate(Foo::new, (key, value1, aggregate) ->
(aggregate == null || aggregate.getLastModified() == null || this.mustProcess(key, value1))
? value1
: aggregate,
materialized
)
现在物化应该是一个自定义的外部状态存储(Redis(:
Materialized<String, Foo, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.as("redis-store");
由StoreBuilder Bean提供:
@Bean
public StoreBuilder<KeyValueStore<String, Foo>> builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes){
return Stores.keyValueStoreBuilder(supplier(redisKeyValueStoreBytes),
new Serdes.StringSerde(),
new SomeFooSerde());
}
public static KeyValueBytesStoreSupplier supplier(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {
return new KeyValueBytesStoreSupplier() {
@Override
public String name() {
return "redis-store";
}
@Override
public KeyValueStore<Bytes, byte[]> get() {
return redisKeyValueStoreBytes;
}
@Override
public String metricsScope() {
return "redis-session-state";
}
};
}
我现在用EmbeddedKafka:测试应用程序
@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@SpringBootTest(classes = {TestConfigurationTests.class})
@EmbeddedKafka(count = 3, ports = {29901, 29902, 29903}, zookeeperPort = 33991)
public class TestKafkaIntegration {
我尝试访问状态存储并查询添加的项目的位置:
ReadOnlyKeyValueStore<String, Foo> queryableStore = interactiveQueryService.getQueryableStore(
"redis-store", QueryableStoreTypes.keyValueStore());
return queryableStore;
但当我运行测试时,我收到一个错误:
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore redis-store is already added.
几个问题:
- [1]解释的使用自定义状态存储的示例在处理器中使用它。这是否自动意味着,我不能在聚合中使用自定义状态存储
- 当无法在聚合中使用它时,无论如何,使用自定义状态存储有什么意义
- 当我稍微更改上面针对kstreams的代码并定义一个处理器而不是在聚合方法中使用物化时,错误发生了变化,然后它抱怨缺少状态";redis商店;尝试执行getQueryableStore时存储。但事实上,我可以看到,addStateStoreBeans注册了"redis store"。这怎么会发生
我想使用自定义状态存储的原因是,我(真的很难(为应用程序实例提供专用硬盘。为了快速启动应用程序,我希望避免在每次启动应用程序时处理完整的变更日志(最好每天进行几次,目前需要一个多小时(。所以现在最后一个问题是:
- 当使用自定义外部状态存储时,我是否能够在应用程序重新启动时恢复到上一个状态
[1]https://spring.io/blog/2019/12/09/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-6-state-stores-and-interactive-queries
您正在使用Materialized.as(java.lang.String storeName(,它将创建(具体化(具有给定名称的StateStore
(此处为"redis store"(。另一方面,使用builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes)
,您正在创建另一个具有相同名称的StateStore
,该springframework可能会自动将其添加到拓扑中,这样您就可以获得";存储已添加";错误
q1:您可以在聚合中使用自定义状态存储;将其与Materialized.as(KeyValueBytesStoreSupplier供应商(一起使用
q2:也可以使用带有转换器或自定义处理器的StateStore
来进行交互式查询;同样,使用全局StateStore
,可以访问整个主题,而不是只访问KafkaStreams实例分配的分区(请参阅addGlobalStore和globalTable(
q3:我猜您没有(手动(向拓扑注册状态存储;请参阅Topology.addStateStore(StoreBuilder<?>StoreBuilder,java.lang.String…processorNames(和连接处理器和状态存储
q4:是的,状态存储是从变更日志主题加载的(使用优化时可能是原始主题(