如何一次将每个组发送给Spark执行者



我无法一次将每组数据帧发送给执行者。

我在company_model_vals_df dataframe中有以下数据。

 ----------------------------------------------------------------------------------------
 | model_id  |  fiscal_year  | fiscal_quarter | col1 | col2 | col3 | col4 | col5 | col6 |
 ----------------------------------------------------------------------------------------
 |    1      | 2018          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   2             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   2             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   3             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   3             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   3             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 ----------------------------------------------------------------------------------------

我想将每个分组的数据发送给执行人,一次处理每个数据。

为此我所做的如下:

var dist_company_model_vals_df =  company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct()
// Want to send each group at a time to write by executors.
dist_company_model_vals_df.foreach(rowDf => {
  writeAsParquet(rowDf , parquet_file)    // this simply writes the data as parquet file
})

错误:

这将引发nullpoInterException,因为在执行程序端找不到ROWDF。使用Scala 2.11在Spark-SQL中处理此操作的正确方法是什么?

第2部分:问题

当我做Company_Model_vals_df.groupby(" model_id"," fiscal_quarter"," fiscal_year")时,即使我增加了内存后,数据也会在磁盘上溢出很多。IE。company_model_vals_df是巨大的dataframe ...进行groupby时发生的大量溢出。

同样的情况下,即partitionbyby

company_model_vals_df.write.partitionby(" model_id"," fiscal_quarter"," fiscal_year")

PSEDO代码:因此,为了避免,首先我会做val groups = company_model_vals_df.groupby(" model_id"," fiscal_quarter"," fiscal_year")。收集

groups.forEach{ group ->
   // I want to prepare child dataframes for each group from    company_model_vals_df
   val child_df = company_model_vals_df.where(model_id= group.model_id && fiscal_quarter === group.fiscal_quarter && etc)
 this child_df , i want wrote to a file i.e. saveAs(path)
}

无论如何都可以这样做。这里有没有火花功能或API在这里对我有用吗?请提出一种解决此问题的方法。

这里几乎没有选项 -

  • 您需要将数据集分配到几个数据集中并单独使用它们像,
var dist_company_model_vals_list =  company_model_vals_df
  .select("model_id","fiscal_quarter","fiscal_year").distinct().collectAsList

然后用dist_company_model_vals_list列表的输出过滤company_model_vals_df,它提供了几个可以独立工作的数据集,例如

def rowList = {
import org.apache.spark.sql._
var dfList:Seq[DataFrame] = Seq()
for (data <- dist_company_model_vals_list.zipWithIndex) {
val i = data._2
val row = data.-1
val filterCol = col($"model_id").equalTo(row.get(i).getInt(0).and($"fiscal_quarter").equalTo(row.get(i).getInt(1).and($"fiscal_year").equalTo(row.get(i).getInt(2))
   val resultDf = company_model_vals_df.filter(filterCol)    
dfList +: = resultDf
      }
dfList
}
  • 如果您的目标是编写数据,则可以使用 partitionBy("model_id","fiscal_quarter","fiscal_year")dataFrameWriterto上的方法分别写入。

如果我正确理解您的问题,则要为每个"model_id","fiscal_quarter","fiscal_year"分别操纵数据。

如果正确的话,您将使用groupBy()进行:

company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").agg(avg($"col1") as "average")

如果您要寻找的是将每个逻辑组写入一个单独的文件夹,则可以通过写作来做到这一点:

company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year").parquet("path/to/save")

相关内容

  • 没有找到相关文章

最新更新