I am trying to extract the values from the 'keyValue' column below, following the ordered array of keys in the 'Keys' column.
>>> df.select('reference_nbr', 'keyValue', 'Keys').show()
+--------------+-------------------------------------------------------------+---------------------+
| ref_number| keyValue| Keys|
+--------------+-------------------------------------------------------------+---------------------+
| AZQ5|{key39=[TBAX3, TBAX6, TBAXN], key46=[TBARO, TBAZ4, TBABN],...|[key1, key2, key3,...|
| NXY3|{key39=[TBAX8, TBAX2, TBAXZ], key46=[TBARD, TBAZK, TBAX9],...|[key1, key2, key3,...|
| QSW6|{key39=[TBAX5, TBAX3, TBAX8], key46=[TBARB, TBAZN, TBAX4],...|[key1, key2, key3,...|
| LJB7|{key39=[TBAX3, TBAXN, TBAXL], key46=[TBARM, TBAZ2, TBAX3],...|[key1, key2, key3,...|
| MKH9|{key39=[TBAX4, TBAX9, TBAXV], key46=[TBARB, TBAZB, TBAX1],...|[key1, key2, key3,...|
| UFG1|{key39=[TBAX3, TBAX6, TBAXQ], key46=[TBARL, TBAZB, TBAX0],...|[key1, key2, key3,...|
| WDE4|{key39=[TBAX6, TBAX7, TBAX9], key46=[TBARX, TBAX6, TBAX8],...|[key1, key2, key3,...|
| VRX8|{key39=[TBAX3, TBAX1, TBAX0], key46=[TBARQ, TBAX9, TBAX3],...|[key1, key2, key3,...|
| CIZ2|{key39=[TBAX3, TBAXC, TBAX2], key46=[TBARA, TBAXQ, TBAX1],...|[key1, key2, key3,...|
| BEO3|{key39=[TBAX9, TBAXQ, TBAX4], key46=[TBARP, TBAXV, TBAX2],...|[key1, key2, key3,...|
+--------------+-------------------------------------------------------------+---------------------+
only showing top 20 rows
If I apply the UDF and withColumn() step below, I can easily look up one specific key in the 'keyValue' column and put that key's array of values into a new column.
getKey4 = udf(lambda ar1: ar1.get('key4'))
df = df.withColumn("key4Values", getKey4(df["keyValue"]))
I am trying to do the same thing as above, but for every key in the 'Keys' column, in order. Expected output:
+--------------+-------------------------------------------------------------+---------------------+------------------------------------------------+
| ref_number| keyValue| Keys| Values|
+--------------+-------------------------------------------------------------+---------------------+------------------------------------------------+
| AZQ5|{key39=[TBAX3, TBAX6, TBAXN], key46=[TBARO, TBAZ4, TBABN],...|[key1, key2, key3,...|[[TBAX4, TBAXQ, TBAXD],[TBAR1, TBAZA, TBABW],...|
| NXY3|{key39=[TBAX8, TBAX2, TBAXZ], key46=[TBARD, TBAZK, TBAX9],...|[key1, key2, key3,...|[[TBAX5, TBAXA, TBAXC],[TBAR2, TBAZS, TBABE],...|
| QSW6|{key39=[TBAX5, TBAX3, TBAX8], key46=[TBARB, TBAZN, TBAX4],...|[key1, key2, key3,...|[[TBAX6, TBAXZ, TBAXF],[TBAR3, TBAZD, TBABR],...|
| LJB7|{key39=[TBAX3, TBAXN, TBAXL], key46=[TBARM, TBAZ2, TBAX3],...|[key1, key2, key3,...|[[TBAX7, TBAXC, TBAXG],[TBAR4, TBAZF, TBABT],...|
| MKH9|{key39=[TBAX4, TBAX9, TBAXV], key46=[TBARB, TBAZB, TBAX1],...|[key1, key2, key3,...|[[TBAX8, TBAXV, TBAXH],[TBAR5, TBAZG, TBABY],...|
| UFG1|{key39=[TBAX3, TBAX6, TBAXQ], key46=[TBARL, TBAZB, TBAX0],...|[key1, key2, key3,...|[[TBAX9, TBAXB, TBAXJ],[TBAR6, TBAZH, TBABU],...|
| WDE4|{key39=[TBAX6, TBAX7, TBAX9], key46=[TBARX, TBAX6, TBAX8],...|[key1, key2, key3,...|[[TBAX0, TBAXN, TBAXK],[TBAR7, TBAZJ, TBABI],...|
| VRX8|{key39=[TBAX3, TBAX1, TBAX0], key46=[TBARQ, TBAX9, TBAX3],...|[key1, key2, key3,...|[[TBAX2, TBAXM, TBAXL],[TBAR8, TBAZK, TBABO],...|
| CIZ2|{key39=[TBAX3, TBAXC, TBAX2], key46=[TBARA, TBAXQ, TBAX1],...|[key1, key2, key3,...|[[TBAX3, TBAXA, TBAXO],[TBAR9, TBAZL, TBABP],...|
| BEO3|{key39=[TBAX9, TBAXQ, TBAX4], key46=[TBARP, TBAXV, TBAX2],...|[key1, key2, key3,...|[[TBAX1, TBAXS, TBAXI],[TBAR0, TBAZQ, TBABZ],...|
+--------------+-------------------------------------------------------------+---------------------+------------------------------------------------+
I have tried the approach below, but I get the following error:
getKeyValues = udf(lambda ar1, ar2: {ar1.get(x) for x in ar2})
df.withColumn("Values", getKeyValues(df["keyValue"], df["Keys"])).show()
AttributeError: 'unicode' object has no attribute 'get'
I also tried a different UDF, shown below, which gives the same unicode error:
def getKeyVals(ar1, ar2):
    arr = []
    for x in ar2:
        arr.append(ar1.get(x, None))
    return arr
udf_getKeyVals = udf(getKeyVals, ArrayType(StringType()))
df.withColumn("test", udf_getKeyVals(df['keyValue'], df['Keys'])).show()
I also tried the function below, but I get a similar error. This function comes from the MungingData website:
def working_fun(mapping):
    def f(ar1):
        for x in ar1:
            return mapping.get(x)
    return F.udf(f)
df.withColumn("test", working_fun(df["keyValue"])(F.col('Keys'))).show()
Any hints or suggestions are appreciated. Thanks!
Updated to include the schema and Spark version details:
>>> df.select('reference_nbr', 'keyValue', 'Keys').schema.simpleString()
'struct<reference_nbr:string,keyValue:string,Keys:array<string>>'
>>> spark.version
u'2.3.2.3.1.5.6030-1'
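The schema above reports `keyValue` as a plain string rather than a map, which would explain the `AttributeError`: inside the UDF, `ar1` arrives as a string, and strings have no `.get` method. The snippet below is a minimal sketch of that failure in plain Python, without Spark; the sample string and the `lookup` helper are hypothetical stand-ins for one cell and for the body of the original lambda.

```python
# Hypothetical stand-in for one keyValue cell; per the schema it is a string, not a map.
key_value = "{key39=[TBAX3, TBAX6, TBAXN], key46=[TBARO, TBAZ4, TBABN]}"

def lookup(cell, key):
    """Mimics the body of the original UDF lambda: cell.get(key)."""
    return cell.get(key)

try:
    lookup(key_value, "key39")
    error_message = None
except AttributeError as exc:
    # On Python 2 the type is reported as 'unicode', on Python 3 as 'str'.
    error_message = str(exc)

print(error_message)
```

Any fix therefore has to turn the string into a dictionary (or a Spark map column) before looking keys up.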
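If you convert the keyValue column from string type to map type, you can use the map_values function to extract the values:
I use a UDF that replaces = with : in keyValue so that the ast module can be used to convert the string into a map.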
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.master('local[*]').getOrCreate()
def get_dict(c):
    c = c.replace("=", ":")
    import ast
    dict_value = ast.literal_eval(c)
    return dict_value
get_dict_udf = udf(lambda c: get_dict(c), MapType(StringType(), ArrayType(IntegerType())))
# Sample dataframe
df = spark.createDataFrame(
    [('{"k3"= [6, 5, 4], "k1"= [4, 5, 1], "k8"= [8, 5, 6], "k5"= [7, 4, 3]}',
      ["k1", "k3", "k5", "k8"])]).toDF("keyvalue", "key")
# Parentheses let the chained calls span multiple lines
(df.withColumn("keyvalue", get_dict_udf("keyvalue"))
   .withColumn("values", sort_array(map_values("keyvalue")))
   .show(truncate=False))
+--------------------------------------------------------------------+----------------+--------------------------------------------+
|keyvalue |key |values |
+--------------------------------------------------------------------+----------------+--------------------------------------------+
|[k3 -> [6, 5, 4], k5 -> [7, 4, 3], k8 -> [8, 5, 6], k1 -> [4, 5, 1]]|[k1, k3, k5, k8]|[[4, 5, 1], [6, 5, 4], [7, 4, 3], [8, 5, 6]]|
+--------------------------------------------------------------------+----------------+--------------------------------------------+
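Two caveats may matter for the data in the question. The sample above uses quoted keys and integer values, while the question's strings look like `{key39=[TBAX3, ...]}` with unquoted tokens, which `ast.literal_eval` would not accept. Also, `sort_array(map_values(...))` orders the value arrays themselves rather than following the `key` column. The following is a rough sketch, in plain Python, of parsing the unquoted format and returning the values in key order. The helper names `parse_key_value` and `values_in_key_order` are hypothetical, and the regular expression assumes keys and values never contain `=`, `[`, `]` or `,`.

```python
import re

# Matches one `key=[v1, v2, ...]` entry. Assumption: keys are word characters
# and values contain no '=', '[', ']' or ',' characters.
_ENTRY = re.compile(r"(\w+)\s*=\s*\[([^\]]*)\]")

def parse_key_value(text):
    """Parse '{k1=[a, b], k2=[c]}' into {'k1': ['a', 'b'], 'k2': ['c']}."""
    if text is None:
        return {}
    return {
        key: [v.strip() for v in body.split(",") if v.strip()]
        for key, body in _ENTRY.findall(text)
    }

def values_in_key_order(text, keys):
    """Return the value list for each key, in the order given by `keys`.

    Keys missing from the parsed string yield None at that position.
    """
    mapping = parse_key_value(text)
    return [mapping.get(k) for k in (keys or [])]

# Hypothetical sample cell in the same shape as the question's data.
row = "{key39=[TBAX3, TBAX6, TBAXN], key46=[TBARO, TBAZ4, TBABN]}"
result = values_in_key_order(row, ["key46", "key39", "key1"])
print(result)
```

To apply this in Spark, the function could be wrapped as `udf(values_in_key_order, ArrayType(ArrayType(StringType())))` and called with `df["keyValue"]` and `df["Keys"]`. This has not been run against the original dataset, so treat it as a starting point.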