我有一个csv文件;它在pyspark中转换为DataFrame(df);经过一些改造;我想在df中添加一列;应该是简单的行号(从0或1到N)。
我将df转换为rdd并使用"zipwithindex"。我将得到的rdd转换回df。这种方法是有效的,但它生成了250k个任务,并且在执行过程中花费了大量时间。我想知道是否有其他的方法来做到这一点,需要更少的运行时间。
下面是我的代码片段;我正在处理的csv文件是大的;包含数十亿行。
debug_csv_rdd = (sc.textFile("debug.csv")
.filter(lambda x: x.find('header') == -1)
.map(lambda x : x.replace("NULL","0")).map(lambda p: p.split(','))
.map(lambda x:Row(c1=int(x[0]),c2=int(x[1]),c3=int(x[2]),c4=int(x[3]))))
debug_csv_df = sqlContext.createDataFrame(debug_csv_rdd)
debug_csv_df.registerTempTable("debug_csv_table")
sqlContext.cacheTable("debug_csv_table")
r0 = sqlContext.sql("SELECT c2 FROM debug_csv_table WHERE c1 = 'str'")
r0.registerTempTable("r0_table")
r0_1 = (r0.flatMap(lambda x:x)
.zipWithIndex()
.map(lambda x: Row(c1=x[0],id=int(x[1]))))
r0_df=sqlContext.createDataFrame(r0_2)
r0_df.show(10)
也可以使用sql包中的函数。它将生成一个唯一的id,但是它不是顺序的,因为它取决于分区的数量。我相信它在Spark 1.5 +中是可用的
from pyspark.sql.functions import monotonicallyIncreasingId
# This will return a new DF with all the columns + id
res = df.withColumn("id", monotonicallyIncreasingId())
编辑:19/1/2017
@Sean评论
使用monotonically_increasing_id()
代替Spark 1.6和