我有一个简单的Flink流处理应用程序(Flink版本1.13)。Flink应用程序从Kakfa读取,对记录进行有状态处理,然后将结果写回Kafka。在阅读了Kafka主题之后,我选择使用reinterpretAsKeyedStream()
而不是keyBy()
来避免洗牌,因为记录已经在Kakfa中分区了。在Kakfa中用于分区的键是记录的字符串字段(使用默认的kafka分区器)。Kafka topic有24个分区.
映射类定义如下。它跟踪记录的状态。
public class EnvelopeMapper extends
KeyedProcessFunction<String, Envelope, Envelope> {
...
}
记录的处理如下:
DataStream<Envelope> messageStream =
env.addSource(kafkaSource)
DataStreamUtils.reinterpretAsKeyedStream(messageStream, Envelope::getId)
.process(new EnvelopeMapper(parameters))
.addSink(kafkaSink);
并行度为1时,代码运行良好。如果并行度大于1(例如4),我将遇到以下错误:
2022-06-12 21:06:30,720 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Map -> Flat Map -> KeyedProcess -> Map -> Sink: Unnamed (4/4) (7ca12ec043a45e1436f45d4b20976bd7) switched from RUNNING to FAILED on 100.101.231.222:44685-bd10d5 @ 100.101.231.222 (dataPort=37839).
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=96, endKeyGroup=127} does not contain key group 85
根据堆栈跟踪,当EnvelopeMapper
类验证记录被发送到映射器对象的正确副本时,似乎会发生异常。
当使用reinterpretAsKeyedStream()
时,记录在EventMapper
的不同副本中是如何分布的?
提前谢谢你,艾哈迈德。
更新经过@David Anderson的反馈,将reinterpretAsKeyedStream()
替换为keyBy()
。现在对记录的处理如下:
DataStream<Envelope> messageStream =
env.addSource(kafkaSource) // Line x
.map(statelessMapper1)
.flatMap(statelessMapper2);
messageStream.keyBy(Envelope::getId)
.process(new EnvelopeMapper(parameters))
.addSink(kafkaSink);
如果keyBy()
在从Kakfa读取之后(标记为"x行")与在有状态映射器(EnvelopeMapper
)之前执行,性能是否有差异?
With
reinterpretAsKeyedStream(
DataStream<T> stream,
KeySelector<T, K> keySelector,
TypeInformation<K> typeInfo)
您断言,如果您使用keyBy(keySelector)
,则记录已经完全分布。这将而不是通常是直接来自Kafka的记录的情况。即使它们在Kafka中按键进行分区,Kafka分区也不会正确地与Flink的键组相关联。
reinterpretAsKeyedStream
仅在处理窗口或进程函数的输出时直接有用,其中您知道输出记录以特定方式进行键分区。要成功地在Kafka中使用它是非常困难的:你必须首先非常小心地将数据写入Kafka,或者对keySelector做一些棘手的事情,以便它计算的keyGroups与键映射到Kafka分区的方式一致。
这并不困难的一种情况是,如果数据是由Flink作业写入Kafka的,该作业与读取数据的下游作业运行相同的配置,并使用reinterpretAsKeyedStream
。