I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink 1.9. The original partitioner overrides the selectChannels method of StreamPartitioner, as shown below:
// Original: working for Flink 1.7
//@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
        int numberOfOutputChannels) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        // send to all channels
        int[] channels = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; ++i) {
            channels[i] = i;
        }
        return channels;
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfOutputChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
    }
    return returnChannels;
}
I am not sure how to migrate this to Flink 1.9, because the StreamPartitioner API has changed, as shown below:
// New: required by Flink 1.9
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        /*
        It is illegal to call this method for broadcast channel selectors and this method can remain not
        implemented in that case (for example by throwing UnsupportedOperationException).
        */
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
    }
    //return returnChannels;
    return returnChannels[0];
}
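Note that selectChannels has been replaced by selectChannel. As a result, for broadcast elements it is no longer possible to return multiple output channels as the original code did. In fact, selectChannel is not supposed to be invoked at all in that case. Any ideas on how to solve this?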
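With Flink 1.9, you can no longer broadcast to all channels dynamically. Your StreamPartitioner must declare statically, via isBroadcast, whether it is a broadcast partitioner. If it is, selectChannel is never called.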
Do you have a specific use case that requires switching dynamically?
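To make the static contract concrete, here is a minimal, self-contained sketch. It deliberately does not use Flink's classes: `ChannelSelectorSketch`, `targetChannels`, `AlwaysBroadcast`, and `KeyModulo` are hypothetical names that only mimic the shape of setup / selectChannel / isBroadcast. It assumes the writer side asks isBroadcast once per selector rather than once per record, which is why a per-record broadcast decision inside selectChannel has nowhere to go.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the contract discussed above. These are NOT Flink classes.
class BroadcastContractSketch {

    /** Hypothetical, reduced version of a channel selector. */
    interface ChannelSelectorSketch<T> {
        void setup(int numberOfChannels);
        int selectChannel(T record);
        boolean isBroadcast();
    }

    /**
     * Hypothetical writer logic: the broadcast decision depends only on the
     * selector, never on the record. selectChannel is skipped for broadcast selectors.
     */
    static <T> List<Integer> targetChannels(ChannelSelectorSketch<T> selector, T record, int numberOfChannels) {
        selector.setup(numberOfChannels);
        List<Integer> targets = new ArrayList<>();
        if (selector.isBroadcast()) {
            for (int i = 0; i < numberOfChannels; i++) {
                targets.add(i);
            }
        } else {
            targets.add(selector.selectChannel(record));
        }
        return targets;
    }

    /** Statically a broadcast selector, so selectChannel may stay unimplemented. */
    static final class AlwaysBroadcast<T> implements ChannelSelectorSketch<T> {
        @Override public void setup(int numberOfChannels) { }
        @Override public int selectChannel(T record) {
            throw new UnsupportedOperationException("broadcast selectors do not select a single channel");
        }
        @Override public boolean isBroadcast() { return true; }
    }

    /** Statically a non-broadcast selector: exactly one channel per record. */
    static final class KeyModulo implements ChannelSelectorSketch<Integer> {
        private int numberOfChannels;
        @Override public void setup(int numberOfChannels) { this.numberOfChannels = numberOfChannels; }
        @Override public int selectChannel(Integer key) { return Math.floorMod(key, numberOfChannels); }
        @Override public boolean isBroadcast() { return false; }
    }

    public static void main(String[] args) {
        System.out.println(targetChannels(new AlwaysBroadcast<String>(), "a", 3)); // prints [0, 1, 2]
        System.out.println(targetChannels(new KeyModulo(), 7, 3));                 // prints [1]
    }
}
```

In this model a selector is either always broadcast or never broadcast, so a partitioner that mixes both behaviors per record, like the 1.7 version in the question, cannot be expressed through selectChannel alone.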