我正在尝试将Spark RDD转换为数据帧。虽然RDD在我将其转换为数据帧时很好,但出现索引超出范围的错误。
alarms = sc.textFile("hdfs://nanalyticsedge.com:8020/hdp/oneday.csv")
alarms = alarms.map(lambda line: line.split(","))
header = alarms.first()
alarms = alarms.filter(lambda line:line != header)
alarms = alarms.filter(lambda line: len(line)>1)
alarms_df = alarms.map(lambda line: Row(IDENTIFIER=line[0],SERIAL=line[1],NODE=line[2],NODEALIAS=line[3],MANAGER=line[4],AGENT=line[5],ALERTGROUP=line[6],ALERTKEY=line[7],SEVERITY=line[8],SUMMARY=line[9])).toDF()
alarms_df.take(100)
在这里 alarms.count() 工作正常,而 alarms_df.count() 给出的索引超出范围。它是从预言机导出的数据
从@Dikei的回答中我发现:
alarms = alarms.filter(lambda line: len(line) == 10)
给了我正确的数据帧,但为什么数据帧在数据库导出时会丢失,我该如何防止它?
我认为
问题是您的某些行不包含 10 个元素。很容易检查,尝试更改
alarms = alarms.filter(lambda line: len(line)>1)
自
alarms = alarms.filter(lambda line: len(line) == 10)
没有提到索引的数据。尝试类似的东西,如果数组有超过 9 个打印第 10 个元素
myData.foreach { x => if(x.size.!=(9)){println(x(10))} }