如何确保Executor服务处理StateStore中的数据而不会永远被阻止



我有一个基于Kafka流的应用程序(使用低级别的API(,简单地说,它做以下事情:

流处理(具有process()方法的流程(

  • 从Kafka主题读取数据

  • 将数据放入StateStore(由RocksDB支持(

Stream处理器在初始化期间生成一个线程(在init((方法中,称其为工作线程(:

  • 工作线程是执行器服务,每1分钟唤醒一次

  • 从同一StateStore读取数据,然后进行处理。

我有一个要求,这个Worker执行器服务需要每2小时(而不是几分钟(唤醒一次。

由于流线程在Worker执行器服务试图进行读写的同一StateStore上进行读写,我如何确保Worker线程在不被永远阻止的情况下进行工作;流线程很可能在StateStore上持有锁/监视器。

有人能就如何解决这个问题提供一些意见吗?

我相信RocksDB API是线程安全的,所以它不应该是一个问题。

顺便说一句,你可以考虑使用标点符号,而不是生成不同的线程。punctuator由流线程调用,因此它将是一个单独的线程。

相关内容

  • 没有找到相关文章

最新更新