我们将KTable实体化为内部状态存储。
a.(我如何以及在哪里可以指定,此内部状态存储应该是持久的,并自动备份到另一个kafka主题?
b.(我们如何指定,这个内部状态存储应该是全局的,这样我的任何流任务都应该能够引用它?
c.(是否存在将传入消息记录写入内部状态存储的频率?会不会发生这种情况,一个特定的MessageRecord被流处理器处理,存储在KTable中,然后我的流处理器死了,它无法进入内部状态存储!!
下面的片段我们现在使用:-
KTable<String, String> KT0 = streamsBuilder.table(AppConfigs.topicName, Materialized.as(AppConfigs.stateStoreName)));
如有任何回应,我们将不胜感激!!
a(如果您有一个状态存储的自定义实现,您可以通过Materialized.as(KeyValueStoreSupplier)
传递它。
b( 对于全局商店用例,您可以使用builder.globalKTable()
。
c( 写入是以每条记录为基础进行的,但可以缓存在内存中。在提交输入主题偏移量之前,状态存储将被刷新,因此您永远不会错过任何数据。默认情况下,KafkaStreams至少提供一次处理语义。