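I have a dataframe (Dataset<Row>) with six columns. Four of the six need to be grouped on; for the other two columns, the grouped columns may repeat n times depending on the varying values in those two columns.
The required dataset looks like this: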
id | batch | batch_Id | session_name | time | value
001| abc | 098 | course-I | 1551409926133 | 2.3
001| abc | 098 | course-I | 1551404747843 | 7.3
001| abc | 098 | course-I | 1551409934220 | 6.3
I tried the following:
Dataset<Row> df2 = df.select("*")
    .groupBy(col("id"), col("batch_Id"), col("session_name"))
    .agg(max("time"));
I added agg to get the groupBy output, but I don't know how to achieve the result I need.
Any help is much appreciated. Thanks in advance.
I think you're not too far off.
Given the first dataset:
+---+-----+--------+------------+-------------+-----+
| id|batch|batch_Id|session_name| time|value|
+---+-----+--------+------------+-------------+-----+
|001| abc| 098| course-I|1551409926133| 2.3|
|001| abc| 098| course-I|1551404747843| 7.3|
|001| abc| 098| course-I|1551409934220| 6.3|
|002| def| 097| course-II|1551409926453| 2.3|
|002| def| 097| course-II|1551404747843| 7.3|
|002| def| 097| course-II|1551409934220| 6.3|
+---+-----+--------+------------+-------------+-----+
And assuming your desired output is:
+---+--------+------------+-------------+
| id|batch_Id|session_name| max(time)|
+---+--------+------------+-------------+
|002| 097| course-II|1551409934220|
|001| 098| course-I|1551409934220|
+---+--------+------------+-------------+
I would write the following code for the aggregation:
Dataset<Row> maxValuesDf = rawDf.select("*")
    .groupBy(col("id"), col("batch_id"), col("session_name"))
    .agg(max("time"));
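To make it concrete what that grouping computes, here is a minimal plain-Java sketch (no Spark involved) that groups the sample rows by id, batch_Id, and session_name and keeps the row with the largest time. The class name GroupMaxSketch, the Row holder, and the "|"-joined key are all hypothetical names made up for this illustration. Note that it keeps the whole row rather than only max(time), on the assumption that the value belonging to the latest time may also be of interest.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupMaxSketch {

  /** One input row; fields mirror the columns of the sample dataset. */
  static final class Row {
    final String id;
    final String batch;
    final String batchId;
    final String sessionName;
    final long time;
    final double value;

    Row(String id, String batch, String batchId, String sessionName,
        long time, double value) {
      this.id = id;
      this.batch = batch;
      this.batchId = batchId;
      this.sessionName = sessionName;
      this.time = time;
      this.value = value;
    }
  }

  /** The six sample rows from the dataset shown above. */
  static List<Row> sampleRows() {
    return Arrays.asList(
        new Row("001", "abc", "098", "course-I", 1551409926133L, 2.3),
        new Row("001", "abc", "098", "course-I", 1551404747843L, 7.3),
        new Row("001", "abc", "098", "course-I", 1551409934220L, 6.3),
        new Row("002", "def", "097", "course-II", 1551409926453L, 2.3),
        new Row("002", "def", "097", "course-II", 1551404747843L, 7.3),
        new Row("002", "def", "097", "course-II", 1551409934220L, 6.3));
  }

  /** Groups on (id, batch_Id, session_name) and keeps the row with the largest time. */
  static Map<String, Row> latestRowPerGroup(List<Row> rows) {
    Map<String, Row> latest = new TreeMap<>();
    for (Row r : rows) {
      // Hypothetical composite key; any stable combination of the three columns works.
      String key = r.id + "|" + r.batchId + "|" + r.sessionName;
      latest.merge(key, r, (a, b) -> a.time >= b.time ? a : b);
    }
    return latest;
  }

  public static void main(String[] args) {
    for (Map.Entry<String, Row> e : latestRowPerGroup(sampleRows()).entrySet()) {
      Row r = e.getValue();
      System.out.println(e.getKey() + " -> max(time)=" + r.time + ", value=" + r.value);
    }
  }
}
```

For the sample data this yields one entry per (id, batch_Id, session_name) group, each with time 1551409934220, which matches the max(time) column in the expected output above.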
The whole application looks like:
package net.jgp.books.spark.ch13.lab900_max_value;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MaxValueAggregationApp {

  /**
   * main() is your entry point to the application.
   *
   * @param args
   */
  public static void main(String[] args) {
    MaxValueAggregationApp app = new MaxValueAggregationApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Aggregates max values")
        .master("local[*]")
        .getOrCreate();

    // Reads a pipe-separated CSV file with header, called courses.csv,
    // and stores it in a dataframe
    Dataset<Row> rawDf = spark.read().format("csv")
        .option("header", true)
        .option("sep", "|")
        .load("data/misc/courses.csv");

    // Shows at most 20 rows from the dataframe
    rawDf.show(20);

    // Performs the aggregation, grouping on columns id, batch_id, and
    // session_name
    Dataset<Row> maxValuesDf = rawDf.select("*")
        .groupBy(col("id"), col("batch_id"), col("session_name"))
        .agg(max("time"));
    maxValuesDf.show(5);
  }
}
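One caveat worth checking: the reader above sets no schema and no inferSchema option, so the CSV columns come back as strings, and a maximum over strings is a lexicographic comparison. For the sample data this gives the same answer as a numeric maximum because every timestamp has 13 digits, but that would not hold for timestamps of different lengths. The plain-Java sketch below illustrates the difference; the class and method names are hypothetical, and Java's String ordering is used here as a stand-in for string comparison in general.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class StringVsNumericMax {

  /** Maximum by natural String ordering (lexicographic). */
  static String maxAsString(List<String> values) {
    return Collections.max(values);
  }

  /** Maximum after parsing every value as a long. */
  static long maxAsLong(List<String> values) {
    long best = Long.MIN_VALUE;
    for (String v : values) {
      best = Math.max(best, Long.parseLong(v.trim()));
    }
    return best;
  }

  public static void main(String[] args) {
    // Equal-length timestamps from the sample data: both orderings agree.
    List<String> sameLength =
        Arrays.asList("1551409926133", "1551404747843", "1551409934220");
    System.out.println(maxAsString(sameLength)); // 1551409934220
    System.out.println(maxAsLong(sameLength));   // 1551409934220

    // Made-up values of different lengths: the orderings disagree.
    List<String> mixedLength = Arrays.asList("999", "1551409934220");
    System.out.println(maxAsString(mixedLength)); // 999
    System.out.println(maxAsLong(mixedLength));   // 1551409934220
  }
}
```

If the time column could ever hold values of different lengths, casting it to a numeric type before aggregating would avoid the surprise.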
Does that help?