我正在构建一个kafka流应用程序,目的是在业务逻辑出现故障时进行水平扩展和数据再处理。
应用程序使用来自两个主题的数据,这两个主题具有相同数量的分区,并使用KStream::merge
连接。还有第三个主题提供了所有应用程序实例都必须使用的数据,这给我带来了困难。
到目前为止,我尝试使用globalTable
来提供全局主题中的数据,但当我重置应用程序以使用历史数据时,我不确定它的行为。
据我所知,在应用程序重置后,所有merged
输入主题都以处理器接收时间戳不断增加的数据的方式使用。我担心的是,当我通过GlobalTable
使用数据并通过StateStore
将其提供给处理器时,此功能不适用。当我重新处理数据时,StateStore
提供的状态似乎只是最新消耗的状态,与时间戳输入的数据无关。
我的问题是:
- 我如何提供";全局";将主题输入到所有应用程序实例,以便每个应用程序实例都具有所有数据
- 应用程序重置后,
GlobalTable
状态存储的行为如何?状态主题是否与其他输入主题同步使用
不知道你说的";全局";输入主题。但是,如果数据转换/扩展的多个应用程序使用相同的数据,那么最好的选择是设置与订阅同一主题的应用程序一样多的用户组。通过这种方式,同一组数据将在具有同一应用程序的多个消费者实例的不同消费者组之间广播。
另外,所谓应用程序重置,您是指KAFKA通过CLI提供的ART(应用程序重置工具(?
如果是这样的话,根据我的理解,单独对GlobalTable没有特定的要求,但ART作为一个单独的实体在流处理器上工作。
当您使用应用程序重置工具时,Kafka状态存储会发生什么?
https://docs.confluent.io/4.0.0/streams/developer-guide/app-reset-tool.html
ART的作用是什么
输入主题:将偏移重置到指定位置(默认为主题的开头(.c
中间主题:跳到主题的末尾,即将应用程序为所有分区提交的使用者偏移量设置为每个分区的logSize(对于使用者组application.id(。
内部主题:删除内部主题(这会自动删除任何已提交的偏移量(。
什么是ART没有的
-
重置应用程序的输出主题。如果下游应用程序使用任何输出(或中间(主题,则您有责任在重置上游应用程序时适当调整这些下游应用程序。
-
重置应用程序实例的本地环境。您有责任删除运行应用程序实例的任何计算机上的本地状态。
要完成应用程序重置,必须删除运行应用程序实例的任何计算机上的应用程序的本地状态目录。在同一台计算机上重新启动应用程序实例之前,必须执行此操作。您可以使用以下任一方法:
- 应用程序代码中的API方法KafkaStreams#cleanUp((
- 手动删除相应的本地状态目录(默认位置:/tmp/kafka streams/<application.id>(