Parse a PySpark string column of a dict of key lists using a separate array column of keys



I am currently trying to extract the values in the 'keyValue' column below based on the ordered array of keys in the 'Keys' column below.

>>> df.select('reference_nbr', 'keyValue', 'Keys').show()
+--------------+-------------------------------------------------------------+---------------------+
| reference_nbr|                                                     keyValue|                 Keys|
+--------------+-------------------------------------------------------------+---------------------+
|          AZQ5|{key39=[TBAX3, TBAX6, TBAXN], key46=[TBARO, TBAZ4, TBABN],...|[key1, key2, key3,...|
|          NXY3|{key39=[TBAX8, TBAX2, TBAXZ], key46=[TBARD, TBAZK, TBAX9],...|[key1, key2, key3,...|
|          QSW6|{key39=[TBAX5, TBAX3, TBAX8], key46=[TBARB, TBAZN, TBAX4],...|[key1, key2, key3,...|
|          LJB7|{key39=[TBAX3, TBAXN, TBAXL], key46=[TBARM, TBAZ2, TBAX3],...|[key1, key2, key3,...|
|          MKH9|{key39=[TBAX4, TBAX9, TBAXV], key46=[TBARB, TBAZB, TBAX1],...|[key1, key2, key3,...|
|          UFG1|{key39=[TBAX3, TBAX6, TBAXQ], key46=[TBARL, TBAZB, TBAX0],...|[key1, key2, key3,...|
|          WDE4|{key39=[TBAX6, TBAX7, TBAX9], key46=[TBARX, TBAX6, TBAX8],...|[key1, key2, key3,...|
|          VRX8|{key39=[TBAX3, TBAX1, TBAX0], key46=[TBARQ, TBAX9, TBAX3],...|[key1, key2, key3,...|
|          CIZ2|{key39=[TBAX3, TBAXC, TBAX2], key46=[TBARA, TBAXQ, TBAX1],...|[key1, key2, key3,...|
|          BEO3|{key39=[TBAX9, TBAXQ, TBAX4], key46=[TBARP, TBAXV, TBAX2],...|[key1, key2, key3,...|
+--------------+-------------------------------------------------------------+---------------------+
only showing top 20 rows

If I apply the UDF and withColumn() step below, I can easily query the 'keyValue' column for one specific key and insert that key's value array into a new column.

from pyspark.sql.functions import udf

getKey4 = udf(lambda ar1: ar1.get('key4'))
df = df.withColumn("key4Values", getKey4(df["keyValue"]))

I am trying to perform the same step as above, but for every key, in the order given by the 'Keys' column. Desired output:

+--------------+-------------------------------------------------------------+---------------------+------------------------------------------------+
| reference_nbr|                                                     keyValue|                 Keys|                                          Values|
+--------------+-------------------------------------------------------------+---------------------+------------------------------------------------+
|          AZQ5|{key39=[TBAX3, TBAX6, TBAXN], key46=[TBARO, TBAZ4, TBABN],...|[key1, key2, key3,...|[[TBAX4, TBAXQ, TBAXD],[TBAR1, TBAZA, TBABW],...|
|          NXY3|{key39=[TBAX8, TBAX2, TBAXZ], key46=[TBARD, TBAZK, TBAX9],...|[key1, key2, key3,...|[[TBAX5, TBAXA, TBAXC],[TBAR2, TBAZS, TBABE],...|
|          QSW6|{key39=[TBAX5, TBAX3, TBAX8], key46=[TBARB, TBAZN, TBAX4],...|[key1, key2, key3,...|[[TBAX6, TBAXZ, TBAXF],[TBAR3, TBAZD, TBABR],...|
|          LJB7|{key39=[TBAX3, TBAXN, TBAXL], key46=[TBARM, TBAZ2, TBAX3],...|[key1, key2, key3,...|[[TBAX7, TBAXC, TBAXG],[TBAR4, TBAZF, TBABT],...|
|          MKH9|{key39=[TBAX4, TBAX9, TBAXV], key46=[TBARB, TBAZB, TBAX1],...|[key1, key2, key3,...|[[TBAX8, TBAXV, TBAXH],[TBAR5, TBAZG, TBABY],...|
|          UFG1|{key39=[TBAX3, TBAX6, TBAXQ], key46=[TBARL, TBAZB, TBAX0],...|[key1, key2, key3,...|[[TBAX9, TBAXB, TBAXJ],[TBAR6, TBAZH, TBABU],...|
|          WDE4|{key39=[TBAX6, TBAX7, TBAX9], key46=[TBARX, TBAX6, TBAX8],...|[key1, key2, key3,...|[[TBAX0, TBAXN, TBAXK],[TBAR7, TBAZJ, TBABI],...|
|          VRX8|{key39=[TBAX3, TBAX1, TBAX0], key46=[TBARQ, TBAX9, TBAX3],...|[key1, key2, key3,...|[[TBAX2, TBAXM, TBAXL],[TBAR8, TBAZK, TBABO],...|
|          CIZ2|{key39=[TBAX3, TBAXC, TBAX2], key46=[TBARA, TBAXQ, TBAX1],...|[key1, key2, key3,...|[[TBAX3, TBAXA, TBAXO],[TBAR9, TBAZL, TBABP],...|
|          BEO3|{key39=[TBAX9, TBAXQ, TBAX4], key46=[TBARP, TBAXV, TBAX2],...|[key1, key2, key3,...|[[TBAX1, TBAXS, TBAXI],[TBAR0, TBAZQ, TBABZ],...|
+--------------+-------------------------------------------------------------+---------------------+------------------------------------------------+

I have tried the approach below, but I get the following error:

getKeyValues = udf(lambda ar1, ar2: {ar1.get(x) for x in ar2})
df.withColumn("Values", getKeyValues(df["keyValue"], df["Keys"])).show()
AttributeError: 'unicode' object has no attribute 'get'

I tried a different UDF below, which gives the same unicode error:

from pyspark.sql.types import ArrayType, StringType

def getKeyVals(ar1, ar2):
    arr = []
    for x in ar2:
        arr.append(ar1.get(x, None))
    return arr

udf_getKeyVals = udf(getKeyVals, ArrayType(StringType()))
df.withColumn("test", udf_getKeyVals(df['keyValue'], df['Keys'])).show()

I also tried the function below, taken from the mungingdata web page, but I get a similar error:

from pyspark.sql import functions as F

def working_fun(mapping):
    def f(ar1):
        for x in ar1:
            return mapping.get(x)
    return F.udf(f)

df.withColumn("test", working_fun(df["keyValue"])(F.col('Keys'))).show()

Any tips or suggestions are appreciated. Thanks!

Updated to include schema and Spark version details

>>> df.select('reference_nbr', 'keyValue', 'Keys').schema.simpleString()
'struct<reference_nbr:string,keyValue:string,Keys:array<string>>'
>>> spark.version
u'2.3.2.3.1.5.6030-1'

If you convert the keyValue column from string type to map type, you can extract the values with the map_values function:

I used a UDF to replace the = in keyvalue with : so that we can use the ast module to turn the string into a map.

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.master('local[*]').getOrCreate()

def get_dict(c):
    import ast
    c = c.replace("=", ":")
    dict_value = ast.literal_eval(c)
    return dict_value

get_dict_udf = udf(lambda c: get_dict(c), MapType(StringType(), ArrayType(IntegerType())))

# Sample dataframe
df = spark.createDataFrame(
    [('{"k3"= [6, 5, 4], "k1"= [4, 5, 1], "k8"= [8, 5, 6], "k5"= [7, 4, 3]}',
      ["k1", "k3", "k5", "k8"])]).toDF("keyvalue", "key")

df.withColumn("keyvalue", get_dict_udf("keyvalue")) \
  .withColumn("values", sort_array(map_values("keyvalue"))).show(truncate=False)
+--------------------------------------------------------------------+----------------+--------------------------------------------+
|keyvalue                                                            |key             |values                                      |
+--------------------------------------------------------------------+----------------+--------------------------------------------+
|[k3 -> [6, 5, 4], k5 -> [7, 4, 3], k8 -> [8, 5, 6], k1 -> [4, 5, 1]]|[k1, k3, k5, k8]|[[4, 5, 1], [6, 5, 4], [7, 4, 3], [8, 5, 6]]|
+--------------------------------------------------------------------+----------------+--------------------------------------------+
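Note that sort_array(map_values("keyvalue")) only matches the order of the key column here because the sample keys happen to sort alphabetically. If the values must follow the exact order of the key array column (as in the original question), one option on Spark 2.3 (which predates the transform higher-order function) is another small UDF. This is only a sketch that reuses get_dict_udf and the sample dataframe above; get_ordered_values is a made-up helper name, and for the original string-valued schema the result type would be ArrayType(ArrayType(StringType())) instead of IntegerType():

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

# Hypothetical helper: look up each entry of the "key" array in the parsed
# map, preserving the array's order (missing keys come back as null).
get_ordered_values = udf(
    lambda kv, keys: [kv.get(k) for k in keys] if kv is not None and keys is not None else None,
    ArrayType(ArrayType(IntegerType())))

df.withColumn("keyvalue", get_dict_udf("keyvalue")) \
  .withColumn("values", get_ordered_values("keyvalue", "key")).show(truncate=False)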
