删除Spark数组列中的重复项



我有一个给定的数据集:

+-------------------+--------------------+
|               date|            products|
+-------------------+--------------------+
|2017-08-31 22:00:00|[361, 361, 361, 3...|
|2017-09-22 22:00:00|[361, 362, 362, 3...|
|2017-09-21 22:00:00|[361, 361, 361, 3...|
|2017-09-28 22:00:00|[360, 361, 361, 3...|

其中products列是包含可能重复项的字符串数组。

我想删除这个重复(在一行内)

我所做的基本上是编写一个UDF函数,就像一样

val removeDuplicates: WrappedArray[String] => WrappedArray[String] = _.distinct
val udfremoveDuplicates = udf(removeDuplicates)

这个解决方案给了我一个合适的结果:

+-------------------+--------------------+--------------------+
|               date|            products|       rm_duplicates|
+-------------------+--------------------+--------------------+
|2017-08-31 22:00:00|[361, 361, 361, 3...|[361, 362, 363, 3...|
|2017-09-22 22:00:00|[361, 362, 362, 3...|[361, 362, 363, 3...|

我的问题是:

  1. Spark是否提供了更好/更有效的方法来获得此结果?

  2. 我曾考虑过使用映射,但如何将所需列作为List,以便能够使用"distinct"方法,如我的removeDuplicateslambda中的方法?

Edit:我用java标记了这个主题,因为我将用哪种语言(scala或java)获得answear并不重要:)第2版:打字错误

现在的答案已经过时,因此这个新的答案。

有了Spark 2.4数组函数,你可以做这样的事情,其他一些方面也可以显示出来:但你可以了解它的要点:

val res4 = res3.withColumn("_f", array_distinct(sort_array(flatten($"_e"))))

顺便说一句:https://www.waitingforcode.com/apache-spark-sql/apache-spark-2.4.0-features-array-higher-order-functions/read

问题中提出的方法——使用UDF——是最好的方法,因为spark-sql没有内置的基元来统一数组。

如果您正在处理大量数据和/或数组值具有唯一属性,那么值得考虑UDF的实现。

WrappedArray.distinct在幕后构建mutable.HashSet,然后遍历它以构建不同元素的阵列。从性能角度来看,这可能存在两个问题:

  1. Scala的可变集合并不是非常高效,这就是为什么在Spark的内部你会发现很多Java集合和while循环。如果您需要极高的性能,您可以使用更快的数据结构来实现自己的通用distinct。

  2. distinct的通用实现不会利用数据的任何属性。例如,如果数组的平均值很小,那么直接构建到数组中并线性搜索重复项的简单实现可能比构建复杂数据结构的代码执行得更好,尽管它的理论复杂性为O(n^2)。再举一个例子,如果值只能是小范围内的数字,或者是小集合中的字符串,则可以通过位集实现统一。

同样,只有当你有大量荒谬的数据时,才应该考虑这些策略。您的简单实现几乎适用于所有情况。

您可以使用一个简单的UDF。

val dedup = udf((colName: scala.collection.mutable.WrappedArray[String]) => colName.distinct)

df.withColumn("DeDupColumn", dedup($"colName"))

给定您当前的dataframeschema作为

root
|-- date: string (nullable = true)
|-- products: array (nullable = true)
|    |-- element: integer (containsNull = false)

您可以使用以下方法删除重复项。

df.map(row => DuplicateRemoved(row(0).toString, row(1).asInstanceOf[mutable.WrappedArray[Int]], row(1).asInstanceOf[mutable.WrappedArray[Int]].distinct)).toDF()

这个当然需要case class

case class DuplicateRemoved(date: String, products: mutable.WrappedArray[Int], rm_duplicates: mutable.WrappedArray[Int])

你应该得到以下输出

+-------------------+------------------------------+-------------------------+
|date               |products                      |rm_duplicates            |
+-------------------+------------------------------+-------------------------+
|2017-08-31 22:00:00|[361, 361, 361, 362, 363, 364]|[361, 362, 363, 364]     |
|2017-09-22 22:00:00|[361, 362, 362, 362, 363, 364]|[361, 362, 363, 364]     |
|2017-09-21 22:00:00|[361, 361, 361, 362, 363, 364]|[361, 362, 363, 364]     |
|2017-09-28 22:00:00|[360, 361, 361, 362, 363, 364]|[360, 361, 362, 363, 364]|
+-------------------+------------------------------+-------------------------+

我希望这个答案对有帮助

相关内容

  • 没有找到相关文章

最新更新