如何在java中转换spark数据集



我是spark java的新手,我想转置数据集。我已经检查了pivot函数来转置数据集,但头部对我来说是未知的,所以不能使用pivot功能来做这件事。在java中,我有任何方法可以转置数据集中吗。

这里的问题是SparkSQL没有转置函数。因此,您必须为数据集创建一个新的Schema,并创建一个映射器函数,以便将行的位置更改为列。

这将使用pivot列在spark中转置数据集。

private static Dataset<Row> transposeDF(Dataset<Row> df, String[] columns, String pivotCol) {
List<String> columnsValue =
Arrays.asList(columns).stream().map(x -> "'" + x + "', " + x).collect(Collectors.toList());
String stackCols = String.join(",", columnsValue);
Dataset<Row> df1 =
df.selectExpr(pivotCol, "stack(" + columns.length + "," + stackCols + ")")
.select(pivotCol, "col0", "col1");
Dataset<Row> finalDF =
df1.groupBy(functions.col("col0"))
.pivot(pivotCol)
.agg(functions.concat_ws("", functions.collect_list(functions.col("col1"))))
.withColumnRenamed("col0", pivotCol);
return finalDF;
}

最新更新