Create a MapType field from multiple columns - Spark SQL



I have a use case where multiple keys are spread across the dataset in JSON format, and they need to be aggregated into a consolidated result set for further processing.

I have been able to achieve this using both the Python API (PySpark) and Spark SQL, but the latter involves a more convoluted and slower way of doing it, with intermediate conversions that could lead to errors down the line.

Using the snippets below, is there a better way to achieve this with Spark SQL, i.e. by creating MAP<STRING,ARRAY<STRING>> from the key and value columns?

Data preparation

from pyspark.sql.types import *
import pyspark.sql.functions as F
import pandas as pd
from io import StringIO
s = StringIO("""
id|json_struct
1|{"a":["tyeqb","",""],"e":["qwrqc","",""]}
1|{"t":["sartq","",""],"r":["fsafsq","",""]}
1|{"b":["puhqiqh","",""],"e":["hjfsaj","",""]}
2|{"b":["basajhjwa","",""],"e":["asfafas","",""]}
2|{"n":["gaswq","",""],"r":["sar","",""],"l":["sar","",""],"s":["rqqrq","",""],"m":["wrqwrq","",""]}
2|{"s":["tqqwjh","",""],"t":["afs","",""],"l":["fsaafs","",""]}
""")
df = pd.read_csv(s,delimiter='|')
sparkDF = spark.createDataFrame(df)
sparkDF.registerTempTable("INPUT")
sparkDF = sparkDF.withColumn('json_struct', F.from_json(F.col('json_struct')
,schema=MapType(StringType(),ArrayType(StringType()),True)
))
sparkDF.show(truncate=False)
+---+---------------------------------------------------------------------------------------+
|id |json_struct                                                                            |
+---+---------------------------------------------------------------------------------------+
|1  |{a -> [tyeqb, , ], e -> [qwrqc, , ]}                                                   |
|1  |{t -> [sartq, , ], r -> [fsafsq, , ]}                                                  |
|1  |{b -> [puhqiqh, , ], e -> [hjfsaj, , ]}                                                |
|2  |{b -> [basajhjwa, , ], e -> [asfafas, , ]}                                             |
|2  |{n -> [gaswq, , ], r -> [sar, , ], l -> [sar, , ], s -> [rqqrq, , ], m -> [wrqwrq, , ]}|
|2  |{s -> [tqqwjh, , ], t -> [afs, , ], l -> [fsaafs, , ]}                                 |
+---+---------------------------------------------------------------------------------------+

Python API (PySpark) - Implementation

As you can see, the key produced by explode is essentially of STRING type, and since PySpark has create_map, which is not available in Spark SQL, it can readily be used to generate the final json_struct column, ensuring a single key with a variable-length ARRAY<STRING> value.
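
For reference, a minimal standalone sketch of that point (the one-row demo frame below is purely illustrative): create_map pairs the scalar key column with the array value column directly, with no extra wrapping needed.

import pyspark.sql.functions as F

# Illustrative one-row frame: a scalar key plus an ARRAY<STRING> value
demo = spark.createDataFrame([("e", ["qwrqc", "hjfsaj"])], ["key", "value"])

# create_map takes the STRING key and the ARRAY<STRING> value as-is
demo.select(
    F.to_json(F.create_map("key", "value")).alias("json_struct")
).show(truncate=False)
# {"e":["qwrqc","hjfsaj"]}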

sparkDF.select(
F.col('id')
,F.explode(F.col('json_struct'))
).withColumn('value',F.filter(F.col('value'), lambda x: x != '')
).withColumn('value',F.concat_ws(',', F.col('value'))
).groupBy('id', 'key'
).agg(F.collect_set(F.col('value')).alias('value')
).withColumn('json_struct',F.to_json(F.create_map("key","value"))
).orderBy('id'
).show(truncate=False)
+---+---+---------------+------------------------+
|id |key|value          |json_struct             |
+---+---+---------------+------------------------+
|1  |a  |[tyeqb]        |{"a":["tyeqb"]}         |
|1  |e  |[hjfsaj, qwrqc]|{"e":["hjfsaj","qwrqc"]}|
|1  |r  |[fsafsq]       |{"r":["fsafsq"]}        |
|1  |b  |[puhqiqh]      |{"b":["puhqiqh"]}       |
|1  |t  |[sartq]        |{"t":["sartq"]}         |
|2  |b  |[basajhjwa]    |{"b":["basajhjwa"]}     |
|2  |n  |[gaswq]        |{"n":["gaswq"]}         |
|2  |t  |[afs]          |{"t":["afs"]}           |
|2  |s  |[tqqwjh, rqqrq]|{"s":["tqqwjh","rqqrq"]}|
|2  |e  |[asfafas]      |{"e":["asfafas"]}       |
|2  |l  |[sar, fsaafs]  |{"l":["sar","fsaafs"]}  |
|2  |r  |[sar]          |{"r":["sar"]}           |
|2  |m  |[wrqwrq]       |{"m":["wrqwrq"]}        |
+---+---+---------------+------------------------+

Spark SQL - Implementation

In this implementation I have to take additional steps to ensure that both the key and value columns are of ARRAY type and of consistent lengths, since map_from_arrays takes arrays as input.

Is there a way to bypass these extra steps and create a schema similar to the one produced with the Python API above?
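
For comparison, here is a minimal sketch (literal values only) of the wrapping that MAP_FROM_ARRAYS forces: the key has to be turned into a single-element array and the value nested one level deeper, which is exactly what the extra steps in the full query below do.

spark.sql("""
SELECT
    TO_JSON(
        MAP_FROM_ARRAYS(
            SPLIT('e', '|', 1),               -- key wrapped as ["e"]
            ARRAY(ARRAY('qwrqc', 'hjfsaj'))   -- value nested as [["qwrqc","hjfsaj"]]
        )
    ) AS json_struct
""").show(truncate=False)
# {"e":["qwrqc","hjfsaj"]}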

spark.sql("""
SELECT
    id,
    KEY,
    VALUE,
    TO_JSON(MAP_FROM_ARRAYS(KEY, VALUE)) AS json_struct
FROM (
    SELECT
        id,
        key,
        ARRAY(COLLECT_SET(value)) AS value          -- <------- ### Ensuring value is a NESTED ARRAY
    FROM (
        SELECT
            id,
            SPLIT(k, '|', 1) AS key,                -- <------- ### Ensuring key is an ARRAY
            CONCAT_WS(',', FILTER(v, x -> x != '')) AS value
        FROM (
            SELECT
                id,
                EXPLODE(FROM_JSON(json_struct, 'MAP<STRING,ARRAY<STRING>>')) AS (k, v)
            FROM INPUT
        )
    )
    GROUP BY 1, 2
)
ORDER BY 1
""").show(truncate=False)
+---+---+-----------------+------------------------+
|id |KEY|VALUE            |json_struct             |
+---+---+-----------------+------------------------+
|1  |[a]|[[tyeqb]]        |{"a":["tyeqb"]}         |
|1  |[e]|[[hjfsaj, qwrqc]]|{"e":["hjfsaj","qwrqc"]}|
|1  |[b]|[[puhqiqh]]      |{"b":["puhqiqh"]}       |
|1  |[r]|[[fsafsq]]       |{"r":["fsafsq"]}        |
|1  |[t]|[[sartq]]        |{"t":["sartq"]}         |
|2  |[n]|[[gaswq]]        |{"n":["gaswq"]}         |
|2  |[b]|[[basajhjwa]]    |{"b":["basajhjwa"]}     |
|2  |[t]|[[afs]]          |{"t":["afs"]}           |
|2  |[s]|[[tqqwjh, rqqrq]]|{"s":["tqqwjh","rqqrq"]}|
|2  |[e]|[[asfafas]]      |{"e":["asfafas"]}       |
|2  |[l]|[[sar, fsaafs]]  |{"l":["sar","fsaafs"]}  |
|2  |[r]|[[sar]]          |{"r":["sar"]}           |
|2  |[m]|[[wrqwrq]]       |{"m":["wrqwrq"]}        |
+---+---+-----------------+------------------------+

Spark SQL has map instead of create_map. Your PySpark code can be translated into:

df = spark.sql("""
WITH
TBL2 (SELECT id, EXPLODE(FROM_JSON(json_struct,'MAP<STRING,ARRAY<STRING>>')) from INPUT),
TBL3 (SELECT id, key, FLATTEN(COLLECT_SET(FILTER(value, x -> x != ''))) value
FROM TBL2
GROUP BY id, key)
SELECT *, TO_JSON(MAP(key, value)) json_struct
FROM TBL3
""")
df.show(truncate=0)
# +---+---+---------------+------------------------+
# |id |key|value          |json_struct             |
# +---+---+---------------+------------------------+
# |1  |a  |[tyeqb]        |{"a":["tyeqb"]}         |
# |1  |e  |[qwrqc, hjfsaj]|{"e":["qwrqc","hjfsaj"]}|
# |1  |b  |[puhqiqh]      |{"b":["puhqiqh"]}       |
# |1  |r  |[fsafsq]       |{"r":["fsafsq"]}        |
# |1  |t  |[sartq]        |{"t":["sartq"]}         |
# |2  |b  |[basajhjwa]    |{"b":["basajhjwa"]}     |
# |2  |n  |[gaswq]        |{"n":["gaswq"]}         |
# |2  |s  |[rqqrq, tqqwjh]|{"s":["rqqrq","tqqwjh"]}|
# |2  |t  |[afs]          |{"t":["afs"]}           |
# |2  |e  |[asfafas]      |{"e":["asfafas"]}       |
# |2  |l  |[fsaafs, sar]  |{"l":["fsaafs","sar"]}  |
# |2  |r  |[sar]          |{"r":["sar"]}           |
# |2  |m  |[wrqwrq]       |{"m":["wrqwrq"]}        |
# +---+---+---------------+------------------------+
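
For completeness, the same simplification carries back to the Python API. A rough sketch, assuming Spark 3.1+ for F.filter (already used in the question): drop the CONCAT_WS/SPLIT round-trip, collect the filtered arrays, flatten them, and build the map with create_map.

import pyspark.sql.functions as F

(sparkDF
    .select('id', F.explode('json_struct'))   # exploding the map gives key (STRING) and value (ARRAY<STRING>)
    .groupBy('id', 'key')
    .agg(F.flatten(F.collect_set(F.filter('value', lambda x: x != ''))).alias('value'))
    .withColumn('json_struct', F.to_json(F.create_map('key', 'value')))
    .orderBy('id')
    .show(truncate=False))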
