I have a pyspark dataframe that contains start time and stop time columns, along with other columns whose values get updated:
|startime  |stoptime  |hour|minute|sec|sip         |dip          |sport|dport|proto|pkt|byt|
|1504766585|1504801216|16  |20    |16 |192.168.0.11|23.204.108.58|51249|80   |6    |0  |0  |
|1504766585|1504801216|16  |20    |16 |192.168.0.11|23.204.108.58|51249|80   |6    |0  |0  |
|1504781751|1504801216|16  |20    |16 |192.168.0.11|23.72.38.96  |51252|80   |6    |0  |0  |
|1504781751|1504801216|16  |20    |16 |192.168.0.11|23.72.38.96  |51252|80   |6    |0  |0  |
|1504766585|1504801336|16  |22    |16 |192.168.0.11|23.204.108.58|51249|80   |6    |0  |0  |
|1504766585|1504801336|16  |22    |16 |192.168.0.11|23.204.108.58|51249|80   |6    |0  |0  |
|1504781751|1504801336|16  |22    |16 |192.168.0.11|23.72.38.96  |51252|80   |6    |0  |0  |
|1504781751|1504801336|16  |22    |16 |192.168.0.11|23.72.38.96  |51252|80   |6    |0  |0  |
In this example I want to select all the rows with the latest stoptime; all of the other column values are duplicated.
I think you want to keep the latest record for each sport. You should use a window function to determine the latest record of each partition:
import pyspark.sql.functions as psf
from pyspark.sql import Window

# Rank the rows of each sport partition by descending stoptime and keep only the first one
w = Window.partitionBy("sport").orderBy(psf.desc("stoptime"))
df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn").show()
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
| startime| stoptime|hour|min|sec| sip| dip|sport|dport|proto|pkt|byt|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|1504781751|1504801336| 16| 22| 16|192.168.0.11| 23.72.38.96|51252| 80| 6| 0| 0|
|1504766585|1504801336| 16| 22| 16|192.168.0.11|23.204.108.58|51249| 80| 6| 0| 0|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
You will end up with as many records as there are distinct partitions of sport.
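As a quick sanity check (a minimal sketch reusing df and the partitioned window w from above; the name latest is just illustrative), the row count of the result should equal the number of distinct sport values:

latest = df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn")
# Both counts should match: exactly one row per distinct sport
print(latest.count(), df.select("sport").distinct().count())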
If you want the latest stoptime for the whole table, without partitioning, you can drop the partitionBy and use dense_rank instead (identical values get the same rank):
w = Window.orderBy(psf.desc("stoptime"))
df.withColumn("rn", psf.dense_rank().over(w)).filter("rn = 1").drop("rn").show()
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
| startime| stoptime|hour|min|sec| sip| dip|sport|dport|proto|pkt|byt|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|1504766585|1504801336| 16| 22| 16|192.168.0.11|23.204.108.58|51249| 80| 6| 0| 0|
|1504766585|1504801336| 16| 22| 16|192.168.0.11|23.204.108.58|51249| 80| 6| 0| 0|
|1504781751|1504801336| 16| 22| 16|192.168.0.11| 23.72.38.96|51252| 80| 6| 0| 0|
|1504781751|1504801336| 16| 22| 16|192.168.0.11| 23.72.38.96|51252| 80| 6| 0| 0|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
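Note that the duplicate rows coming from the input are still present, since dense_rank keeps every row tied for the latest stoptime. If you also want to collapse them, one option (a sketch built on the same unpartitioned w as above) is to append a distinct():

# Same ranking as before, followed by deduplication of identical rows
df.withColumn("rn", psf.dense_rank().over(w)).filter("rn = 1").drop("rn").distinct().show()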
from pyspark.sql.functions import col

df = sc.parallelize([(1504766585,1504801216,16,20,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504766585,1504801216,16,20,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504781751,1504801216,16,20,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504781751,1504801216,16,20,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504766585,1504801336,16,22,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504766585,1504801336,16,22,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504781751,1504801336,16,22,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504781751,1504801336,16,22,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0)]) \
    .toDF(["startime","stoptime","hour","min","sec","sip","dip","sport","dport","proto","pkt","byt"])
# Keep only the rows whose stoptime equals the global maximum, then drop duplicate rows
df1 = df.where(col("stoptime") == df.select("stoptime").rdd.max()[0]).distinct()
df1.show()
The output is:
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
| startime| stoptime|hour|min|sec| sip| dip|sport|dport|proto|pkt|byt|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|1504766585|1504801336| 16| 22| 16|192.168.0.11|23.204.108.58|51249| 80| 6| 0| 0|
|1504781751|1504801336| 16| 22| 16|192.168.0.11| 23.72.38.96|51252| 80| 6| 0| 0|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
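As a side note, the global maximum stoptime can also be computed through the DataFrame API instead of going through the RDD; a minimal sketch with the same column names (max_stop is just an illustrative variable name):

from pyspark.sql.functions import col, max as max_

# Collect the single aggregated max value, then filter and deduplicate as before
max_stop = df.agg(max_("stoptime")).collect()[0][0]
df1 = df.where(col("stoptime") == max_stop).distinct()
df1.show()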