我有一个用例,其中多个键以JSON格式分布在数据集上,需要将其聚合到一个合并的结果集中进行进一步处理。
我已经能够使用CCD_ 1(PySpark(&Spark SQL
,但后者涉及更复合的&做这件事的迟缓,涉及到中间的对话,这些对话在未来可能会导致错误。
使用下面的代码段,有没有更好的方法可以使用Spark SQL
,通过使用key
和value
创建MAP<STRING,ARRAY<STRING>
来实现这一点?
数据准备
from pyspark.sql.types import *
import pandas as pd
from io import StringIO
s = StringIO("""
id|json_struct
1|{"a":["tyeqb","",""],"e":["qwrqc","",""]}
1|{"t":["sartq","",""],"r":["fsafsq","",""]}
1|{"b":["puhqiqh","",""],"e":["hjfsaj","",""]}
2|{"b":["basajhjwa","",""],"e":["asfafas","",""]}
2|{"n":["gaswq","",""],"r":["sar","",""],"l":["sar","",""],"s":["rqqrq","",""],"m":["wrqwrq","",""]}
2|{"s":["tqqwjh","",""],"t":["afs","",""],"l":["fsaafs","",""]}
""")
df = pd.read_csv(s,delimiter='|')
sparkDF = spark.createDataFrame(df)
sparkDF.registerTempTable("INPUT")
sparkDF = sparkDF.withColumn('json_struct', F.from_json(F.col('json_struct')
,schema=MapType(StringType(),ArrayType(StringType()),True)
))
sparkDF.show(truncate=False)
+---+---------------------------------------------------------------------------------------+
|id |json_struct |
+---+---------------------------------------------------------------------------------------+
|1 |{a -> [tyeqb, , ], e -> [qwrqc, , ]} |
|1 |{t -> [sartq, , ], r -> [fsafsq, , ]} |
|1 |{b -> [puhqiqh, , ], e -> [hjfsaj, , ]} |
|2 |{b -> [basajhjwa, , ], e -> [asfafas, , ]} |
|2 |{n -> [gaswq, , ], r -> [sar, , ], l -> [sar, , ], s -> [rqqrq, , ], m -> [wrqwrq, , ]}|
|2 |{s -> [tqqwjh, , ], t -> [afs, , ], l -> [fsaafs, , ]} |
+---+---------------------------------------------------------------------------------------+
Python API(PySpark(-实现
正如您所看到的,从explode
得到的key
本质上是STRING
类型,并且由于PySpark具有Python API
0,而Spark SQL
中没有,因此它可以很容易地用于生成最终的json_struct
列,以确保单个密钥具有可变长度的ARRAYTYPE<STRING>
值
sparkDF.select(
F.col('id')
,F.explode(F.col('json_struct'))
).withColumn('value',F.filter(F.col('value'), lambda x: x != '')
).withColumn('value',F.concat_ws(',', F.col('value'))
).groupBy('id', 'key'
).agg(F.collect_set(F.col('value')).alias('value')
).withColumn('json_struct',F.to_json(F.create_map("key","value"))
).orderBy('id'
).show(truncate=False)
+---+---+---------------+------------------------+
|id |key|value |json_struct |
+---+---+---------------+------------------------+
|1 |a |[tyeqb] |{"a":["tyeqb"]} |
|1 |e |[hjfsaj, qwrqc]|{"e":["hjfsaj","qwrqc"]}|
|1 |r |[fsafsq] |{"r":["fsafsq"]} |
|1 |b |[puhqiqh] |{"b":["puhqiqh"]} |
|1 |t |[sartq] |{"t":["sartq"]} |
|2 |b |[basajhjwa] |{"b":["basajhjwa"]} |
|2 |n |[gaswq] |{"n":["gaswq"]} |
|2 |t |[afs] |{"t":["afs"]} |
|2 |s |[tqqwjh, rqqrq]|{"s":["tqqwjh","rqqrq"]}|
|2 |e |[asfafas] |{"e":["asfafas"]} |
|2 |l |[sar, fsaafs] |{"l":["sar","fsaafs"]} |
|2 |r |[sar] |{"r":["sar"]} |
|2 |m |[wrqwrq] |{"m":["wrqwrq"]} |
+---+---+---------------+------------------------+
Spark SQL-实现
在这个实现中,我必须采取额外的步骤来确保key
和value
列都是ARRAYTYPE
,并且长度一致,因为map_from_arrays
将数组作为输入。
有没有办法绕过这些,创建一个类似于使用Python API
描述的模式
sql.sql("""
SELECT
id,
KEY,
VALUE,
TO_JSON(MAP_FROM_ARRAYS(KEY,VALUE)) as json_struct
FROM (
SELECT
id,
key,
ARRAY(COLLECT_SET( value )) as value -- <------- ### Ensuring Value is NESTED ARRAY
FROM (
SELECT
id,
SPLIT(k,'|',1) as key, -- <------- ### Ensuring Key is Array
CONCAT_WS(',',FILTER(v,x -> x != '')) as value
FROM (
SELECT
id,
EXPLODE(FROM_JSON(json_struct,'MAP<STRING,ARRAY<STRING>>')) as (k,v)
FROM INPUT
)
)
GROUP BY 1,2
)
ORDER BY 1
""").show(truncate=False)
+---+---+-----------------+------------------------+
|id |KEY|VALUE |json_struct |
+---+---+-----------------+------------------------+
|1 |[a]|[[tyeqb]] |{"a":["tyeqb"]} |
|1 |[e]|[[hjfsaj, qwrqc]]|{"e":["hjfsaj","qwrqc"]}|
|1 |[b]|[[puhqiqh]] |{"b":["puhqiqh"]} |
|1 |[r]|[[fsafsq]] |{"r":["fsafsq"]} |
|1 |[t]|[[sartq]] |{"t":["sartq"]} |
|2 |[n]|[[gaswq]] |{"n":["gaswq"]} |
|2 |[b]|[[basajhjwa]] |{"b":["basajhjwa"]} |
|2 |[t]|[[afs]] |{"t":["afs"]} |
|2 |[s]|[[tqqwjh, rqqrq]]|{"s":["tqqwjh","rqqrq"]}|
|2 |[e]|[[asfafas]] |{"e":["asfafas"]} |
|2 |[l]|[[sar, fsaafs]] |{"l":["sar","fsaafs"]} |
|2 |[r]|[[sar]] |{"r":["sar"]} |
|2 |[m]|[[wrqwrq]] |{"m":["wrqwrq"]} |
+---+---+-----------------+------------------------+
Spark SQL而不是create_map
具有map
。你的PySpark代码可以翻译成:
df = spark.sql("""
WITH
TBL2 (SELECT id, EXPLODE(FROM_JSON(json_struct,'MAP<STRING,ARRAY<STRING>>')) from INPUT),
TBL3 (SELECT id, key, FLATTEN(COLLECT_SET(FILTER(value, x -> x != ''))) value
FROM TBL2
GROUP BY id, key)
SELECT *, TO_JSON(MAP(key, value)) json_struct
FROM TBL3
""")
df.show(truncate=0)
# +---+---+---------------+------------------------+
# |id |key|value |json_struct |
# +---+---+---------------+------------------------+
# |1 |a |[tyeqb] |{"a":["tyeqb"]} |
# |1 |e |[qwrqc, hjfsaj]|{"e":["qwrqc","hjfsaj"]}|
# |1 |b |[puhqiqh] |{"b":["puhqiqh"]} |
# |1 |r |[fsafsq] |{"r":["fsafsq"]} |
# |1 |t |[sartq] |{"t":["sartq"]} |
# |2 |b |[basajhjwa] |{"b":["basajhjwa"]} |
# |2 |n |[gaswq] |{"n":["gaswq"]} |
# |2 |s |[rqqrq, tqqwjh]|{"s":["rqqrq","tqqwjh"]}|
# |2 |t |[afs] |{"t":["afs"]} |
# |2 |e |[asfafas] |{"e":["asfafas"]} |
# |2 |l |[fsaafs, sar] |{"l":["fsaafs","sar"]} |
# |2 |r |[sar] |{"r":["sar"]} |
# |2 |m |[wrqwrq] |{"m":["wrqwrq"]} |
# +---+---+---------------+------------------------+