Grouping on multiple columns without aggregation

I have a dataframe (Dataset<Row>) with six columns. Four of them need to be grouped on; for the other two columns, the values can vary n times within each combination of the grouped columns.

The dataset I have is as below:

 id | batch  | batch_Id | session_name | time          | value
 001|  abc   |   098    |    course-I  | 1551409926133 |  2.3
 001|  abc   |   098    |    course-I  | 1551404747843 |  7.3
 001|  abc   |   098    |    course-I  | 1551409934220 |  6.3

I tried the following:
Dataset<Row> df2 = df.select("*")
    .groupBy(col("id"), col("batch_Id"), col("session_name"))
    .agg(max("time"));

I added agg() to consume the groupBy() output, but I don't know how to get the result I need.

Any help is much appreciated... thanks.

I don't think you're too far off.

Given this first dataset:

+---+-----+--------+------------+-------------+-----+
| id|batch|batch_Id|session_name|         time|value|
+---+-----+--------+------------+-------------+-----+
|001|  abc|     098|    course-I|1551409926133|  2.3|
|001|  abc|     098|    course-I|1551404747843|  7.3|
|001|  abc|     098|    course-I|1551409934220|  6.3|
|002|  def|     097|   course-II|1551409926453|  2.3|
|002|  def|     097|   course-II|1551404747843|  7.3|
|002|  def|     097|   course-II|1551409934220|  6.3|
+---+-----+--------+------------+-------------+-----+

Assuming your desired output is:

+---+--------+------------+-------------+
| id|batch_Id|session_name|    max(time)|
+---+--------+------------+-------------+
|002|     097|   course-II|1551409934220|
|001|     098|    course-I|1551409934220|
+---+--------+------------+-------------+

I would write the following code for the aggregation:

Dataset<Row> maxValuesDf = rawDf.select("*")
    .groupBy(col("id"), col("batch_id"), col("session_name"))
    .agg(max("time"));
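A side note, since your question mentions two varying columns: if you also need the value that belongs to the max time, one possible approach (a sketch I haven't run, not part of the code above) is to take the max of a struct. Spark compares structs field by field, so the first field, time, drives the ordering:

// Needs: import static org.apache.spark.sql.functions.struct;
// Sketch: keep the value that goes with the latest time in each group.
Dataset<Row> latestPerGroup = rawDf
    .groupBy(col("id"), col("batch_id"), col("session_name"))
    .agg(max(struct(col("time"), col("value"))).alias("latest"))
    .select(col("id"), col("batch_id"), col("session_name"),
        col("latest.time").alias("time"),
        col("latest.value").alias("value"));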

The whole application would look like:

package net.jgp.books.spark.ch13.lab900_max_value;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class MaxValueAggregationApp {
  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    MaxValueAggregationApp app = new MaxValueAggregationApp();
    app.start();
  }
  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Aggregates max values")
        .master("local[*]")
        .getOrCreate();
    // Reads a CSV file with a header, called courses.csv, and stores it in a
    // dataframe
    Dataset<Row> rawDf = spark.read().format("csv")
        .option("header", true)
        .option("sep", "|")
        .load("data/misc/courses.csv");
    // Shows at most 20 rows from the dataframe
    rawDf.show(20);
    // Performs the aggregation, grouping on columns id, batch_id, and
    // session_name
    Dataset<Row> maxValuesDf = rawDf.select("*")
        .groupBy(col("id"), col("batch_id"), col("session_name"))
        .agg(max("time"));
    maxValuesDf.show(5);
  }
}
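For reference, the data/misc/courses.csv file this application reads would be pipe-separated with a header row, matching the dataset above:

id|batch|batch_Id|session_name|time|value
001|abc|098|course-I|1551409926133|2.3
001|abc|098|course-I|1551404747843|7.3
001|abc|098|course-I|1551409934220|6.3
002|def|097|course-II|1551409926453|2.3
002|def|097|course-II|1551404747843|7.3
002|def|097|course-II|1551409934220|6.3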

Does it help?
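One more thought, since your title says "without aggregation": if you want to keep all of the varying time/value pairs per group instead of reducing them to a single value, a hedged alternative (again a sketch, not something from the code above) is collect_list:

// Needs: import static org.apache.spark.sql.functions.collect_list;
// and:   import static org.apache.spark.sql.functions.struct;
// Sketch: gather every time/value pair per group instead of aggregating.
Dataset<Row> groupedDf = rawDf
    .groupBy(col("id"), col("batch_id"), col("session_name"))
    .agg(collect_list(struct(col("time"), col("value"))).alias("readings"));
groupedDf.show(5, false);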
