优化 Flink 转换



我有以下方法计算DataSet中值的概率:

/**
* Compute the probabilities of each value on the given [[DataSet]]
*
* @param x single colum [[DataSet]]
* @return Sequence of probabilites for each value
*/
private[this] def probs(x: DataSet[Double]): Seq[Double] = {
val counts = x.groupBy(_.doubleValue)
.reduceGroup(_.size.toDouble)
.name("X Probs")
.collect
val total = counts.sum
counts.map(_ / total)
}

问题是,当我提交使用此方法的 flink 作业时,由于任务TimeOut导致 flink 杀死作业。我正在只有 40.000 个实例和 9 个属性的DataSet上为每个属性执行此方法。

有没有办法更有效地完成此代码?

经过几次尝试,我让它与mapPartition一起工作,这种方法是类InformationTheory的一部分,它做一些计算来计算熵、互信息等。因此,例如,SymmetricalUncertainty计算如下:

/**
* Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
*
* It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
*
* @param xy [[DataSet]] with two features
* @return SU value
*/
def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
val su = xy.mapPartitionWith {
case in ⇒
val x = in map (_._2)
val y = in map (_._1)
val mu = mutualInformation(x, y)
val Hx = entropy(x)
val Hy = entropy(y)
Some(2 * mu / (Hx + Hy))
}
su.collect.head.head
}

有了这个,我可以有效地计算entropy,互信息等。问题是,它仅适用于并行级别为 1 的情况,问题在于mapPartition.

有没有办法做一些类似于我在这里用SymmetricalUncertainty做的事情,但无论并行程度如何?

我终于做到了,不知道这是否是最好的解决方案,但它与 n 级并行性一起工作:

def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
val su = xy.reduceGroup { in ⇒
val invec = in.toVector
val x = invec map (_._2)
val y = invec map (_._1)
val mu = mutualInformation(x, y)
val Hx = entropy(x)
val Hy = entropy(y)
2 * mu / (Hx + Hy)
}
su.collect.head
} 

您可以在 InformationTheory.scala 上查看整个代码,其测试 InformationTheorySpec.scala

相关内容

  • 没有找到相关文章

最新更新