如何在一行中LEFT JOIN两个结构数组



我正在处理从Facebook的Ads Insights API 中提取的"动作分解"数据

Facebook没有将action(购买次数(和action_value(购买金额(放在同一列中,所以我需要根据动作的标识符(在我的情况下是id#+设备类型(加入我端的那些。

如果每个操作都只是自己的一行,那么将它们与SQL连接起来当然是微不足道的。但在这种情况下,我需要将每行中的两个structs连接起来。我想要做的是在两个结构上生成一个LEFT JOIN,在两列上匹配。理想情况下,我可以单独使用SQL(而不是PySpark/Scala/等(来实现这一点。

到目前为止,我已经尝试过:

  • SparkSQLinline生成器。这让我可以在自己的行上执行每个操作,但由于原始数据集中的父行没有唯一的标识符,因此无法按行联接这些结构。还尝试在两列上使用inline(),但一次只能使用一个"生成器"函数
  • 使用SparkSQLarrays_zip函数将它们组合在一起。但这不起作用,因为顺序并不总是一样的,而且他们有时也没有相同的密钥
  • 我曾考虑在PySpark中编写一个map函数。但映射函数似乎只通过索引而不是名称来识别列,如果列稍后发生更改(可能是在使用第三方API时(,这似乎很脆弱
  • 我考虑过编写PySpark UDF,这似乎是最好的选择,但需要我没有的权限(SELECT on anonymous function(。如果这真的是最好的选择,我会努力争取许可

为了更好地说明:数据集中的每一行都有一个actionsaction_values列,其中的数据如下。

actions = [
{
"action_device": "desktop",
"action_type": "offsite_conversion.custom.123",
"value": "1"
},
{
"action_device": "desktop", /* Same conversion ID; different device. */
"action_type": "offsite_conversion.custom.321",
"value": "1"
},
{
"action_device": "iphone", /* Same conversion ID; different device. */
"action_type": "offsite_conversion.custom.321",
"value": "2"
}
{
"action_device": "iphone", /* has "actions" but not "actions_values" */
"action_type": "offsite_conversion.custom.789",
"value": "1"
},
]
action_values = [
{
"action_device": "desktop",
"action_type": "offsite_conversion.custom.123",
"value": "49.99"
},
{
"action_device": "desktop",
"action_type": "offsite_conversion.custom.321",
"value": "19.99"
},
{
"action_device": "iphone",
"action_type": "offsite_conversion.custom.321",
"value": "99.99"
}
]

我希望每一行在一个结构中都有两个数据点,如下所示:

my_desired_result = [
{
"action_device": "desktop",
"action_type": "offsite_conversion.custom.123",
"count": "1", /* This comes from the "action" struct */
"value": "49.99" /* This comes from the "action_values" struct */
},
{
"action_device": "desktop",
"action_type": "offsite_conversion.custom.321",
"count": "1",
"value": "19.99"
},
{
"action_device": "iphone",
"action_type": "offsite_conversion.custom.321",
"count": "2",
"value": "99.99"
},
{
"action_device": "iphone",
"action_type": "offsite_conversion.custom.789",
"count": "1",
"value": null /* NULL because there is no value for conversion#789 AND iphone */
}
]

IIUC,您可以尝试转换,然后使用filter通过匹配action_device和action_type:从action_values中找到第一个匹配项

df.printSchema()
root
|-- action_values: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- action_device: string (nullable = true)
|    |    |-- action_type: string (nullable = true)
|    |    |-- value: string (nullable = true)
|-- actions: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- action_device: string (nullable = true)
|    |    |-- action_type: string (nullable = true)
|    |    |-- value: string (nullable = true)
df.createOrReplaceTempView("df_table")
spark.sql("""
SELECT       
transform(actions, x -> named_struct(
'action_device', x.action_device,
'action_type', x.action_type,
'count', x.value,
'value', filter(action_values, y -> y.action_device = x.action_device AND y.action_type = x.action_type)[0].value
)) as result
FROM df_table
""").show(truncate=False)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|result                                                                                                                                                                                                  |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[desktop, offsite_conversion.custom.123, 1, 49.99], [desktop, offsite_conversion.custom.321, 1, 19.99], [iphone, offsite_conversion.custom.321, 2, 99.99], [iphone, offsite_conversion.custom.789, 1,]]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

UPDATE:在FULLJOIN的情况下,您可以尝试以下SQL:

spark.sql("""
SELECT
concat(
/* actions left join action_values with potentially multiple matched values */
flatten(
transform(actions, x ->
transform(
filter(action_values, y -> y.action_device = x.action_device AND y.action_type = x.action_type),
z -> named_struct(
'action_device', x.action_device,
'action_type', x.action_type,
'count', x.value,
'value', z.value
)
)
)
),
/* action_values missing from actions */
transform(
filter(action_values, x -> !exists(actions, y -> x.action_device = y.action_device AND x.action_type = y.action_type)),
z -> named_struct(
'action_device', z.action_device,
'action_type', z.action_type,
'count', NULL,
'value', z.value
)
)
) as result
FROM df_table
""").show(truncate=False)

最新更新