How to apply multiple columns in GroupBy/PartitionBy in the Spark Java API

If I have a Scala list/Seq of columns, such as:

val partitionsColumns = "p1,p2"
val partitionsColumnsList = partitionsColumns.split(",").toList

I can easily use it in partitionBy or groupBy, like:

val windowFunction = Window.partitionBy(partitionsColumnsList.map(col): _*)
  .orderBy(df("some_date").desc)

But if I want to do the same thing in the Spark Java API, what should I do?

List<String> partitions = new ArrayList<>();
partitions.add("p1");
partitions.add("p2");
WindowSpec windowSpec = Window.partitionBy(.....)
    .orderBy(desc("some_date"));

partitionBy has two signatures:

partitionBy(Seq<Column> cols)
partitionBy(String colName, Seq<String> colNames)

so you can choose either of them. Let's say partitions is a list of Strings. Then it would look like this:

import java.util.stream.Collectors;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.functions;
import scala.collection.JavaConversions;
import scala.collection.Seq;

// Option 1: partitionBy(Seq<Column> cols)
List<Column> columns = partitions.stream()
    .map(functions::col)
    .collect(Collectors.toList());
Seq<Column> columnSeq = JavaConversions.asScalaBuffer(columns).toSeq();
WindowSpec windowSpec = Window.partitionBy(columnSeq);

// Option 2: partitionBy(String colName, Seq<String> colNames)
// The first name goes in colName; the rest are converted to a Scala Seq.
Seq<String> restSeq = JavaConversions.asScalaBuffer(partitions.subList(1, partitions.size())).toSeq();
WindowSpec windowSpec2 = Window.partitionBy(partitions.get(0), restSeq);
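Note that scala.collection.JavaConversions is deprecated since Scala 2.12 (and removed in 2.13). The same conversion can be written explicitly with scala.collection.JavaConverters; a minimal equivalent sketch, reusing the columns list from above:

import scala.collection.JavaConverters;

// Explicit Java-to-Scala conversion; equivalent to the asScalaBuffer call above.
Seq<Column> explicitSeq = JavaConverters.asScalaBufferConverter(columns).asScala().toSeq();
WindowSpec windowSpec3 = Window.partitionBy(explicitSeq);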

Some IDEs, such as IntelliJ IDEA, support both Scala and Java; when you paste Java code into a Scala class, they will convert the code gracefully.

Alternatively, you can use the following in Java:

WindowSpec windowSpec = Window.partitionBy("p1", "p2").orderBy(col("some_date").desc());
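This works from Java because both partitionBy overloads carry Scala's @varargs annotation, which also lets Java pass a plain array. That makes the dynamic-list case from the question a one-liner; a minimal sketch, where columnNames is a hypothetical stand-in for whatever list of partition column names you actually have:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.functions;

// columnNames is an illustrative placeholder for a dynamic list of names.
List<String> columnNames = Arrays.asList("p1", "p2");
// Java sees a partitionBy(Column...) overload thanks to @scala.annotation.varargs,
// so an array of Columns can be passed directly, with no Scala Seq conversion.
Column[] cols = columnNames.stream().map(functions::col).toArray(Column[]::new);
WindowSpec windowSpec = Window.partitionBy(cols).orderBy(functions.col("some_date").desc());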

If you need a list of columns, you can pass a Seq to the partitionBy method:

import scala.collection.JavaConverters;
import static org.apache.spark.sql.functions.col;

List<Column> partitions = new ArrayList<>();
partitions.add(col("p1"));
partitions.add(col("p2"));
// Convert the Java List to a Scala Seq for the partitionBy(Seq<Column>) overload.
Seq<Column> seqPartitions = JavaConverters.asScalaIteratorConverter(partitions.iterator()).asScala().toSeq();
WindowSpec windowSpec = Window.partitionBy(seqPartitions).orderBy(col("some_date").desc());
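The same Seq works for groupBy, which the title also asks about: the Scala varargs method Dataset.groupBy(Column*) erases to a Java-visible groupBy(Seq<Column>), and @varargs also exposes groupBy(Column...). A minimal sketch, assuming df is an existing Dataset<Row> and reusing seqPartitions from above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;

// df is a placeholder Dataset<Row>; seqPartitions is the Seq<Column> built above.
RelationalGroupedDataset grouped = df.groupBy(seqPartitions);
Dataset<Row> counts = grouped.count();  // e.g., row counts per (p1, p2) group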
