如何根据标准从RDD中的一系列记录中获取记录



我是Spark的新手,我正在努力解决Spark的以下问题。我有一个包含大量记录的表。Table包含Student_ID,Course_id,firk_date,first_name,last_name。根据业务方案,一个student_id和course_id可能会有多个风险。因此,对于特定的student_id和course_id,我需要获取student_id,course_id,firk_date,并使用最新的风险_date。

如果我在sql查询中提到我的scanario,那就像

select student_id, course_id, max(risk_date) from
students group by  student_id, course_id

我的Scala代码就像下面。

val sqlCaller = sparkSession.read.format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", url)
  .option("dbtable", "student_risk")
  .option("user", "dmin")
  .option("password", "admin123")
  .load()
sqlCaller.cache();
val studentRDD = sqlCaller.rdd.map(r => (r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id")), r.getTimestamp(r.fieldIndex("risk_date"))))

我可以使用过滤器执行此操作吗?我不想使用SQL语句以我的要求获取数据匹配。有人可以帮我做这个吗?

您可以尝试此(更新):

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .getOrCreate()
import spark.implicits._
val df = Seq(
  (1, 1, "2017-01-01"),
  (1, 1, "2017-01-02"),
  (1, 2, "2017-01-04"),
  (1, 2, "2017-01-05"),
  (2, 1, "2017-01-01")
).toDF("student_id", "course_id", "risk_date")
df.groupBy($"student_id", $"course_id").agg(max("risk_date")).show

最新更新