Spark Job只需大量数据即可



我正在尝试从S3(数据的15天)查询。我尝试单独查询它们(每天),它可以正常工作。它也可以正常工作14天。但是,当我查询15天的时间时,工作将永远运行(悬挂),任务#没有更新。

我的设置:

我使用的是51个节点群集R3.4x大,具有动态分配和最大资源打开。

我所做的就是=

val startTime="2017-11-21T08:00:00Z"
val endTime="2017-12-05T08:00:00Z"
val start = DateUtils.getLocalTimeStamp( startTime )
val end = DateUtils.getLocalTimeStamp( endTime )
val days: Int = Days.daysBetween( start, end ).getDays
val files: Seq[String] = (0 to days)
      .map( start.plusDays )
      .map( d => s"$input_path${DateTimeFormat.forPattern( "yyyy/MM/dd" ).print( d )}/*/*" )
sqlSession.sparkContext.textFile( files.mkString( "," ) ).count

当我14天的时间运行时,我得到了197337380(count),然后分别跑了第15天,得到了27676788。但是当我查询15天的时间时,工作总共悬挂

更新:

工作正常:

  var df = sqlSession.createDataFrame(sc.emptyRDD[Row], schema)
    for(n <- files ){
      val tempDF = sqlSession.read.schema( schema ).json(n)
      df = df(tempDF)
    }
df.count

但是有人可以解释为什么现在起作用,但以前不起作用?

更新:设置mapReduce.input.fileinputformat.split.split.sminsize至256 GB之后,现在正常工作。

动态分配和最大化资源分配都是不同的设置,当另一个处于活动状态时,将会禁用一个设置。在EMR中最大化资源分配的情况下,每个节点启动了1个执行程序,并将所有内核和内存分配给该executor。

我建议采取不同的路线。您似乎有一个非常大的群集,其中有51个节点,不确定是否需要它。但是,遵循此经验法则首先,您将掌握如何调整这些配置。

  • 群集内存 - 至少2倍您要处理的数据。

现在假设您需要51个节点,请在以下尝试:

  • R3.4X具有16个CPU-因此,您可以将所有这些CPU放在OS和其他过程中。
  • 将您的执行者人数设置为150-这将为每个节点分配3个执行者。
  • 将每个执行程序的内核数设置为5(每个节点执行者3个执行者)
  • 将您的执行器内存设置为大约总主机内存/3 = 35G
  • 您必须控制并行性(默认分区),将其设置为〜800
  • 调整洗牌分区 - 使这两次核心数量-1600

上面的配置对我来说像魅力一样工作。您可以监视Spark UI上的资源利用率。

另外,在您的纱线config /etc/hadoop/conf/capacity-scheduler.xml文件中,将yarn.scheduler.capacity.resource-calculator设置为org.apache.hadoop.yarn.util.resource.DominantResourceCalculator-这将使Spark可以与这些CPU真正进行全油门。更改后重新启动纱线服务。

您应该增加执行者内存和#执行者,如果数据大量尝试增加驱动程序内存。

我的建议是不使用动态资源分配,让它运行并查看是否仍然悬挂(请注意,Spark Job可以消耗整个集群资源,并使其他应用程序饥饿以获取资源跑步)。如果不悬挂,那意味着您应该使用资源分配,然后开始硬编码资源并继续增加资源,以便您可以找到可以使用的最佳资源分配。

以下链接可以帮助您了解资源的资源分配和优化。

https://community.hortonworks.com/articles/42803/spark-on-yarn-executor-executor-resource-Allocation-Allocation-aptimiz.html

相关内容

  • 没有找到相关文章

最新更新