Get the last distinct data on a Spark DataFrame

I have this data in a Spark DataFrame:
+------+-------+-----+------------+----------+---------+
|sernum|product|state|testDateTime|testResult|      msg|
+------+-------+-----+------------+----------+---------+
|     8|    PA1|  1.0|        1.18|      pass|testlog18|
|     7|    PA1|  1.0|        1.17|      fail|testlog17|
|     6|    PA1|  1.0|        1.16|      pass|testlog16|
|     5|    PA1|  1.0|        1.15|      fail|testlog15|
|     4|    PA1|  2.0|        1.14|      fail|testlog14|
|     3|    PA1|  1.0|        1.13|      pass|testlog13|
|     2|    PA1|  2.0|        1.12|      pass|testlog12|
|     1|    PA1|  1.0|        1.11|      fail|testlog11|
+------+-------+-----+------------+----------+---------+

I only care about the rows where testResult == "fail". The hard part is that I need the last "pass" message as an extra column, grouped by product and state:

+------+-------+-----+------------+----------+---------+---------+
|sernum|product|state|testDateTime|testResult|      msg|  passMsg|
+------+-------+-----+------------+----------+---------+---------+
|     7|    PA1|  1.0|        1.17|      fail|testlog17|testlog16|
|     5|    PA1|  1.0|        1.15|      fail|testlog15|testlog13|
|     4|    PA1|  2.0|        1.14|      fail|testlog14|testlog12|
|     1|    PA1|  1.0|        1.11|      fail|testlog11|     null|
+------+-------+-----+------------+----------+---------+---------+

How can I achieve this with the DataFrame API or Spark SQL?

The trick is to define groups such that each group starts with a passed test. Then use window functions again, with group as an additional partition column:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark, as in spark-shell

val df = Seq(
  (8, "PA1", 1.0, 1.18, "pass", "testlog18"),
  (7, "PA1", 1.0, 1.17, "fail", "testlog17"),
  (6, "PA1", 1.0, 1.16, "pass", "testlog16"),
  (5, "PA1", 1.0, 1.15, "fail", "testlog15"),
  (4, "PA1", 2.0, 1.14, "fail", "testlog14"),
  (3, "PA1", 1.0, 1.13, "pass", "testlog13"),
  (2, "PA1", 2.0, 1.12, "pass", "testlog12"),
  (1, "PA1", 1.0, 1.11, "fail", "testlog11")
).toDF("sernum", "product", "state", "testDateTime", "testResult", "msg")

df
  // running count of "pass" rows per (product, state): each pass opens a new group;
  // fails before the first pass get a null group
  .withColumn("group", sum(when($"testResult" === "pass", 1)).over(Window.partitionBy($"product", $"state").orderBy($"testDateTime")))
  // within each group, the first row by testDateTime is the pass that opened it
  .withColumn("passMsg", when($"group".isNotNull, first($"msg").over(Window.partitionBy($"product", $"state", $"group").orderBy($"testDateTime"))))
  .drop($"group")
  .where($"testResult" === "fail")
  .orderBy($"product", $"state", $"testDateTime")
  .show()
+------+-------+-----+------------+----------+---------+---------+
|sernum|product|state|testDateTime|testResult|      msg|  passMsg|
+------+-------+-----+------------+----------+---------+---------+
|     7|    PA1|  1.0|        1.17|      fail|testlog17|testlog16|
|     5|    PA1|  1.0|        1.15|      fail|testlog15|testlog13|
|     4|    PA1|  2.0|        1.14|      fail|testlog14|testlog12|
|     1|    PA1|  1.0|        1.11|      fail|testlog11|     null|
+------+-------+-----+------------+----------+---------+---------+
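Since the question also asks for SQL: the same group trick translates directly to Spark SQL window functions. A minimal sketch, assuming the DataFrame above is registered as a temp view (the view name tests is my choice, not from the original post):

df.createOrReplaceTempView("tests")

spark.sql("""
  WITH grouped AS (
    SELECT *,
           -- running count of passes: each pass opens a new group
           SUM(CASE WHEN testResult = 'pass' THEN 1 END)
             OVER (PARTITION BY product, state ORDER BY testDateTime) AS grp
    FROM tests
  ),
  with_pass AS (
    SELECT *,
           -- first row of each group (by time) is the pass that opened it
           FIRST_VALUE(msg)
             OVER (PARTITION BY product, state, grp ORDER BY testDateTime) AS firstMsg
    FROM grouped
  )
  SELECT sernum, product, state, testDateTime, testResult, msg,
         CASE WHEN grp IS NOT NULL THEN firstMsg END AS passMsg
  FROM with_pass
  WHERE testResult = 'fail'
  ORDER BY product, state, testDateTime
""").show()

The filter on testResult = 'fail' sits in a separate outer query so that the window functions are computed over all rows, passes included, before the fails are selected.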

Here is an alternative approach: left-join the failed logs to the passed logs with an earlier timestamp, then keep only the most recent "pass" message log per failure.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// rank the joined pass rows per fail row (msg is unique per fail here), newest pass first
val window = Window.partitionBy($"msg").orderBy($"p_testDateTime".desc)

val fDf = df.filter($"testResult" === "fail")
var pDf = df.filter($"testResult" === "pass")
// prefix the pass-side columns so they don't collide after the join
pDf.columns.foreach(x => pDf = pDf.withColumnRenamed(x, "p_" + x))

// left-join each fail to every earlier pass for the same product and state
val jDf = fDf.join(
                   pDf,
                   pDf("p_product") === fDf("product") &&
                   pDf("p_state") === fDf("state") &&
                   fDf("testDateTime") > pDf("p_testDateTime"),
                   "left").
               select(fDf("*"),
                      pDf("p_testResult"),
                      pDf("p_testDateTime"),
                      pDf("p_msg")
                      )

// keep only the most recent pass (rank 1) for each fail
jDf.withColumn(
               "rnk",
               row_number().
                   over(window)
               ).
               filter($"rnk" === 1).
               drop("rnk", "p_testResult", "p_testDateTime").
               show()
+---------+-------+------+-----+------------+----------+---------+              
|      msg|product|sernum|state|testDateTime|testResult|    p_msg|
+---------+-------+------+-----+------------+----------+---------+
|testlog14|    PA1|     4|    2|        1.14|      fail|testlog12|
|testlog11|    PA1|     1|    1|        1.11|      fail|     null|
|testlog15|    PA1|     5|    1|        1.15|      fail|testlog13|
|testlog17|    PA1|     7|    1|        1.17|      fail|testlog16|
+---------+-------+------+-----+------------+----------+---------+
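For the deduplication step, a window is not the only option: structs in Spark compare field by field, so grouping the joined result and taking max(struct(p_testDateTime, p_msg)) also keeps the message of the latest pass. A small alternative sketch over the same jDf, not from the original answer:

// collapse the join result to one row per failure, keeping the latest pass msg;
// max over the struct orders by p_testDateTime first and carries p_msg along
jDf.groupBy($"sernum", $"product", $"state", $"testDateTime", $"testResult", $"msg")
   .agg(max(struct($"p_testDateTime", $"p_msg")).getField("p_msg").as("passMsg"))
   .orderBy($"product", $"state", $"testDateTime")
   .show()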
