类状态在 Flink 中的函数调用之间丢失



我有这个类:

case class IDADiscretizer(
nAttrs: Int,
nBins: Int = 5,
s: Int = 5) extends Serializable {
private[this] val log = LoggerFactory.getLogger(this.getClass)
private[this] val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))
private[this] val randomReservoir = SamplingUtils.reservoirSample((1 to s).toList.iterator, 1)
def updateSamples(v: LabeledVector): Vector[IntervalHeapWrapper] = {
val attrs = v.vector.map(_._2)
val label = v.label
// TODO: Check for missing values
attrs
.zipWithIndex
.foreach {
case (attr, i) =>
if (V(i).getNbSamples < s) {
V(i) insertValue attr // insert
} else {
if (randomReservoir(0) <= s / (i + 1)) {
//val randVal = Random nextInt s
//V(i) replace (randVal, attr)
V(i) insertValue attr
}
}
}
V
}
/**
* Return the cutpoints for the discretization
*
*/
def cutPoints: Vector[Vector[Double]] = V map (_.getBoundaries.toVector)
def discretize(data: DataSet[LabeledVector]): (DataSet[Vector[IntervalHeapWrapper]], Vector[Vector[Double]]) = {
val r = data map (x => updateSamples(x))
val c = cutPoints
(r, c)
}
}

使用 flink,我想在调用discretize后获取切点,但似乎存储在V的信息会丢失。我必须像这个问题一样使用Broadcast吗?有没有更好的方法来访问类的状态?

我尝试以两种方式调用cutpoints,一种是:

def discretize(data: DataSet[LabeledVector]) = data map (x => updateSamples(x))

然后,从外面打电话:

val a = IDADiscretizer(nAttrs = 4)
val r = a.discretize(dataSet)
r.print
val cuts = a.cutPoints

在这里,cut是空的,所以我试图计算离散化以及discretize内的切割点:

def discretize(data: DataSet[LabeledVector]) = {
val r = data map (x => updateSamples(x))
val c = cutPoints
(r, c)
}

并像这样使用它:

val a = IDADiscretizer(nAttrs = 4)
val (d, c) = a.discretize(dataSet)
c foreach println

但同样的事情也发生了。

最后,我还试图让V完全公开:

val V = Vector.tabulate(nAttrs)(i => new IntervalHeapWrapper(nBins, i))

仍然空着

我做错了什么?

相关问题:

  • 在多个转换之间保持键控状态
  • Flink 状态后端密钥原子和分发
  • Flink:状态是否跨流访问?
  • Flink:在 CoFlatMapFunction 中共享状态

感谢@TillRohrmann我最终做的是:

private[this] def computeCutPoints(x: LabeledVector) = {
val attrs = x.vector.map(_._2)
val label = x.label
attrs
.zipWithIndex
.foldLeft(V) {
case (iv, (v, i)) =>
iv(i) insertValue v
iv
}
}
/**
* Return the cutpoints for the discretization
*
*/
def cutPoints(data: DataSet[LabeledVector]): Seq[Seq[Double]] =
data.map(computeCutPoints _)
.collect
.last.map(_.getBoundaries.toVector)
def discretize(data: DataSet[LabeledVector]): DataSet[LabeledVector] =
data.map(updateSamples _)

然后像这样使用它:

val a = IDADiscretizer(nAttrs = 4)
val d = a.discretize(dataSet)
val cuts = a.cutPoints(dataSet)
d.print
cuts foreach println

我不知道这是否是最好的方法,但至少现在正在工作。

Flink 的工作方式是用户定义运算符/用户定义的函数,这些函数对来自源函数的输入数据进行操作。为了执行程序,用户代码被发送到执行它的 Flink 集群。计算结果必须通过接收器函数输出到某个存储系统。

因此,在尝试使用解决方案时,不可能轻松地混合本地计算和分布式计算。discretize所做的是定义一个转换输入DataSetdatamap运算符。例如,一旦您调用ExecutionEnvironment#executeDataSet#print,此操作将执行。现在,IDADiscretizer的用户代码和定义将发送到实例化的集群。Flink 将更新IDADiscretizer实例中的值,该实例与客户端上的实例不同。

相关内容

  • 没有找到相关文章

最新更新