基于RDD/Spark DataFrame中的特定列从行中删除重复项



假设我有一个相当大的数据集,格式如下:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

我想做的是仅根据第一、第三和第四列的值删除重复的行。

删除完全重复的行很简单:

data = data.distinct()

,第5行或第6行将被删除

但是我如何只删除基于列1,3和4重复行?即删除以下任意一个:

('Baz',22,'US',6)
('Baz',36,'US',6)

在Python中,这可以通过使用.drop_duplicates()指定列来完成。我如何在Spark/Pyspark中实现相同的功能?

Pyspark 确实包含一个dropDuplicates()方法,该方法在1.4中引入。https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ 
...     Row(name='Alice', age=5, height=80), 
...     Row(name='Alice', age=5, height=80), 
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

从您的问题来看,不清楚要使用哪些列来确定重复项。该解决方案背后的一般思想是基于标识重复项的列的值创建一个键。然后,您可以使用reduceByKey或reduce操作来消除重复项。

下面是一些帮助你开始的代码:

def get_key(x):
    return "{0}{1}{2}".format(x[0],x[2],x[3])
m = data.map(lambda x: (get_key(x),x))

现在,您有一个键值RDD,它由列1,3和4作为键值。下一步将是reduceByKeygroupByKeyfilter。这将消除重复。

r = m.reduceByKey(lambda x,y: (x))

我知道你已经接受了另一个答案,但如果你想这样做DataFrame,只使用groupBy和agg。假设您已经创建了一个DF(列名为"col1","col2"等),您可以这样做:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")

注意,在这个例子中,我选择了col2的最大值,但你也可以选择平均值,最小值等

同意David的观点。此外,如果我们想要groupBy除聚合函数中的列之外的所有列,也就是说,如果我们想要纯粹基于列的子集删除重复并保留原始数据框中的所有列,则可能不是的情况。因此,更好的方法是使用Spark 1.4.0中提供的 dropduplicate Dataframe api

参考:https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame

我使用了内置函数dropduplicate()。Scala代码如下

val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")
data.dropDuplicates(Array("x","count")).show()
输出:

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+

下面的程序将帮助您从整体上删除重复项,或者如果您想基于某些列删除重复项,您甚至可以这样做:

import org.apache.spark.sql.SparkSession
object DropDuplicates {
def main(args: Array[String]) {
val spark =
  SparkSession.builder()
    .appName("DataFrame-DropDuplicates")
    .master("local[4]")
    .getOrCreate()
import spark.implicits._
// create an RDD of tuples with some data
val custs = Seq(
  (1, "Widget Co", 120000.00, 0.00, "AZ"),
  (2, "Acme Widgets", 410500.00, 500.00, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (4, "Widgets R Us", 410500.00, 0.0, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
  (6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)
// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")
println("*** Here's the whole DataFrame with duplicates")
customerDF.printSchema()
customerDF.show()
// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()
println("*** Now without duplicates")
withoutDuplicates.show()
val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))
println("*** Now without partial duplicates too")
withoutPartials.show()
 }
 }

以上方法都很好,我觉得dropduplicates是最好的方法

下面是另一种使用dropduplicates删除重复的方法(按agg等分组)但是如果你注意到时间/性能dropduplicates按列是冠军(时间花费:1563毫秒)

下面是完整的列表和时间

import org.apache.spark.sql.SparkSession
object DropDups {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadFromUrl")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._
    spark.sparkContext.setLogLevel("Error")
    val data = sc.parallelize(List(
      ("Foo", 41, "US", 3),
      ("Foo", 39, "UK", 1),
      ("Bar", 57, "CA", 2),
      ("Bar", 72, "CA", 2),
      ("Baz", 22, "US", 6),
      ("Baz", 36, "US", 6)
    )).toDF("x", "y", "z", "count")
    spark.time
    {
      import org.apache.spark.sql.functions.first
      val data = sc.parallelize(List(
        ("Foo", 41, "US", 3),
        ("Foo", 39, "UK", 1),
        ("Bar", 57, "CA", 2),
        ("Bar", 72, "CA", 2),
        ("Baz", 22, "US", 6),
        ("Baz", 36, "US", 6)
      )).toDF("x", "y", "z", "count")
      val deduped = data
        .groupBy("x", "count")
        .agg(
          first("y").as("y"),
          first("z").as("z")
        )
      deduped.show()
    }
    spark.time {
      data.dropDuplicates(Array("x","count")).show()
    }
    spark.stop()
  }
}
  
结果:

 +---+-----+---+---+
|  x|count|  y|  z|
+---+-----+---+---+
|Baz|    6| 22| US|
|Foo|    1| 39| UK|
|Bar|    2| 57| CA|
|Foo|    3| 41| US|
+---+-----+---+---+
Time taken: 7086 ms
+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Bar| 57| CA|    2|
|Foo| 41| US|    3|
+---+---+---+-----+
Time taken: 1563 ms

这是我的Df包含4重复两次,所以这里将删除重复的值。

scala> df.show
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
|    4|
|   18|
+-----+
scala> val newdf=df.dropDuplicates
scala> newdf.show
+-----+
|value|
+-----+
|    1|
|    3|
|    5|
|    4|
|   18|
+-----+

相关内容

  • 没有找到相关文章

最新更新