Migrating a custom dynamic partitioner from Flink 1.7 to Flink 1.9



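I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink 1.9. The original partitioner implemented the selectChannels method of the StreamPartitioner interface, as shown below: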

// Original: working for Flink 1.7
// Note: returnChannels, random and partitioner are presumably fields of the
// partitioner class; their declarations are not shown here.
//@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
        int numberOfOutputChannels) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        // send to all channels
        int[] channels = new int[numberOfOutputChannels];
        for (int i = 0; i < numberOfOutputChannels; ++i) {
            channels[i] = i;
        }
        return channels;
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfOutputChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
    }
    return returnChannels;
}

I am not sure how to migrate this to Flink 1.9, because the StreamPartitioner interface has changed, as shown below:

// New: required by Flink 1.9
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
    T value = streamRecordSerializationDelegate.getInstance().getValue();
    if (value.f0.isBroadCastPartitioning()) {
        /*
        It is illegal to call this method for broadcast channel selectors and this method can remain not
        implemented in that case (for example by throwing UnsupportedOperationException).
        */
    } else if (value.f0.getPartitionKey() == -1) {
        // random partition
        returnChannels[0] = random.nextInt(numberOfChannels);
    } else {
        returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
    }
    //return returnChannels;
    return returnChannels[0];
}

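Note that selectChannels has been replaced by selectChannel. As a result, for broadcast elements it is no longer possible to return multiple output channels the way the original code did. In fact, selectChannel is not supposed to be invoked at all in that case. Any ideas on how to solve this?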

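With Flink 1.9, you can no longer broadcast to all channels dynamically. Your StreamPartitioner has to declare statically, through isBroadcast, whether it is a broadcast partitioner. If it is, selectChannel is never called.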
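To make that contract concrete, here is a minimal sketch of how I understand it. Selector and Writer are simplified stand-ins written for this example, not Flink's real classes; the point is only that the broadcast decision is made per selector, not per record, so a broadcast selector can leave selectChannel unimplemented.

```java
import java.util.ArrayList;
import java.util.List;

final class BroadcastContractDemo {

    // Simplified stand-in for a channel selector (hypothetical, NOT Flink's interface).
    interface Selector<T> {
        void setup(int numberOfChannels);
        int selectChannel(T record);
        boolean isBroadcast();
    }

    // Simplified stand-in for the component that writes records to channels.
    static final class Writer<T> {
        private final Selector<T> selector;
        private final int numberOfChannels;

        Writer(Selector<T> selector, int numberOfChannels) {
            this.selector = selector;
            this.numberOfChannels = numberOfChannels;
            selector.setup(numberOfChannels);
        }

        // Returns the channel indices the record would be written to.
        List<Integer> emit(T record) {
            List<Integer> targets = new ArrayList<>();
            if (selector.isBroadcast()) {
                // Broadcast is a property of the selector: selectChannel is skipped entirely.
                for (int i = 0; i < numberOfChannels; i++) {
                    targets.add(i);
                }
            } else {
                targets.add(selector.selectChannel(record));
            }
            return targets;
        }
    }

    static Selector<String> broadcastSelector() {
        return new Selector<String>() {
            @Override public void setup(int numberOfChannels) { }
            @Override public int selectChannel(String record) {
                throw new UnsupportedOperationException("broadcast selector");
            }
            @Override public boolean isBroadcast() { return true; }
        };
    }

    static Selector<String> hashSelector() {
        return new Selector<String>() {
            private int numberOfChannels;
            @Override public void setup(int numberOfChannels) {
                this.numberOfChannels = numberOfChannels;
            }
            @Override public int selectChannel(String record) {
                // floorMod keeps the index non-negative even for negative hash codes.
                return Math.floorMod(record.hashCode(), numberOfChannels);
            }
            @Override public boolean isBroadcast() { return false; }
        };
    }

    public static void main(String[] args) {
        System.out.println(new Writer<>(broadcastSelector(), 3).emit("a"));
        System.out.println(new Writer<>(hashSelector(), 3).emit("a"));
    }
}
```

With the broadcast selector every record goes to all channels and the exception in selectChannel is never hit; with the hash selector each record goes to exactly one channel.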

Do you have a specific use case that requires switching dynamically?
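If per-record switching really is needed, one possible direction (my own suggestion, not something taken from the question) is to do the fan-out before the partitioner: emit one copy of a broadcast record per target channel, each tagged with its channel index, so that the partitioner only ever has to return a single channel. The sketch below shows just that routing step in plain Java. Routed and route are hypothetical names, Math.floorMod stands in for the unspecified partitioner.partition call, and the approach assumes the number of downstream channels is known at the point where the copies are made.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class FanOutSketch {

    // Hypothetical wrapper: a value tagged with the channel index it should go to.
    static final class Routed<T> {
        final int channel;
        final T value;

        Routed(int channel, T value) {
            this.channel = channel;
            this.value = value;
        }
    }

    // Expands one record into one tagged copy per target channel.
    static <T> List<Routed<T>> route(T value, boolean broadcast, int partitionKey, int numberOfChannels) {
        List<Routed<T>> out = new ArrayList<>();
        if (broadcast) {
            // one copy per channel instead of returning several channels at once
            for (int i = 0; i < numberOfChannels; i++) {
                out.add(new Routed<>(i, value));
            }
        } else if (partitionKey == -1) {
            // random partition, as in the original code
            out.add(new Routed<>(ThreadLocalRandom.current().nextInt(numberOfChannels), value));
        } else {
            // assumption: floorMod replaces the partitioner.partition call, whose logic is not shown
            out.add(new Routed<>(Math.floorMod(partitionKey, numberOfChannels), value));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Routed<String> r : route("event", true, 0, 4)) {
            System.out.println(r.channel + " <- " + r.value);
        }
    }
}
```

In a Flink job this logic could sit in a flatMap, followed by partitionCustom with a Partitioner that simply returns the tag. Whether the extra copies are acceptable depends on how often records are broadcast.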
