是否有在Flink中使用直方图蓄能器的示例



我偶然发现了弗林克层次结构中的直方图类,但是没有"您可以使用这种"文档围绕它。我想做类似的事情:

dataStream
    .countWindowAll(100)
    .fold(new Histogram(), (histogram,data) -> {histogram.add(data.getValue()); return histogram;})
    .flatmap((h, out) -> h.getLocalValue().navigableKeySet.iterator().forEachRemaining(key -> out.collect(key.toString()+","+h.get(key).toString()))
    .print()

,但可悲的是,Histogram无法通过Flink序列化。也许有一个"您可以使用此方法",或者还有另一种通过Flink进行直方图的方法。

我显然做错了什么。

flink的蓄能器不应用作DataStreamDataSet的数据类型。

相反,它们是通过RuntimeContext注册的,可从RichFunction.getRuntimeContext(). This is usually done in the Open()method of a RichFunction":

class MyFunc extends RichFlatMapFunction[Int, Int] {
  val hist: Histogram = new Histogram()
  override def open(conf: Configuration): Unit = {
    getRuntimeContext.addAccumulator("myHist", hist)
  }
  override def flatMap(value: Int, out: Collector[Int]): Unit = {
    hist.add(value)
  }
}

累加器的所有并行实例都会定期运送到Jobmanager(主过程)并合并。可以从StreamExecutionEnvironment.execute()返回的JobExecutionResult访问它们的值。

我认为您的用例无法由Flink的蓄能器解决。您应该创建自定义直方图数据类型。

相关内容

  • 没有找到相关文章

最新更新