Kafka流任务和内部状态存储的管理



假设我们在两个不同的机器(实例(上启动了两个流式任务,具有以下属性:-

public final static String applicationID = "StreamsPOC";
public final static String bootstrapServers = "10.21.22.56:9093";    
public final static String topicname = "TestTransaction";
public final static String shipmentTopicName = "TestShipment";
public final static String RECORD_COUNT_STORE_NAME = "ProcessorONEStore";

使用这些上述属性,流任务的定义如下所示:-

Map<String, String> changelogConfig = new HashMap();
changelogConfig.put("min.insyc.replicas", "1");
// Below line not working.
changelogConfig.put("topic", "myChangedTopicLog");

StoreBuilder kvStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(AppConfigs.RECORD_COUNT_STORE_NAME),
AppSerdes.String(), AppSerdes.Integer()
).withLoggingEnabled(changelogConfig);
kStreamBuilder.addStateStore(kvStoreBuilder);

KStream<String, String> sourceKafkaStream = kStreamBuilder.stream
(AppConfigs.topicname, Consumed.with(AppSerdes.String(), AppSerdes.String()));

现在,正如我所观察到的,在后台,kafka在后台创建了主题(用于备份内部状态存储(,名称如下:-StreamsPOC-ProcessorONEStore-changelog

第一个问题是:-两个不同的流任务是否将内部状态存储维护并备份到同一主题?

第二个问题是:Say Task-1获取分区-1,并写入Say<K1、V1>并且Task-2开始在Partition-2上工作,并且说它还写入<K1、V1>到其各自的本地状态存储,那么它是否会带来数据被覆盖的风险,因为两个任务都在将数据备份到同一个变更日志主题?

第三个问题是:-如何指定更改日志主题的自定义名称?

非常感谢您的回复!!

首先,对术语的一些思考:术语"任务";在Kafka Stream中有着明确的含义,作为用户,你不会自己创建任务。当您的程序被执行时,Kafka Streams会创建任务;独立计算单元";并为您执行这些任务。——我想,你说的";任务";实际上是一个KafkaStreams客户端(称为实例(。

如果用相同的application.id启动多个实例,它们将组成一个使用者组,并以数据并行的方式共享负载。对于状态存储,每个实例都将托管存储的碎片(有时也称为分区(。所有实例都使用相同的主题,并且该主题为每个存储碎片都有一个分区。从存储碎片到变更日志分区有一个1:1的映射。此外,还有从输入主题分区到任务的1:1映射,以及任务和存储碎片之间的1:1映射。因此,总的来说,这是一个1:1:1:1的映射:对于每个输入主题分区,都会创建一个任务,每个任务持有状态存储的一个碎片,每个存储碎片由变更日志主题的一个分区支持。(即,底线是,输入主题分区的数量决定了您获得的并行任务和存储碎片的数量,并且创建的变更日志主题的分区数量与您的输入主题相同。(

  1. 所以是的,所有实例都使用相同的变更日志主题
  2. 由于任务是通过碎片和变更日志主题分区隔离的,所以它们不会相互覆盖。然而,任务的思想是每个任务处理不同的(非重叠的(密钥空间,因此具有相同<k1,...>的所有记录都应该由相同的任务处理。当然,这个规则可能会有例外,如果你的应用程序不使用不重叠的键空间,程序就会被执行(当然,根据你的业务逻辑要求,这可能是正确的,也可能是不正确的(
  3. 您似乎已经这样做了:注意,您只能自定义变更日志主题名称的一部分:<application.id>-<storeName>-changelog——也就是说,您可以选择application.idstoreName。不过,整个命名模式是硬编码的

相关内容

  • 没有找到相关文章

最新更新