我有一个基于Kafka流的应用程序(使用低级别的API(,简单地说,它做以下事情:
流处理(具有process()
方法的流程(
-
从Kafka主题读取数据
-
将数据放入
StateStore
(由RocksDB
支持(
Stream处理器在初始化期间生成一个线程(在init((方法中,称其为工作线程(:
-
工作线程是执行器服务,每1分钟唤醒一次
-
从同一StateStore读取数据,然后进行处理。
我有一个要求,这个Worker执行器服务需要每2小时(而不是几分钟(唤醒一次。
由于流线程在Worker执行器服务试图进行读写的同一StateStore上进行读写,我如何确保Worker线程在不被永远阻止的情况下进行工作;流线程很可能在StateStore上持有锁/监视器。
有人能就如何解决这个问题提供一些意见吗?
我相信RocksDB API是线程安全的,所以它不应该是一个问题。
顺便说一句,你可以考虑使用标点符号,而不是生成不同的线程。punctuator由流线程调用,因此它将是一个单独的线程。