Apache Flink:如何使用 Flink DataSet API 从一个数据集创建两个数据集



我正在使用 Flink 0.10.1 的 DataSet API 编写一个应用程序。我可以在 Flink 中使用单个运算符获取多个收集器吗?

我想做的是这样的:

val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
  (iterator, collector1, collector2) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector1.collect(elem1)
      collector2.collect(elem2)
    }
  } 
} 

目前,我调用mapPartition两次,从一个源数据集创建两个数据集。

val lines = env.readTextFile(...)
val out_small = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem1)
    }
  } 
}
val out_large = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem2)
    }
  } 
}

由于doParsing函数非常昂贵,因此我想每行只调用一次。

附言如果您能让我知道以更简单的方式做这种事情的其他方法,我将不胜感激。

Flink 不支持多个收集器。但是,您可以通过添加指示输出类型的附加字段来更改分析步骤的输出:

val lines = env.readTextFile(...)
val intermediate = lines **someOp** {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(0, elem1) // 0 indicates small
      collector.collect(1, elem2) // 1 indicates large
    }
  } 
} 

接下来,您将使用输出intermediate两次,并分别筛选第一个属性。第一个筛选器筛选0第二个筛选器筛选1(您还需要添加投影以删除第一个属性)。

               +---> filter("0") --->
               | 
intermediate --+
               | 
               +---> filter("1") --->

相关内容

  • 没有找到相关文章

最新更新