我觉得我可能错过了一些非常基本的东西,但我还是会问。
存在具有多个分区的输入主题。我正在使用选择键作为DSL拓扑的一部分。选择键始终返回相同的值。我的期望是,在 selectKey() 触发内部重新分区后,拓扑中的下一个处理器将在同一分区上为同一键调用。但是,下一个处理器 transform() 在不同的分区上调用相同的密钥。
拓扑学:
Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
builder
.stream("in-topic", Consumed.with(Serdes.String(), new JsonSerde<>(CatalogEvent.class)))
.selectKey((k,v) -> "key")
.transform(() -> new Processor())
.print();
return builder.build();
}
转换使用的处理器类
public class Processor implements Transformer<String, CatalogEvent, KeyValue<String, DispEvent>> {
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, DispEvent> transform(String key, CatalogEvent catalogEvent) {
System.out.println("key:" + key + " partition:" + context.partition());
return null;
}
@Override
public KeyValue<String, DispatcherEvent> punctuate(long timestamp) {
// TODO Auto-generated method stub
return null;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
"in-topic"有两个以随机 UUID 为键的消息,即"8f45e552-8886-4781-bb0c-79ca98f9d927"、"a794ed2a-6f7d-4522-a7ac-27c51a64fa28",两条消息的有效负载相同
两个 UUID 的处理器::转换的输出为
key:key partition: 2
key:key partition: 0
如何更改拓扑以确保具有相同键的消息将到达同一分区 - 我需要它来确保具有相同键的消息将转到同一个本地 Kafka 存储实例(用于插入或更新)。
对于process()
和[flat]transform[Values]()
,没有自动重新分区。您需要插入手动repartition()
(或旧版本中的through()
)调用来重新分区数据。如果您比较JavaDocs(与支持自动重新分区的groupBy()
或join()
),您会发现它们没有提到自动重新分区。
原因是,这三种方法是处理器API集成到DSL的一部分,因此没有DSL运算符。它们的语义是未知的,因此我们无法判断它们是否需要重新分区,如果键是否发生了变化。为避免不必要的重新分区,不执行自动重新分区。
还有一个对应的Jira:https://issues.apache.org/jira/browse/KAFKA-7608