我正在研究 Flink 中的数据倾斜处理以及如何更改物理分区的低级控制以便对元组进行均匀处理。我创建了合成偏斜数据源,目标是在窗口上处理(聚合(它们。这是完整的代码。
streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.rebalance() // or .rescale() .shuffle()
.keyBy(new StationPlatformKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.setParallelism(4)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;
根据 Flink 仪表板,我看不到 .shuffle()
、.rescale()
和 .rebalance()
之间的太大区别。尽管文档说 rebalance(( 转换更适合数据倾斜。
之后,我尝试使用.partitionCustom(partitioner, "someKey")
.然而,令我惊讶的是,我无法在窗口操作中使用 setParallelism(4(。文件说
注意:此操作本质上是非并行的,因为所有元素 必须通过相同的运算符实例。
我不明白为什么。如果允许我做partitionCustom
,为什么在那之后我不能使用并行性?这是完整的代码。
streamTrainsStation01.union(streamTrainsStation02)
.union(streamTicketsStation01).union(streamTicketsStation02)
// map the keys
.map(new StationPlatformMapper(metricMapper)).name(metricMapper)
.partitionCustom(new StationPlatformKeyCustomPartitioner(), new StationPlatformKeySelector())
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.apply(new StationPlatformRichAllWindowFunction(metricWindowFunction)).name(metricWindowFunction)
.map(new StationPlatformMapper(metricSkewedMapper)).name(metricSkewedMapper)
.addSink(new MqttStationPlatformPublisher(ipAddressSink, topic)).name(metricSinkFunction)
;
谢谢菲 利 普
我从FLink-user-mail列表中得到了答案。基本上,在rebalance()
之后使用keyBy()
会杀死rebalance()
试图做的所有效果。我发现的第一个(临时(解决方案是创建一个关心倾斜键的组合键。
public class CompositeSkewedKeyStationPlatform implements Serializable {
private static final long serialVersionUID = -5960601544505897824L;
private Integer stationId;
private Integer platformId;
private Integer skewParameter;
}
我在keyBy()
使用之前在map
功能上使用它。
public class StationPlatformSkewedKeyMapper
extends RichMapFunction<MqttSensor, Tuple2<CompositeSkewedKeyStationPlatform, MqttSensor>> {
private SkewParameterGenerator skewParameterGenerator;
public StationPlatformSkewedKeyMapper() {
this.skewParameterGenerator = new SkewParameterGenerator(10);
}
@Override
public Tuple2<CompositeSkewedKeyStationPlatform, MqttSensor> map(MqttSensor value) throws Exception {
Integer platformId = value.getKey().f2;
Integer stationId = value.getKey().f4;
Integer skewParameter = 0;
if (stationId.equals(new Integer(2)) && platformId.equals(new Integer(3))) {
skewParameter = this.skewParameterGenerator.getNextItem();
}
CompositeSkewedKeyStationPlatform compositeKey = new CompositeSkewedKeyStationPlatform(stationId, platformId,
skewParameter);
return Tuple2.of(compositeKey, value);
}
}
这是我的完整解决方案。