我的用例是获取Kafka主题中任何键的最新值。为此,我目前正在使用GlobalStateStore。
问题是,如果我对每个实例有不同的application.id
,则会创建一个不同的状态目录。
我想要的是在所有实例中重用现有的状态存储目录,以避免重复数据。 这是因为,我的所有应用程序实例都在同一台机器上运行,因此没有必要使用不同的状态目录。
我做了以下工作:
使用相同的application.id并创建了 2 个流实例, 指向相同 state.dir 的 2 个实例,但状态存储实例不同(java 对象不同)statestore1 和 statestore2
然后作为测试,我做了以下工作:
while(true)
{
new Thread(()-> stateStore1.get(key)).start();
new Thread(()-> stateStore2.get(key)).start();
}
然后我开始在这个主题中产生价值......
我观察到所有状态存储实例(statestore1 和 statestore2)都在更新相同的状态存储(即相同的 state.dir)(因为这是代码)
我在这样做的测试中没有遇到任何问题。但。。我想在将数据写入 state.dir 之前应该获取写锁定。 现在,如果由于某种原因,statestore1 没有释放锁,那么 statestore2 会继续等待吗?
我的上述方法是否安全,还是有其他方法?
更新:
用例是,我想在不同的进程(不同的JVM实例)之间共享一些通用数据。由于数据是通用的,因此不需要为每个进程设置全局存储(因为它是多余的)。
所有进程都应能够读取和写入该存储,因为每个进程彼此独立。 因此,如果一个进程出现故障,另一个进程可以完成其工作(将最新值存储在全局存储中并在需要时获取)。
如果在同一台计算机上运行所有组件,并且想要共享全局存储,则不应使用多个实例,而应使用实例中的多个线程。
全局存储设计为按实例复制,这正是您要避免的。
首先,state_dir
采用将在其中创建值为application.id
的文件夹的目录的参数。
因此,例如,如果state_dir
C:tmp
并且application.id
是测试,则该文件夹将被C:tmptest
因此,如果state_dir
被赋予C:tmptest
而不是具有相同application.id
甚至不同application.id
C:tmp
,则将在C:tmptest
中创建一个新目录作为C:tmptesttest
或C:tmptestsome_other_application_id
。
这些不会相互冲突。仍将有多个全球商店。
但是,如果为两个KafkaStreams
实例C:tmp
state_dir
,则会出现一个例外情况,说明
无法锁定全局状态目录。如果多个 KafkaStreams 实例在同一主机上运行,使用相同的 状态目录
对于所有实例都有一个公共位置的用例,最好使用数据库作为 Kafka 的接收器(使用 Kafka connect API 或手动从流写入数据库)。
如果用例不需要复杂的类似 SQL 的查询,那么您可以拥有自己的RocksDB
,该将写入同一台计算机上运行的所有实例的同一目录。
Kafka 似乎不支持这种开箱即用的用例。