我正在尝试将数据框架转换为rdd,以便将映射(带有键值对)爆炸为其他行。
Info = sqlContext.read.format("csv").
option("delimiter","t").
option("header", "True").
option("inferSchema", "True").
load("file.tsv")
DataFrame[ID: int, Date: timestamp, Comments: string]
DF中的示例数据如下。
ID Date Comments
1 2015-04-30 22:42:49.0 {44:'xxxxxxxx'}
2 2015-05-06 08:53:18.0 {83:'aaaaaaaaa', 175:'bbbbbbbbb', 86:'cccccccccc'}
3 2015-05-13 19:57:13.0 {487:'yyyyyyyyyyy', 48:'zzzzzzzzzzzzzz'}
现在,评论已经在键值配对中,但是它被读为字符串,我想将每个键值对爆炸成不同的行。例如
Expected OUTPUT
ID Date Comments
1 2015-04-30 22:42:49.0 {44:'xxxxxxxx'}
2 2015-05-06 08:53:18.0 {83:'aaaaaaaaa'}
2 2015-05-06 08:53:18.0 {175:'bbbbbbbbb'}
2 2015-05-06 08:53:18.0 {86:'cccccccccc'}
3 2015-05-13 19:57:13.0 {487:'yyyyyyyyyyy'}
3 2015-05-13 19:57:13.0 {48:'zzzzzzzzzzzzzz'}
我试图将其转换为RDD并应用flatMap
,但没有成功。我希望将所有列返回。我已经尝试过:
Info.rdd.flatMap(lambda x: (x['SearchParams'].split(':'), x))
使用DataFrame API中提供的split
和explode
功能将数据拆分为"。要创建地图,您要使用create_map
。此功能期望两个单独的列作为输入。以下是创建两个临时列的示例(再次使用split
):
Info.withColumn("Comments", explode(split(col("Comments"), ", ")))
.withColumn("key", split(col("Comments"), ":").getItem(0))
.withColumn("value", split(col("Comments"), ":").getItem(1))
.withColumn("Comments", create_map(col("key"), col("value")))
应该可以这样简短(未测试):
Info.withColumn("Comments", split(explode(split(col("Comments), ", ")), ":")
.withColumn("Comments", create_map(col("Comments".getItem(0)), col("Comments").getItem(1)))