下午好。
我正在尝试在 Pyspark 中执行连接,该连接使用一组复杂的条件来生成单个值。
我试图实现的目标的最小示例如下所示。想象一下一组可以在离散时间(t=0
到t=40
之间)发生的事件。每个事件都有一组三个独立的布尔属性,用于描述事件的性质。查找表中包含一些与触发每个属性相关联的时间相关值。对于每个事件,我想确定该事件的所有相关值的总和。
我的第一个数据帧df_1
是事件的列表,事件发生的时间,并具有与之关联的布尔属性选择:
+-------------+------------+------------+------------+------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 |
+-------------+------------+------------+------------+------------+
| Event_1 | 13 | 1 | 0 | 1 |
| Event_2 | 24 | 0 | 1 | 1 |
| Event_3 | 35 | 1 | 0 | 0 |
+-------------+------------+------------+------------+------------+
第二个数据帧df_2
是查找表,用于描述在特定时间为特定属性具有 TRUE 的关联值。由于所有时间段中有许多重复值,因此此数据帧的格式是属性具有特定值的包含时间范围。时间范围的大小不一致,并且可能因不同的属性而有很大差异:
+------------+----------+---------------+-------+
| START_TIME | END_TIME | PROPERTY_NAME | VALUE |
+------------+----------+---------------+-------+
| 0 | 18 | PROPERTY_1 | 0.1 |
| 19 | 40 | PROPERTY_1 | 0.8 |
| 0 | 20 | PROPERTY_2 | 0.7 |
| 20 | 24 | PROPERTY_2 | 0.3 |
| 25 | 40 | PROPERTY_2 | 0.7 |
| 0 | 40 | PROPERTY_3 | 0.5 |
+------------+----------+---------------+-------+
期望输出: 由于Event_1
发生在时间t=13
时,触发了PROPERTY_1
和PROPERTY_3
,因此根据df_2
的值的预期总和应为 0.1(从 0-18 存储桶PROPERTY_1
)+ 0.5(从PROPERTY_3
0-40 存储桶)= 0.6。同样,Event_2
的值应为 0.3(请记住,存储桶开始/结束时间包括在内,因此它来自 20-24 存储桶)+ 0.5 = 0.8。最后,Event_3
= 0.8。
+-------------+------------+------------+------------+------------+-------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 | TOTAL_VALUE |
+-------------+------------+------------+------------+------------+-------------+
| Event_1 | 13 | 1 | 0 | 1 | 0.6 |
| Event_2 | 24 | 0 | 1 | 1 | 0.8 |
| Event_3 | 35 | 1 | 0 | 0 | 0.8 |
+-------------+------------+------------+------------+------------+-------------+
对于我的初始测试数据集,在事件数据帧df_1
中有 ~20,000 个事件分布在 2000 个时间段内。每个事件有 ~44 个属性,查找表df_2
的长度为 ~53,000。由于我想将此过程扩展到更多的数据(可能有几个数量级),因此我对此问题的可并行解决方案非常感兴趣。例如,我觉得将df_2
总结为 python 字典并将其广播给我的执行者是不可能的,因为数据量很大。
由于我尝试在df_1
中的每一行添加一列,因此我尝试使用类似于以下内容的嵌套映射来完成任务:
def calculate_value(df_2):
def _calculate_value(row):
row_dict = row.asDict()
rolling_value = 0.0
for property_name in [key for key in row_dict.keys() if "PROPERTY" in key]:
additional_value = (
df_2
.filter(
(pyspark.sql.functions.col("PROPERTY_NAME") == property_name)
& (pyspark.sql.functions.col("START_BUCKET") <= row_dict["EVENT_TIME"])
& (pyspark.sql.functions.col("END_BUCKET") >= row_dict["EVENT_TIME"])
)
.select("VALUE")
.collect()
)[0][0]
rolling_value += additional_value
return pyspark.sql.Row(**row_dict)
return _calculate_value
这段代码能够在驱动程序上执行连接(通过运行calculate_value(df_2)(df_1.rdd.take(1)[0])
),但是当我尝试执行并行化映射时:
(
df_1
.rdd
.map(calculate_value(df_2))
)
我收到一个 Py4JError 指示它无法将数据帧对象锯齿化df_2
。这在 StackOverflow 的其他地方得到了验证,例如 Pyspark: PicklingError: 无法序列化对象:。
我选择使用映射而不是连接,因为我在df_1
中向每一行添加一列,并且考虑到编码识别正确行所需的复杂逻辑的困难,df_2
为每个给定事件相加(首先,检查哪些属性触发并在df_1
中为 TRUE, 然后在df_2
中选择这些属性,向下选择仅给定事件时间相关的属性和值,然后将所有事件相加)。
我正在尝试想一种方法以可持续、可扩展的方式重新配置df_2
,以允许更简单的连接/映射,但我不确定如何最好地做到这一点。
任何建议将不胜感激。
Sample DataFrames:
df1.show()
+-----------+----------+----------+----------+----------+
|EVENT_INDEX|EVENT_TIME|PROPERTY_1|PROPERTY_2|PROPERTY_3|
+-----------+----------+----------+----------+----------+
| Event_1| 13| 1| 0| 1|
| Event_2| 24| 0| 1| 1|
| Event_3| 35| 1| 0| 0|
+-----------+----------+----------+----------+----------+
df2.show()
+----------+--------+-------------+-----+
|START_TIME|END_TIME|PROPERTY_NAME|VALUE|
+----------+--------+-------------+-----+
| 0| 18| PROPERTY_1| 0.1|
| 19| 40| PROPERTY_1| 0.8|
| 0| 20| PROPERTY_2| 0.7|
| 20| 24| PROPERTY_2| 0.3|
| 25| 40| PROPERTY_2| 0.7|
| 0| 40| PROPERTY_3| 0.5|
+----------+--------+-------------+-----+
这适用于使用DataframeAPI
Spark2.4+
。(非常可扩展,因为它只使用内置函数,并且对于尽可能多的属性列是动态的)
只要属性列以'PROPERTY_'
开头,它就可以适用于尽可能多的属性,因为它对它们来说是动态的。首先,我将使用arrays_zip
和array
和explode
将所有属性列折叠成包含 2 列的行,使用element_at
为我们提供PROPERY_NAME,PROPERTY_VALUE
。在join
之前,我将过滤以仅保留PROPERY_VALUE=1
的所有行。联接将在range of time
上进行,其中PROPERTY
(包含所有折叠的属性行)=PROPERTY_NAMES
(df2)。这将确保我们只获得总和所需的所有行。然后,我执行一个groupBy
agg
以选择所有必需的列,并将我们的总和作为TOTAL_VALUE.
from pyspark.sql import functions as F
df1.withColumn("PROPERTIES",
F.explode(F.arrays_zip(F.array([F.array(F.lit(x),F.col(x)) for x in df1.columns if x.startswith("PROPERTY_")]))))
.select("EVENT_INDEX", "EVENT_TIME","PROPERTIES.*",
*[x for x in df1.columns if x.startswith("PROPERTY_")]).withColumn("PROPERTY", F.element_at("0",1))
.withColumn("PROPERTY_VALUE", F.element_at("0",2)).drop("0")
.filter('PROPERTY_VALUE=1').join(df2, (df1.EVENT_TIME>=df2.START_TIME) & (df1.EVENT_TIME<=df2.END_TIME)&
(F.col("PROPERTY")==df2.PROPERTY_NAME)).groupBy("EVENT_INDEX").agg(F.first("EVENT_TIME").alias("EVENT_TIME"),
*[F.first(x).alias(x) for x in df1.columns if x.startswith("PROPERTY_")],
(F.sum("VALUE").alias("TOTAL_VALUE"))).orderBy("EVENT_TIME").show()
+-----------+----------+----------+----------+----------+-----------+
|EVENT_INDEX|EVENT_TIME|PROPERTY_1|PROPERTY_2|PROPERTY_3|TOTAL_VALUE|
+-----------+----------+----------+----------+----------+-----------+
| Event_1| 13| 1| 0| 1| 0.6|
| Event_2| 24| 0| 1| 1| 0.8|
| Event_3| 35| 1| 0| 0| 0.8|
+-----------+----------+----------+----------+----------+-----------+