假设我有这样的数据集:
Name | Subject | Y1 | Y2
A | math | 1998| 2000
B | | 1996| 1999
| science | 2004| 2005
我想分割这个数据集的行,这样Y2列将被消除,如:
Name | Subject | Y1
A | math | 1998
A | math | 1999
A | math | 2000
B | | 1996
B | | 1997
B | | 1998
B | | 1999
| science | 2004
| science | 2005
有人能提点建议吗?我希望我的问题已经说清楚了。
我认为您只需要创建一个udf
来创建范围。然后,您可以使用爆炸来创建必要的行:
val createRange = udf { (yearFrom: Int, yearTo: Int) =>
(yearFrom to yearTo).toList
}
df.select($"Name", $"Subject", functions.explode(createRange($"Y1", $"Y2"))).show()
编辑:这段代码的python版本应该是:
from pyspark.sql import Row
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import IntegerType
createRange=udf( lambda (yearFrom, yearTo): list(range(yearFrom, yearTo)), IntegerType())
df.select($"Name", $"Subject", explode(createRange($"Y1", $"Y2"))).show()
我在pyspark中测试了这段代码,它按预期工作:
data= sc.parallelize([["A","math",1998,2000],["B","",1996,1999],["","science",2004,2005]]
data.map(lambda reg: ((reg[0],reg[1]),(range(reg[2],reg[3]+1))) )
.flatMapValues(lambda reg: reg).collect()
更详细地说,您需要将输入数据转换为(key,value)形式的一对RDD,其中key由前两个字段组成,因为结果将被平化,并保持flatMapValues
中键的完整性。要映射的值被构造为从Y1
到Y2
的range
。所有这些都在第一个map
中完成。
flatMapValues
将返回与key
相关联的range
中的每个值。
输出如下所示:
[(('A', 'math'), 1998),
(('A', 'math'), 1999),
(('A', 'math'), 2000),
(('B', ''), 1996),
(('B', ''), 1997),
(('B', ''), 1998),
(('B', ''), 1999),
(('', 'science'), 2004),
(('', 'science'), 2005)]
您可以这样实现:
val resultantDF= df.rdd.flatMap{row =>
val rangeInitial = row.getInt(2)
val rangeEnd = row.getInt(3)
val array = rangeInitial to rangeEnd
(List.fill(array.size)(row.getString(0)),List.fill(array.size)(row.getString(1)),array).zipped.toList
}.toDF("Name","Subject","Y1")
resultantDF.show()
你可以使用spark select很容易地得到你想要的数据帧,甚至在RDD。
Dataset<Row> sqlDF = spark.sql("SELECT Name,Subject,Y1 FROM tableName");
如果你从已经存在的数据帧开始,比如用户,你可以使用这样的东西:
resultDF = usersDF.select("Name","Subject","Y1");