I am unable to send each group of the data to an executor one group at a time.
I have the following data in the company_model_vals_df dataframe:
----------------------------------------------------------------------------------------
| model_id | fiscal_year | fiscal_quarter | col1 | col2 | col3 | col4 | col5 | col6 |
----------------------------------------------------------------------------------------
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 2 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 2 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
----------------------------------------------------------------------------------------
I want to send each group's data to an executor and process one group at a time.
What I have done so far:
var dist_company_model_vals_df = company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct()
// Want to send each group at a time to write by executors.
dist_company_model_vals_df.foreach(rowDf => {
  writeAsParquet(rowDf, parquet_file) // this simply writes the data as a parquet file
})
Error:
This throws a NullPointerException, because rowDf is not available on the executor side. What is the correct way to handle this in Spark SQL with Scala 2.11?
Part 2: Question
When I do company_model_vals_df.groupBy("model_id", "fiscal_quarter", "fiscal_year"), a lot of data spills to disk even after I increased the memory. company_model_vals_df is a huge dataframe, and there is a lot of spilling while the groupBy runs.
The same happens with partitionBy, i.e.
company_model_vals_df.write.partitionBy("model_id", "fiscal_quarter", "fiscal_year")
Pseudo code: so to avoid that, first I would do
val groups = company_model_vals_df.groupBy("model_id", "fiscal_quarter", "fiscal_year").collect
groups.foreach { group =>
  // I want to prepare a child dataframe for each group from company_model_vals_df
  val child_df = company_model_vals_df.where(model_id === group.model_id && fiscal_quarter === group.fiscal_quarter && etc)
  // then I want to write this child_df to a file, i.e. saveAs(path)
}
Is there any way to do this? Is there a Spark function or API that would help me here? Please suggest a way to solve this.
There are a few options here -
- You need to split the dataset into several datasets and work with them individually, like,
var dist_company_model_vals_list = company_model_vals_df
.select("model_id","fiscal_quarter","fiscal_year").distinct().collectAsList
Then filter company_model_vals_df with the output of the dist_company_model_vals_list list, which gives you several datasets that you can work on independently, like
def rowList: Seq[DataFrame] = {
  import scala.collection.JavaConverters._
  import org.apache.spark.sql._
  import org.apache.spark.sql.functions.col

  var dfList: Seq[DataFrame] = Seq()
  // collectAsList returns a java.util.List[Row], so convert it to a Scala collection first
  for (row <- dist_company_model_vals_list.asScala) {
    // Build a filter matching this group's key values
    // (column order follows the select above: model_id, fiscal_quarter, fiscal_year)
    val filterCol = col("model_id") === row.getInt(0) &&
      col("fiscal_quarter") === row.getInt(1) &&
      col("fiscal_year") === row.getInt(2)
    val resultDf = company_model_vals_df.filter(filterCol)
    dfList :+= resultDf
  }
  dfList
}
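Each of the resulting DataFrames can then be handled on its own; a minimal usage sketch (the output path and overwrite mode below are just illustrative assumptions):
rowList.zipWithIndex.foreach { case (groupDf, idx) =>
  // write each per-group DataFrame to its own directory
  groupDf.write.mode("overwrite").parquet(s"/tmp/company_model_vals/group_$idx")
}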
- If your goal is just to write the data, you can use the partitionBy("model_id","fiscal_quarter","fiscal_year") method on DataFrameWriter to write each group separately, as sketched below.
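For example (a sketch, assuming Parquet output, overwrite semantics, and a hypothetical target path):
company_model_vals_df.write
  .mode("overwrite") // assumption: overwriting existing output is acceptable
  .partitionBy("model_id", "fiscal_quarter", "fiscal_year")
  .parquet("/path/to/output")
Spark then creates one subdirectory per distinct key combination (e.g. model_id=1/fiscal_quarter=2/fiscal_year=2018) without you having to collect the groups on the driver.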
If I understand your question correctly, you want to manipulate the data separately for each "model_id","fiscal_quarter","fiscal_year" combination.
If that is the case, you would do it with groupBy(), e.g.:
company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").agg(avg($"col1") as "average")
If what you are looking for is to write each logical group into a separate folder, you can do it when writing:
company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year").parquet("path/to/save")
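Once the data is laid out like this, a single logical group can be read back efficiently, because a filter on the partition columns is pruned against the directory structure. A small sketch, reusing the sample values from the table above and the same save path:
import org.apache.spark.sql.functions.col

val oneGroup = spark.read.parquet("path/to/save")
  .where(col("model_id") === 1 && col("fiscal_quarter") === 2 && col("fiscal_year") === 2018)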