我有一个数据帧,其中我有不同的参数作为列,每行参数都有一个时间戳。
我想做的是将数据帧拆分为窗口,其中每行的列值都附加到一行中。这将使我能够使用这些功能来运行群集。
例如,我想像这样转换数据帧(窗口大小 3):
2017-01-01 00:00:01, a1, b1, c1
2017-01-01 00:00:02, a2, b2, c2
2017-01-01 00:00:03, a3, b3, c3
2017-01-01 00:00:04, a4, b4, c4
2017-01-01 00:00:05, a5, b5, c5
2017-01-01 00:00:06, a6, b6, c6
2017-01-01 00:00:07, a7, b7, c7
变成这样的东西:
2017-01-01 00:00:01, 2017-01-01 00:00:03, a1, a2, a3, b1, b2, b3, c1, c2, c3
2017-01-01 00:00:04, 2017-01-01 00:00:06, a4, a5, a6, b4, b5, b6, c4, c5, c6
在聚类之后,我需要保留哪个时间间隔属于哪个集群的信息,这就是为什么我还需要保留时间范围。示例中的最后一个时刻被排除,因为没有足够的测量值来创建另一个窗口。
如何使用 Spark 执行此操作?
让我们从一些数据开始,根据您的描述:
from pyspark.sql.functions import unix_timestamp
df = sc.parallelize([("2017-01-01 00:00:01", 2.0, 2.0, 2.0),
("2017-01-01 00:00:08", 9.0, 9.0, 9.0),
("2017-01-01 00:00:02", 3.0, 3.0, 3.0),
("2017-01-01 00:00:03", 4.0, 4.0, 4.0),
("2017-01-01 00:00:04", 5.0, 5.0, 5.0),
("2017-01-01 00:00:05", 6.0, 6.0, 6.0),
("2017-01-01 00:00:06", 7.0, 7.0, 7.0),
("2017-01-01 00:00:07", 8.0, 8.0, 8.0)]).toDF(["time","a","b","c"])
df = df.withColumn("time", unix_timestamp("time", "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
> 星火 2.0
我们可以生成一个新的interval
列ceil()
函数 ,然后我们可以对您的数据进行分组并将所有其他变量收集到一个平面列表中。
为了保证结果列表内的正确排序,无论初始顺序如何,我们将使用Window
函数,按date
对数据进行分区,创建一个按time
排序的rank
列。
from pyspark.sql.window import Window
from pyspark.sql.functions import ceil
df = df.withColumn("date", df["time"].cast("date"))
.withColumn("interval", ((ceil(df["time"].cast("long") / 3L))*3.0).cast("timestamp"))
window = Window.partitionBy(df['date']).orderBy(df['time'])
由于我们将rank
列收集到嵌套列表中以进行正确排序,因此我们将定义一个最终解压缩嵌套列表中所有值的udf
,但第一个值是rank
:
def unnest(col):
l = [item[1:] for item in col]
res = [item for sublist in l for item in sublist]
return(res)
unnest_udf = udf(unnest)
现在我们将所有内容放在一起:
from pyspark.sql.functions import rank
from pyspark.sql.functions import collect_list, array
df.select('*', rank().over(window).alias('rank'))
.groupBy("interval")
.agg(collect_list(array("rank","a", "b","c")).alias("vals"))
.withColumn("vals", unnest_udf("vals"))
.sort("interval")
.show(truncate = False)
+---------------------+---------------------------------------------+
|interval |vals |
+---------------------+---------------------------------------------+
|2017-01-01 00:00:03.0|[2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0]|
|2017-01-01 00:00:06.0|[5.0, 5.0, 5.0, 6.0, 6.0, 6.0, 7.0, 7.0, 7.0]|
|2017-01-01 00:00:09.0|[8.0, 8.0, 8.0, 9.0, 9.0, 9.0] |
+---------------------+---------------------------------------------+
星火 1.6
我们不能在collect_list()
中使用array
作为参数,所以我们只将collect_list()
调用包装在array
中,而不是相反。我们还将稍微修改我们的udf
,因为我们不会明确需要使用这种方法来rank
列。
unpack_udf = udf(
lambda l: [item for sublist in l for item in sublist]
)
df.select('*', rank().over(window).alias('rank'))
.groupBy("interval")
.agg(array(collect_list("a"),
collect_list("b"),
collect_list("c")).alias("vals"))
.withColumn("vals", unpack_udf("vals"))
.sort("interval")
.show(truncate = False)
+---------------------+---------------------------------------------+
|interval |vals |
+---------------------+---------------------------------------------+
|2017-01-01 00:00:03.0|[2.0, 3.0, 4.0, 2.0, 3.0, 4.0, 2.0, 3.0, 4.0]|
|2017-01-01 00:00:06.0|[5.0, 6.0, 7.0, 5.0, 6.0, 7.0, 5.0, 6.0, 7.0]|
|2017-01-01 00:00:09.0|[8.0, 9.0, 8.0, 9.0, 8.0, 9.0] |
+---------------------+---------------------------------------------+
请注意,vals
列现在以不同的方式排序,但由于我们之前定义的window
函数,该列始终如一。