如何处理Spark中强烈不同的数据大小



我想知道在设计火花乔布斯的最佳实践中,该数据量不知道(或强烈变化(。就我而言,该应用程序应既应处理初始负载,又要处理增量数据。

我想知道如何设置数据中的分区数(例如,使用repartition或设置参数(例如spark.sql.shuffle.partitions(,以避免执行者中的OOM exception(每个执行者给出固定的分配内存(。我可以

  1. 定义了很高的分区,以确保即使在很高的工作量上,该作业也不会失败
  2. 在运行时设置分区数,具体取决于源数据的大小
  3. 在独立的数据(即循环(
  4. 上引入迭代

从所有选项中,我都会看到问题:

1:我认为这对于小数据大小而效率低,因为Taks变得很小

2:需要其他查询(例如count(和E.G.对于设置spark.sql.shuffle.partitionsSparkContext需要是重新启动,我想避免

3:似乎与Spark的精神相矛盾

所以我想知道最有效的数据量最有效的策略是什么。

编辑:我对设置spark.sql.shuffle.partitions是错误的,可以在运行时设置此设置

  1. 不要在不知道需要的情况下设置大量分区。您绝对会杀死工作的表现。
  2. 正如您所说的,不要循环!

您提到的是,您引入了一个额外的步骤,该步骤是计算您的数据,乍一看似乎是错误的。但是,您不应该认为这是错误的计算。通常,计算数据所需的时间大大少于如果您对数据进行划分,则需要进行进一步处理所需的时间。将计数操作视为一项投资,这肯定值得。

您无需通过配置和重新启动Spark设置分区。相反,请执行以下操作:

  1. 注意RDD/DataFrame/DataSet的当前分区
  2. 计算数据中的条目/行数
  3. 基于平均行大小的估计值,计算分区的目标数
  4. 如果#targetPartitions<<#actualpartitions然后合并
  5. 否则如果#targetPartitions>> #actualPartitions,则repartition
  6. 其他#targetPartitions〜 = #actualPartitions,然后什么都不做

cocece操作将在不改组的情况下重新分配您的数据,因此,可用时效率要高得多。

理想情况下,您可以估计遗嘱产生的行数,而不是计算它们。另外,您将需要仔细考虑执行此操作的何时适合。借助长RDD谱系,您可以杀死性能,因为由于Scala Lazy的执行,您可以无意间减少可以执行复杂代码的内核数。查看检查点以减轻此问题。

最新更新