How to remap and filter MapType keys in PySpark based on a Python dict



I have a PySpark df with this schema:

root
|-- name: string (nullable = true)
|-- products: struct (nullable = true)
|    |-- product_hist: map (nullable = true)
|    |    |-- key: string
|    |    |-- value: integer (valueContainsNull = true)
|    |-- tot_visits: long (nullable = true)

An example row:

Mary, {{A -> 2000, B -> 100, C -> 250}, 4}

Given a Python dict

my_dict = {'A': 1, 'C': 2}

I want to use the Python dict to change the keys of the MapType field and filter out any keys that are not in the dict. The result would then be:

Mary, {{1 -> 2000, 2 -> 250}, 4} 

What is the best way to do this?

Since Spark 3.1, you can use transform_keys to change map keys and map_filter to drop unwanted map entries; Column.withField replaces a single field inside the struct. All of these are used in the code below.
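As a minimal sketch of the two map functions on their own (the toy column m and its contents are hypothetical, assuming Spark >= 3.1):

from pyspark.sql import functions as F

toy = spark.createDataFrame([({'A': 2000, 'B': 100, 'C': 250},)], 'm map<string,int>')

# map_filter keeps only the entries whose key/value pass the predicate
toy.select(F.map_filter('m', lambda k, v: k.isin('A', 'C')).alias('m')).show(truncate=0)
# {A -> 2000, C -> 250}

# transform_keys rewrites every key using the supplied function
toy.select(F.transform_keys('m', lambda k, v: F.lower(k)).alias('m')).show(truncate=0)
# {a -> 2000, b -> 100, c -> 250}

Combining all three on the question's DataFrame: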

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [('Mary', ({'A': 2000, 'B': 100, 'C': 250}, 4))],
    'name string, products struct<product_hist:map<string,bigint>,tot_visits:bigint>')

my_dict = {'A': 1, 'C': 2}

# Literal map built from my_dict; indexing it with an old key returns the new key
map_col = F.create_map([F.lit(x) for i in my_dict.items() for x in i])

df = df.withColumn(
    'products',
    F.col('products').withField(
        'product_hist',
        F.transform_keys(
            # first drop the entries whose key is not in my_dict ...
            F.map_filter(
                'products.product_hist',
                lambda k, v: k.isin([*my_dict.keys()])
            ),
            # ... then replace every remaining key with its mapped value
            lambda k, v: map_col[k]
        )
    )
)
df.show(truncate=0)
# +----+--------------------------+
# |name|products                  |
# +----+--------------------------+
# |Mary|{{1 -> 2000, 2 -> 250}, 4}|
# +----+--------------------------+
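
The remapping works because map_col[k] looks up the old key in the literal map and returns the new key; a key that is not in my_dict would look up to null, which is why map_filter runs first (transform_keys cannot produce null keys). A standalone sketch of that lookup on a throwaway one-row DataFrame (the aliases are just for illustration):

my_dict = {'A': 1, 'C': 2}
map_col = F.create_map([F.lit(x) for i in my_dict.items() for x in i])

spark.range(1).select(map_col['A'].alias('remapped'), map_col['B'].alias('missing')).show()
# +--------+-------+
# |remapped|missing|
# +--------+-------+
# |       1|   null|
# +--------+-------+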

This is also efficient: everything compiles to built-in Spark expressions with no Python UDFs, as the physical plan shows:

df.explain()
# == Physical Plan ==
# Project [name#469, if (isnull(products#470)) null else named_struct(product_hist, transform_keys(map_filter(products#470.product_hist, lambdafunction(lambda x_50#475 IN (A,C), lambda x_50#475, lambda y_51#476L, false)), lambdafunction(map(keys: [A,C], values: [1,2])[lambda x_52#477], lambda x_52#477, lambda y_53#478L, false)), tot_visits, products#470.tot_visits) AS products#473]
# +- *(1) Scan ExistingRDD[name#469,products#470]
