我有一个csv文件,看起来像这样(它是从pyspark输出中保存的)
name_value
"[quality1 -> good, quality2 -> OK, quality3 -> bad]"
"[quality1 -> good, quality2 -> excellent]"
如何使用pyspark读取这个csv文件并将name_value列转换为映射类型?
类似下面的
data = {}
line = '[quality1 -> good, quality2 -> OK, quality3 -> bad]'
parts = line[1:-1].split(',')
for part in parts:
k,v = part.split('->')
data[k.strip()] = v.strip()
print(data)
输出
{'quality1': 'good', 'quality2': 'OK', 'quality3': 'bad'}
使用split
和regexp_replace
的组合将字符串切割成键值对。在第二步中,每个键值对首先被转换为结构,然后被转换为映射元素:
from pyspark.sql import functions as F
df=spark.read.option("header","true").csv(...)
df1=df.withColumn("name_value", F.split(F.regexp_replace("name_value", "[\[\]]", ""),","))
.withColumn("name_value", F.map_from_entries(F.expr("""transform(name_value, e -> (regexp_extract(e, '^(.*) ->',1),regexp_extract(e, '-> (.*)$',1)))""")))
df1
现在具有模式
root
|-- name_value: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
并且包含与原始csv文件相同的数据。