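Using Spark 1.6, I have a Spark DataFrame column (named col1) with the values A, B, C, DS, DNS, E, F, G and H. I want to create a new column (say col2) containing the values from the dict below. How do I map this? (For example, 'A' needs to be mapped to 'S', and so on.)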
dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}
An inefficient solution using a UDF (version independent):
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, StringType())
df = sc.parallelize([('DS', ), ('G', ), ('INVALID', )]).toDF(['key'])
mapping = {
'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S',
'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}
df.withColumn("value", translate(mapping)("key"))
The result is:
+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+
A more efficient approach (Spark >= 2.0, Spark < 3.0) is to create a MapType literal:
from pyspark.sql.functions import col, create_map, lit
from itertools import chain
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
df.withColumn("value", mapping_expr.getItem(col("key")))
The result is the same:
+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+
but the execution plan is more efficient:
== Physical Plan ==
*Project [key#15, keys: [B,DNS,DS,F,E,H,C,G,A], values: [S,S,S,NS,NS,NS,S,NS,S][key#15] AS value#53]
+- Scan ExistingRDD[key#15]
compared with the UDF version:
== Physical Plan ==
*Project [key#15, pythonUDF0#61 AS value#57]
+- BatchEvalPython [translate_(key#15)], [key#15, pythonUDF0#61]
   +- Scan ExistingRDD[key#15]
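As a small aside on the create_map call above: it takes its arguments as alternating keys and values, which is what chain(*mapping.items()) produces. The sketch below shows that flattening in plain Python, without Spark. The shortened mapping and the names flat and flat_no_itertools are only for illustration, and it assumes Python 3.7+ so that the dict keeps insertion order.

```python
from itertools import chain

# Shortened, illustrative mapping (not the full one from the question).
mapping = {'A': 'S', 'DS': 'S', 'E': 'NS'}

# items() yields (key, value) pairs; chain(*...) flattens them into
# key1, value1, key2, value2, ... which is the order create_map expects.
flat = list(chain(*mapping.items()))
print(flat)  # ['A', 'S', 'DS', 'S', 'E', 'NS']

# The same flattening written as a list comprehension, without itertools.
flat_no_itertools = [x for pair in mapping.items() for x in pair]
```

Each element of the flattened list is then wrapped in lit(...) before being passed to create_map.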
In Spark >= 3.0, getItem should be replaced with __getitem__ ([]), i.e.:
from pyspark.sql.functions import col, create_map, lit
from itertools import chain
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
df.withColumn("value", mapping_expr[col("key")])
It sounds like the simplest solution would be to use the replace function: http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace
mapping = {
    'A': '1',
    'B': '2'
}
df2 = df.replace(to_replace=mapping, subset=['yourColName'])
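One difference worth keeping in mind: as far as I know, DataFrame.replace leaves values that are not keys of the mapping unchanged, whereas a map-column lookup or a UDF built on mapping.get yields null for them. A plain-Python analogy of the two behaviours follows (no Spark involved; the names values, replaced and looked_up are illustrative):

```python
mapping = {'A': '1', 'B': '2'}
values = ['A', 'B', 'INVALID']

# Analogy for df.replace: unmatched values pass through untouched.
replaced = [mapping.get(v, v) for v in values]

# Analogy for a map lookup or mapping.get in a UDF: unmatched -> None (null).
looked_up = [mapping.get(v) for v in values]

print(replaced)   # ['1', '2', 'INVALID']
print(looked_up)  # ['1', '2', None]
```

So replace fits when unmapped values should be kept as they are, and the lookup variants fit when they should become null.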
If you would rather not import itertools, a list comprehension handles this just as well.
Map from the dictionary:
F.create_map([F.lit(x) for i in dic.items() for x in i])
Extracting the value:
F.create_map([F.lit(x) for i in dic.items() for x in i])[F.col('col1')]
Full test:
from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('A',), ('E',), ('INVALID',)],
    ['col1']
)
dic = {'A': 'S', 'B': 'S', 'E': 'NS'}
map_col = F.create_map([F.lit(x) for i in dic.items() for x in i])
df = df.withColumn('col2', map_col[F.col('col1')])
df.show()
# +-------+----+
# |   col1|col2|
# +-------+----+
# |      A|   S|
# |      E|  NS|
# |INVALID|null|
# +-------+----+
If you want to create a map column from a nested dictionary, you can use the following:
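from pyspark.sql import functions as F

def create_map(d):
    # Non-dict values become literals; dicts become map columns, recursively.
    if not isinstance(d, dict):
        return F.lit(d)
    level_map = []
    for k in d:
        level_map.append(F.lit(k))
        level_map.append(create_map(d[k]))
    return F.create_map(level_map)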
d = {'a': 1, 'b': {'c': 2, 'd': 'blah'}}
print(create_map(d)) # <- Column<b'map(a, 1, b, map(c, 2, d, blah))'>
A function that converts a dictionary into CASE syntax for Spark SQL:
func_mapper = lambda dic,col,default : f"(CASE {col} WHEN " + " WHEN ".join([ f"'{k}' THEN '{v}'" for (k,v) in dic.items() ]) + f" ELSE '{default}' END)"
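To see what kind of string this produces, here is a minimal sketch of the same idea written as a regular function. The name case_when_sql and the sample arguments are made up for illustration, and it assumes that keys, values and the default are strings without single quotes, since no escaping is attempted.

```python
def case_when_sql(mapping, column, default):
    """Build a Spark SQL CASE expression string from a dict.

    Assumes keys, values and default are strings without single quotes;
    no escaping is attempted here.
    """
    branches = " ".join(f"WHEN '{k}' THEN '{v}'" for k, v in mapping.items())
    return f"(CASE {column} {branches} ELSE '{default}' END)"


sql = case_when_sql({'A': 'S', 'E': 'NS'}, 'col1', 'unknown')
print(sql)
# (CASE col1 WHEN 'A' THEN 'S' WHEN 'E' THEN 'NS' ELSE 'unknown' END)
```

The resulting string could then be handed to pyspark.sql.functions.expr, for example df.withColumn('col2', F.expr(sql)), assuming a DataFrame df that has a col1 column.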
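In case someone also needs to map null values: the accepted answer did not work for me. The problem with the map type is that it cannot handle null keys.
But we can replace it with a generated CASE WHEN statement and use isNull instead of == None: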
from pyspark.sql import functions as F
from functools import reduce
d = spark.sparkContext.parallelize([('A', ), ('B', ), (None, ), ('INVALID', )]).toDF(['key'])
mapping = {'A': '1', 'B': '2', None: 'empty'}
map_tuples = list(mapping.items())
def email_eq_null_safe(key):
    if key is None:
        return F.col('key').isNull()
    else:
        return F.col('key') == key
'''
F.when(
    F.col('key') == key1,
    value1
).when(
    F.col('key') == key2,
    value2
)....
'''
whens = reduce(
    lambda prev, nxt: prev.when(email_eq_null_safe(nxt[0]), nxt[1]),
    map_tuples[1:],
    F.when(email_eq_null_safe(map_tuples[0][0]), map_tuples[0][1])
)
d.select(
    'key',
    whens.alias('value')
).show()
+-------+-----+
|    key|value|
+-------+-----+
|      A|    1|
|      B|    2|
|   null|empty|
|INVALID| null|
+-------+-----+