我们正在尝试为Apache Flink创建一个扩展,它使用自定义分区。对于某些操作符,我们希望检查/检索使用的分区符。不幸的是,我无法在给定的数据集上找到任何这样做的可能性。我错过了什么,还是有其他的解决办法?
我会这样开头:
class MyPartitioner[..](..) extends Partitioner[..] {..}
[..]
val myP = new MyPartitioner(...)
val ds = in.partitionCustom(myP, 0)
现在,我想从另一个类访问分区器(如果定义的话)。在Spark中,我会这样做:
val myP = ds.partitioner.get.asInstanceOf[MyPartitioner]
然而,对于Flink,我找不到这样的可能性。
Edit1:
在Fabian的建议下,这似乎是可能的。然而,有两个限制:
(1)当使用Scala时,你必须首先检索底层Java数据集以将其转换为PartitionOperator
(2)分区必须是最后一次操作。因此在设置和获取分区之间不能使用其他操作。例如:
val in: DataSet[(String, Int)] = ???
val myP = new MyPartitioner()
val ds = in.partitionCustom(myP, 0)
val ds2 = ds.map(x => x)
val myP2 = ds2.asInstanceOf[PartitionOperator].getCustomPartitioner
谢谢你,并致以最良好的问候。菲利普
您可以将返回的DataSet
强制转换为PartitionOperator
并调用PartitionOperator.getCustomPartitioner()
:
val in: DataSet[(String, Int)] = ???
val myP = new MyPartitioner()
val ds = in.partitionCustom(myP, 0)
val myP2 = ds.asInstanceOf[PartitionOperator].getCustomPartitioner
注意
-
getCustomPartitioner()
是一个内部方法(即,不是公共API的一部分),并且可能在Flink的未来版本中更改。 -
PartitionOperator
也用于其他分区类型,如DataSet.partitionByHash()
。在这些情况下,getCustomPartitioner()
可能返回null
。