我有一个有许多行的数据集,我正在该数据集中执行collect_list操作。我得到了像Cannot grow BufferHolder; exceeds size limitation
这样的误差。这是因为我的collect_list结果列的大小超过了2GB。因此,我希望将此数据集拆分为多个数据集,并尝试对其执行相同的collect_list操作(以减少冷大小)。尝试这个建议的修复。我怎样才能做到呢?
这是我的样本数据集和样本代码。
+----+----+
|col1|col2|
+----+----+
| abc| A|
| abc| B|
| cde| B|
| cde| C|
| efg| A|
+----+----+
public static Dataset<Row> getData(){
Dataset<Row> = myDataset;
return myDataset.groupBy(col("col1")).agg(collect_list(col("col2")));
}
,结果是
+----+-------+
|col1|col2 |
+----+-------+
| abc|[A,B] |
| cde|[B,C] |
| efg|[A] |
+----+-------+
我如何通过将其拆分为多个数据集来实现相同的逻辑?我使用spark 3.1和java。
感谢您可以使用randomSplit()或randomSplitAsList()方法将一个数据集拆分为多个数据集。您可以在这里详细了解此方法。
上述方法将返回数据集的数组/列表,您可以迭代并执行groupBy
和union
以获得所需的结果。
public static Dataset<Row> getData(Dataset<Row>[] myDataset) {
// Start Empty dataframe with col1 as string and col2 as array to hold union result
Dataset<Row> tempDS = SparkSession.active().emptyDataFrame().selectExpr("'' col1", "array() col2");
for (Dataset<Row> ds : myDataset) {
tempDS = tempDS.union(ds.groupBy("col1").agg(collect_list("col2").alias("col2")));
}
return tempDS;
}