在Spark 3.1中使用java将Spark Dataset拆分为相等数量的数据集



我有一个有许多行的数据集,我正在该数据集中执行collect_list操作。我得到了像Cannot grow BufferHolder; exceeds size limitation这样的误差。这是因为我的collect_list结果列的大小超过了2GB。因此,我希望将此数据集拆分为多个数据集,并尝试对其执行相同的collect_list操作(以减少冷大小)。尝试这个建议的修复。我怎样才能做到呢?

这是我的样本数据集和样本代码。

+----+----+
|col1|col2|
+----+----+
| abc|   A|
| abc|   B|
| cde|   B|
| cde|   C|
| efg|   A|
+----+----+
public static Dataset<Row> getData(){
Dataset<Row> = myDataset;
return myDataset.groupBy(col("col1")).agg(collect_list(col("col2")));
}

,结果是

+----+-------+
|col1|col2   |
+----+-------+
| abc|[A,B]  |
| cde|[B,C]  |
| efg|[A]    |
+----+-------+

我如何通过将其拆分为多个数据集来实现相同的逻辑?我使用spark 3.1和java。

感谢

您可以使用randomSplit()或randomSplitAsList()方法将一个数据集拆分为多个数据集。您可以在这里详细了解此方法。

上述方法将返回数据集的数组/列表,您可以迭代并执行groupByunion以获得所需的结果。

public static Dataset<Row> getData(Dataset<Row>[] myDataset) {
// Start Empty dataframe with col1 as string and col2 as array to hold union result
Dataset<Row> tempDS = SparkSession.active().emptyDataFrame().selectExpr("'' col1", "array() col2");

for (Dataset<Row> ds : myDataset) {
tempDS = tempDS.union(ds.groupBy("col1").agg(collect_list("col2").alias("col2")));
}
return  tempDS;
}

最新更新