我可以对 Kafka 流中的多个实例使用相同的 state.dir 吗?



我的用例是获取Kafka主题中任何键的最新值。为此,我目前正在使用GlobalStateStore。

问题是,如果我对每个实例有不同的application.id,则会创建一个不同的状态目录。

我想要的是在所有实例中重用现有的状态存储目录,以避免重复数据。 这是因为,我的所有应用程序实例都在同一台机器上运行,因此没有必要使用不同的状态目录。

我做了以下工作:

使用相同的application.id并创建了 2 个流实例, 指向相同 state.dir 的 2 个实例,但状态存储实例不同(java 对象不同)statestore1 和 statestore2

然后作为测试,我做了以下工作:

while(true)
{
new Thread(()-> stateStore1.get(key)).start();
new Thread(()-> stateStore2.get(key)).start();
}

然后我开始在这个主题中产生价值......

我观察到所有状态存储实例(statestore1 和 statestore2)都在更新相同的状态存储(即相同的 state.dir)(因为这是代码)

我在这样做的测试中没有遇到任何问题。但。。我想在将数据写入 state.dir 之前应该获取写锁定。 现在,如果由于某种原因,statestore1 没有释放锁,那么 statestore2 会继续等待吗?

我的上述方法是否安全,还是有其他方法?

更新:

用例是,我想在不同的进程(不同的JVM实例)之间共享一些通用数据。由于数据是通用的,因此不需要为每个进程设置全局存储(因为它是多余的)。

所有进程都应能够读取和写入该存储,因为每个进程彼此独立。 因此,如果一个进程出现故障,另一个进程可以完成其工作(将最新值存储在全局存储中并在需要时获取)。

如果在同一台计算机上运行所有组件,并且想要共享全局存储,则不应使用多个实例,而应使用实例中的多个线程。

全局存储设计为按实例复制,这正是您要避免的。

首先,state_dir采用将在其中创建值为application.id的文件夹的目录的参数。

因此,例如,如果state_dirC:tmp并且application.id测试,则该文件夹将被C:tmptest

因此,如果state_dir被赋予C:tmptest而不是具有相同application.id甚至不同application.idC:tmp,则将在C:tmptest中创建一个新目录作为C:tmptesttestC:tmptestsome_other_application_id

这些不会相互冲突。仍将有多个全球商店。

但是,如果为两个KafkaStreams实例C:tmpstate_dir,则会出现一个例外情况,说明

无法锁定全局状态目录。如果多个 KafkaStreams 实例在同一主机上运行,使用相同的 状态目录

对于所有实例都有一个公共位置的用例,最好使用数据库作为 Kafka 的接收器(使用 Kafka connect API 或手动从流写入数据库)。

如果用例不需要复杂的类似 SQL 的查询,那么您可以拥有自己的RocksDB,该将写入同一台计算机上运行的所有实例的同一目录。

Kafka 似乎不支持这种开箱即用的用例。

相关内容

  • 没有找到相关文章

最新更新