Flink作业不是在机器上分配的



我在apache flink中的用例很小,这是批处理处理系统。我需要处理文件夹。每个文件的处理必须由一台计算机处理。我有以下代码。一直以来,只有一个任务插槽被占用,并且文件一个接一个地处理。我有6个节点(因此6个任务经理(,并在每个节点中配置了4个任务插槽。因此,我希望一次处理24个文件。

class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
  override def mapPartition(
      myfiles: java.lang.Iterable[java.io.File],
      out:org.apache.flink.util.Collector[Int])
    : Unit  =  {
    var temp = myfiles.iterator()
    while(temp.hasNext()){
      val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
      val file = new File(temp.next().toURI)
      Process(
        "/bin/bash ./run.sh  " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
        new File(fp1.getAbsoluteFile.getParent))
        .lines
        .foreach{println}
      out.collect(1)
    }
  }
}

我启动了Flink作为./bin/start-cluster.sh命令,Web用户界面显示了它具有6个任务管理器,24个任务插槽。

文件夹包含大约49个文件。当我在此集合上创建映射时,我希望跨越49个并行过程。但是随后,在我的基础架构中,它们都是一个接一个地处理的。这意味着只有一台机器(一个任务管理器(可以处理所有49个文件名。我想要的是,正如每个插槽配置的2个任务一样,我希望可以同时处理24个文件。

任何指针肯定会在这里有所帮助。我在flink-conf.yaml文件中有这些参数

jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24

预先感谢。有人可以把我弄清楚我出错的地方吗?

正如David所描述的问题是,env.fromCollection(Iterable[T])使用非并行InputFormat创建DataSource。因此,DataSource1的并行性执行。随后的运算符(mapPartition(从源继承了此并行性,以便它们可以链接(这可以节省我们一个网络散装(。

解决此问题的方法是通过

明确地重新平衡源DataSet
env.fromCollection(folders).rebalance()

或在随后的操作员(mapPartition(上明确设置良好的并行性:

env.fromCollection(folders).mapPartition(...).setParallelism(49)

最新更新