我有一个DataFrame格式如下:
+---+------------------------------------------------------+
|Id |DateInfos |
+---+------------------------------------------------------+
|B |[[3, 19/06/2012-02.42.01], [4, 17/06/2012-18.22.21]] |
|A |[[1, 15/06/2012-18.22.16], [2, 15/06/2012-09.22.35]] |
|C |[[5, 14/06/2012-05.20.01]] |
+---+------------------------------------------------------+
我想按日期对dateinfo列的每个元素进行排序,时间戳在我的数组的第二个元素
+---+------------------------------------------------------+
|Id |DateInfos |
+---+------------------------------------------------------+
|B |[[4, 17/06/2012-18.22.21], [3, 19/06/2012-02.42.01]] |
|A |[[2, 15/06/2012-09.22.35], [1, 15/06/2012-18.22.16]] |
|C |[[5, 14/06/2012-05.20.01]] |
+---+------------------------------------------------------+
my DataFrame的模式打印如下:
root
|-- C1: string (nullable = true)
|-- C2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: integer (nullable = false)
| | |-- _2: string (nullable = false)
我假设我必须创建一个udf,它使用具有以下签名的函数:
def sort_by_date(mouvements : Array[Any]) : Array[Any]
你知道吗?
这确实有点棘手-因为尽管UDF的输入和输出类型似乎相同,但我们不能真正这样定义它-因为输入实际上是mutable.WrappedArray[Row]
,而输出不能使用Row
,否则Spark将无法将其解码为一行…
所以我们定义了一个UDF,它接受一个mutable.WrappedArray[Row]
并返回一个Array[(Int, String)]
:
val sortDates = udf { arr: mutable.WrappedArray[Row] =>
arr.map { case Row(i: Int, s: String) => (i, s) }.sortBy(_._2)
}
val result = input.select($"Id", sortDates($"DateInfos") as "DateInfos")
result.show(truncate = false)
// +---+--------------------------------------------------+
// |Id |DateInfos |
// +---+--------------------------------------------------+
// |B |[[4,17/06/2012-18.22.21], [3,19/06/2012-02.42.01]]|
// |A |[[2,15/06/2012-09.22.35], [1,15/06/2012-18.22.16]]|
// |C |[[5,14/06/2012-05.20.01]] |
// +---+--------------------------------------------------+