如何在Apache Flink中获得分区器



我们正在尝试为Apache Flink创建一个扩展,它使用自定义分区。对于某些操作符,我们希望检查/检索使用的分区符。不幸的是,我无法在给定的数据集上找到任何这样做的可能性。我错过了什么,还是有其他的解决办法?

我会这样开头:

class MyPartitioner[..](..) extends Partitioner[..] {..}
[..]
val myP = new MyPartitioner(...)
val ds = in.partitionCustom(myP, 0)

现在,我想从另一个类访问分区器(如果定义的话)。在Spark中,我会这样做:

val myP = ds.partitioner.get.asInstanceOf[MyPartitioner]

然而,对于Flink,我找不到这样的可能性。


Edit1:

在Fabian的建议下,这似乎是可能的。然而,有两个限制:

(1)当使用Scala时,你必须首先检索底层Java数据集以将其转换为PartitionOperator

(2)分区必须是最后一次操作。因此在设置和获取分区之间不能使用其他操作。例如:

val in: DataSet[(String, Int)] = ???
val myP = new MyPartitioner()
val ds = in.partitionCustom(myP, 0)
val ds2 = ds.map(x => x)
val myP2 = ds2.asInstanceOf[PartitionOperator].getCustomPartitioner

谢谢你,并致以最良好的问候。菲利普

您可以将返回的DataSet强制转换为PartitionOperator并调用PartitionOperator.getCustomPartitioner():

val in: DataSet[(String, Int)] = ???
val myP = new MyPartitioner()
val ds = in.partitionCustom(myP, 0)
val myP2 = ds.asInstanceOf[PartitionOperator].getCustomPartitioner

注意

  1. getCustomPartitioner()是一个内部方法(即,不是公共API的一部分),并且可能在Flink的未来版本中更改。
  2. PartitionOperator也用于其他分区类型,如DataSet.partitionByHash()。在这些情况下,getCustomPartitioner()可能返回null

相关内容

  • 没有找到相关文章