从Kafka读取时使用KeyBy vs reinterpretAsKeyedStream()



我有一个简单的Flink流处理应用程序(Flink版本1.13)。Flink应用程序从Kakfa读取,对记录进行有状态处理,然后将结果写回Kafka。在阅读了Kafka主题之后,我选择使用reinterpretAsKeyedStream()而不是keyBy()来避免洗牌,因为记录已经在Kakfa中分区了。在Kakfa中用于分区的键是记录的字符串字段(使用默认的kafka分区器)。Kafka topic有24个分区.

映射类定义如下。它跟踪记录的状态。

public class EnvelopeMapper extends
KeyedProcessFunction<String, Envelope, Envelope> {
...
}

记录的处理如下:

DataStream<Envelope> messageStream =
env.addSource(kafkaSource)
DataStreamUtils.reinterpretAsKeyedStream(messageStream, Envelope::getId)
.process(new EnvelopeMapper(parameters))
.addSink(kafkaSink);

并行度为1时,代码运行良好。如果并行度大于1(例如4),我将遇到以下错误:

2022-06-12 21:06:30,720 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Map -> Flat Map -> KeyedProcess -> Map -> Sink: Unnamed (4/4) (7ca12ec043a45e1436f45d4b20976bd7) switched from RUNNING to FAILED on 100.101.231.222:44685-bd10d5 @ 100.101.231.222 (dataPort=37839).
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=96, endKeyGroup=127} does not contain key group 85

根据堆栈跟踪,当EnvelopeMapper类验证记录被发送到映射器对象的正确副本时,似乎会发生异常。

当使用reinterpretAsKeyedStream()时,记录在EventMapper的不同副本中是如何分布的?

提前谢谢你,艾哈迈德。

更新经过@David Anderson的反馈,将reinterpretAsKeyedStream()替换为keyBy()。现在对记录的处理如下:

DataStream<Envelope> messageStream =
env.addSource(kafkaSource)      // Line x
.map(statelessMapper1)
.flatMap(statelessMapper2);
messageStream.keyBy(Envelope::getId)
.process(new EnvelopeMapper(parameters))
.addSink(kafkaSink);

如果keyBy()在从Kakfa读取之后(标记为"x行")与在有状态映射器(EnvelopeMapper)之前执行,性能是否有差异?

With

reinterpretAsKeyedStream(
DataStream<T> stream,
KeySelector<T, K> keySelector,
TypeInformation<K> typeInfo)

您断言,如果您使用keyBy(keySelector),则记录已经完全分布。这将而不是通常是直接来自Kafka的记录的情况。即使它们在Kafka中按键进行分区,Kafka分区也不会正确地与Flink的键组相关联。

reinterpretAsKeyedStream仅在处理窗口或进程函数的输出时直接有用,其中您知道输出记录以特定方式进行键分区。要成功地在Kafka中使用它是非常困难的:你必须首先非常小心地将数据写入Kafka,或者对keySelector做一些棘手的事情,以便它计算的keyGroups与键映射到Kafka分区的方式一致。

这并不困难的一种情况是,如果数据是由Flink作业写入Kafka的,该作业与读取数据的下游作业运行相同的配置,并使用reinterpretAsKeyedStream

相关内容

  • 没有找到相关文章

最新更新