缓存和持久数据集



我想多次使用org.apache.flink.api.scala.dataset对象:

  • 使用Count(),
  • 打印行数
  • 写入neo4j数据库,
  • 转换为凝胶图对象,
  • 等。

使用这些动作中的每一个,完全重新计算数据集的值,而不是缓存。我找不到像Spark中的任何缓存()或Persist()函数。

这确实对我的应用程序产生了巨大影响,其中约1.000.000个数据,其中有许多加入/cogroup用法等:运行时似乎增加了3倍,为几个小时!那么如何缓存或坚持数据集并大大减少运行时?

我使用的是最新的Flink版本1.3.2和Scala 2.11。

示例:

package dummy
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.logging.log4j.scala.Logging
object Trials extends Logging {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // some dataset which could be huge in reality
    val dataSet = env.fromElements((1, 436), (2, 235), (3, 67), (4, 51), (5, 15), (6, 62), (7, 155))
    // some complex joins, coGroup functions etc.
    val joined = dataSet.cross(dataSet).filter(tuple => (tuple._1._2 + tuple._2._2) % 7 == 0)
    // log the number of rows --> performs the join above
    logger.info(f"results contains ${joined.count()} rows")
    // convert to Gelly graph format
    val graph = Graph.fromDataSet(
      dataSet.map(nodeTuple => new Vertex[Long, Long](nodeTuple._1, nodeTuple._2)),
      joined.map(edgeTuple => new Edge[Long, String](edgeTuple._1._1, edgeTuple._2._1, "someValue")),
      env
    )
    // do something with the graph
    logger.info("get number of vertices")
    val numberOfVertices = graph.numberOfVertices()
    logger.info("get number of edges")
    val numberOfEdges = graph.numberOfEdges() // --> performs the join again!
    logger.info(f"the graph has ${numberOfVertices} vertices and ${numberOfEdges} edges")
  }
}

必需的libs:log4j-core,log4j-api-scala_2.11,flink-core,flink-scala_2.11,flink-gelly-scala_2.10

我认为,如果您需要在同一流上执行多个操作,则值得使用侧面输出-https://ci.apache.org/projects/flink/flink/flink/flink--docs stable/dev/stream/side_output.html。

一旦执行了一些复杂的连接,cogroup功能等,并且获得了一个joined数据集,就可以将值收集到不同的侧面输出 - 后来将在计算计算上进行计算,而另一个将完成另一个作业。<<<<<<<<。/p>

相关内容

  • 没有找到相关文章

最新更新