使用复杂的条件逻辑(可能使用映射代替)连接 Pyspark 数据帧



下午好。

我正在尝试在 Pyspark 中执行连接,该连接使用一组复杂的条件来生成单个值。

我试图实现的目标的最小示例如下所示。想象一下一组可以在离散时间(t=0t=40之间)发生的事件。每个事件都有一组三个独立的布尔属性,用于描述事件的性质。查找表中包含一些与触发每个属性相关联的时间相关值。对于每个事件,我想确定该事件的所有相关值的总和。

我的第一个数据帧df_1是事件的列表,事件发生的时间,并具有与之关联的布尔属性选择:

+-------------+------------+------------+------------+------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 |
+-------------+------------+------------+------------+------------+
|   Event_1   |     13     |     1      |      0     |     1      |
|   Event_2   |     24     |     0      |      1     |     1      |
|   Event_3   |     35     |     1      |      0     |     0      |
+-------------+------------+------------+------------+------------+

第二个数据帧df_2是查找表,用于描述在特定时间为特定属性具有 TRUE 的关联值。由于所有时间段中有许多重复值,因此此数据帧的格式是属性具有特定值的包含时间范围。时间范围的大小不一致,并且可能因不同的属性而有很大差异:

+------------+----------+---------------+-------+
| START_TIME | END_TIME | PROPERTY_NAME | VALUE |
+------------+----------+---------------+-------+
|      0     |    18    |  PROPERTY_1   |  0.1  |
|     19     |    40    |  PROPERTY_1   |  0.8  |
|      0     |    20    |  PROPERTY_2   |  0.7  |
|     20     |    24    |  PROPERTY_2   |  0.3  |
|     25     |    40    |  PROPERTY_2   |  0.7  |
|      0     |    40    |  PROPERTY_3   |  0.5  |
+------------+----------+---------------+-------+

期望输出: 由于Event_1发生在时间t=13时,触发了PROPERTY_1PROPERTY_3,因此根据df_2的值的预期总和应为 0.1(从 0-18 存储桶PROPERTY_1)+ 0.5(从PROPERTY_30-40 存储桶)= 0.6。同样,Event_2的值应为 0.3(请记住,存储桶开始/结束时间包括在内,因此它来自 20-24 存储桶)+ 0.5 = 0.8。最后,Event_3= 0.8。

+-------------+------------+------------+------------+------------+-------------+
| EVENT_INDEX | EVENT_TIME | PROPERTY_1 | PROPERTY_2 | PROPERTY_3 | TOTAL_VALUE |
+-------------+------------+------------+------------+------------+-------------+
|   Event_1   |     13     |     1      |      0     |     1      |     0.6     |
|   Event_2   |     24     |     0      |      1     |     1      |     0.8     |
|   Event_3   |     35     |     1      |      0     |     0      |     0.8     |
+-------------+------------+------------+------------+------------+-------------+

对于我的初始测试数据集,在事件数据帧df_1中有 ~20,000 个事件分布在 2000 个时间段内。每个事件有 ~44 个属性,查找表df_2的长度为 ~53,000。由于我想将此过程扩展到更多的数据(可能有几个数量级),因此我对此问题的可并行解决方案非常感兴趣。例如,我觉得将df_2总结为 python 字典并将其广播给我的执行者是不可能的,因为数据量很大。

由于我尝试在df_1中的每一行添加一列,因此我尝试使用类似于以下内容的嵌套映射来完成任务:

def calculate_value(df_2):
def _calculate_value(row):
row_dict = row.asDict()
rolling_value = 0.0
for property_name in [key for key in row_dict.keys() if "PROPERTY" in key]:
additional_value = (
df_2
.filter(
(pyspark.sql.functions.col("PROPERTY_NAME") == property_name)
& (pyspark.sql.functions.col("START_BUCKET") <= row_dict["EVENT_TIME"])
& (pyspark.sql.functions.col("END_BUCKET") >= row_dict["EVENT_TIME"])
)
.select("VALUE")
.collect()
)[0][0]
rolling_value += additional_value
return pyspark.sql.Row(**row_dict)
return _calculate_value

这段代码能够在驱动程序上执行连接(通过运行calculate_value(df_2)(df_1.rdd.take(1)[0])),但是当我尝试执行并行化映射时:

(
df_1
.rdd
.map(calculate_value(df_2))
)

我收到一个 Py4JError 指示它无法将数据帧对象锯齿化df_2。这在 StackOverflow 的其他地方得到了验证,例如 Pyspark: PicklingError: 无法序列化对象:。

我选择使用映射而不是连接,因为我在df_1中向每一行添加一列,并且考虑到编码识别正确行所需的复杂逻辑的困难,df_2为每个给定事件相加(首先,检查哪些属性触发并在df_1中为 TRUE, 然后在df_2中选择这些属性,向下选择仅给定事件时间相关的属性和值,然后将所有事件相加)。

我正在尝试想一种方法以可持续、可扩展的方式重新配置df_2,以允许更简单的连接/映射,但我不确定如何最好地做到这一点。

任何建议将不胜感激。

Sample DataFrames:

df1.show()
+-----------+----------+----------+----------+----------+
|EVENT_INDEX|EVENT_TIME|PROPERTY_1|PROPERTY_2|PROPERTY_3|
+-----------+----------+----------+----------+----------+
|    Event_1|        13|         1|         0|         1|
|    Event_2|        24|         0|         1|         1|
|    Event_3|        35|         1|         0|         0|
+-----------+----------+----------+----------+----------+
df2.show()
+----------+--------+-------------+-----+
|START_TIME|END_TIME|PROPERTY_NAME|VALUE|
+----------+--------+-------------+-----+
|         0|      18|   PROPERTY_1|  0.1|
|        19|      40|   PROPERTY_1|  0.8|
|         0|      20|   PROPERTY_2|  0.7|
|        20|      24|   PROPERTY_2|  0.3|
|        25|      40|   PROPERTY_2|  0.7|
|         0|      40|   PROPERTY_3|  0.5|
+----------+--------+-------------+-----+

这适用于使用DataframeAPISpark2.4+(非常可扩展,因为它只使用内置函数,并且对于尽可能多的属性列是动态的)

只要属性列以'PROPERTY_'开头,它就可以适用于尽可能多的属性,因为它对它们来说是动态的。首先,我将使用arrays_ziparrayexplode将所有属性列折叠成包含 2 列的行,使用element_at为我们提供PROPERY_NAME,PROPERTY_VALUEjoin之前,我将过滤以仅保留PROPERY_VALUE=1的所有行。联接将在range of time上进行,其中PROPERTY(包含所有折叠的属性行)=PROPERTY_NAMES(df2)。这将确保我们只获得总和所需的所有行。然后,我执行一个groupByagg以选择所有必需的列,并将我们的总和作为TOTAL_VALUE.

from pyspark.sql import functions as F
df1.withColumn("PROPERTIES",
F.explode(F.arrays_zip(F.array([F.array(F.lit(x),F.col(x)) for x in df1.columns if x.startswith("PROPERTY_")]))))
.select("EVENT_INDEX", "EVENT_TIME","PROPERTIES.*",
*[x for x in df1.columns if x.startswith("PROPERTY_")]).withColumn("PROPERTY", F.element_at("0",1))
.withColumn("PROPERTY_VALUE", F.element_at("0",2)).drop("0")
.filter('PROPERTY_VALUE=1').join(df2, (df1.EVENT_TIME>=df2.START_TIME) & (df1.EVENT_TIME<=df2.END_TIME)& 
(F.col("PROPERTY")==df2.PROPERTY_NAME)).groupBy("EVENT_INDEX").agg(F.first("EVENT_TIME").alias("EVENT_TIME"),
*[F.first(x).alias(x) for x in df1.columns if x.startswith("PROPERTY_")],
(F.sum("VALUE").alias("TOTAL_VALUE"))).orderBy("EVENT_TIME").show()
+-----------+----------+----------+----------+----------+-----------+
|EVENT_INDEX|EVENT_TIME|PROPERTY_1|PROPERTY_2|PROPERTY_3|TOTAL_VALUE|
+-----------+----------+----------+----------+----------+-----------+
|    Event_1|        13|         1|         0|         1|        0.6|
|    Event_2|        24|         0|         1|         1|        0.8|
|    Event_3|        35|         1|         0|         0|        0.8|
+-----------+----------+----------+----------+----------+-----------+

最新更新