我有 2 个主题(实际上更多,但在这里保持简单(,我使用 Streams DSL 加入,一旦加入,将数据发布到下游。
我正在主题 1 之上创建一个 KTable,并将其存储到一个命名的 状态存储。主题 1 的键如下所示:
{ sourceCode:"WXYZ",
platformCode:"ABCD",
transactionIdentifier:"012345:01:55555:12345000:1"
}
我按预期在更改日志主题中看到数据。
在主题 2 的顶部有一个 KStream。主题 2 的键如下所示:
{ sourceCode:"WXYZ",
platformCode:"ABCD",
transactionIdentifier:"012345:01:55555:12345000:1"
lineIdentifier:"1"
}
我正在重新键入以及聚合主题 2 中的数据,并将其放入另一个命名的状态存储中,因为主题 1 和主题 2 中的数据之间存在 1-Many 关系。 重新生成数据后,主题 2 中的键看起来与主题 1 中的键相同。我可以按预期在重新分区主题中看到重新键入的数据以及更改日志主题中的聚合数据。但是,联接不会触发。
其他关键细节 –
- 所有主题中的数据均为 Avro 格式。
- 我正在使用Java/Spring Boot。
- 我保留了commit.interval.ms和缓存的默认设置.max.bytes.buffering
有什么指示可以说明我在这里可能做错了什么吗?
编辑 1:我查看了数据分区,看起来一个最终在 14 个,另一个在 20 个。我也发现了一个类似的问题。
编辑 2:主题 1 和主题 2 的生产者是一个 golang 应用程序。 流还原使用者具有以下配置:
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
流使用者具有以下配置:
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
我在下面发布答案,以便它可以帮助其他人从此类问题中寻找涅槃。正如链接问题的评论部分所指出的,这是由于生产者应用程序造成的问题。
生产者应用程序是用golang编写的,因此,它的哈希是 与Java不同,这是我使用Streams DSL连接数据的。
早些时候,这就是我阅读 KTable 的方式,它保持与源主题中相同的分区:
@Bean
public KTable<MyKey, MyValue> myKTable(StreamsBuilder streamsBuilder) {
return streamsBuilder.table(inputTopic1, Materialized.as(transactionStore));
}
我重写了代码如下,以达到预期的结果:
@Bean
public KTable<MyKey, MyValue> myKTable(StreamsBuilder streamsBuilder) {
SpecificAvroSerde<MyKey> keySpecificAvroSerde = myKeySpecificAvroSerde();
SpecificAvroSerde<MyValue> valueSpecificAvroSerde = mySpecificAvroSerde();
streamsBuilder.stream(inputTopic1, Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde)).
selectKey((key, value) -> new MyKey(key.get1(), key.get2(), key.get3())).
to("dummyTopic", Produced.with(keySpecificAvroSerde, valueSpecificAvroSerde));
return streamsBuilder.table("dummyTopic",
Materialized.<MyKey, MyValue, KeyValueStore<Bytes, byte[]>>as("myStateStore").
withKeySerde(keySpecificAvroSerde).withValueSerde(valueSpecificAvroSerde));
}