我在写一个函数
- 以RDD作为输入
- 拆分逗号分隔的值
- 然后将每一行转换为标记点对象
-
最终获取输出作为数据帧
code: def parse_points(raw_rdd): cleaned_rdd = raw_rdd.map(lambda line: line.split(",")) new_df = cleaned_rdd.map(lambda line:LabeledPoint(line[0],[line[1:]])).toDF() return new_df output = parse_points(input_rdd)
到此为止,如果我运行代码,没有错误,它工作正常。
但是在添加行时,
output.take(5)
我得到错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 129.0 failed 1 times, most recent failure: Lost task 0.0 in s stage 129.0 (TID 152, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
Py4JJavaError Traceback (most recent call last)
<ipython-input-100-a68c448b64b0> in <module>()
20
21 output = parse_points(raw_rdd)
---> 22 print output.show()
请告诉我错误在哪里
在执行操作之前没有错误的原因:
output.take(5)
是由于火花的性质,即懒惰。例如,在执行"take(5)"操作之前,spark中没有执行任何操作。"
你的代码中有一些问题,我认为你失败的原因是[line[1:]]中额外的"["one_answers"]"
所以你需要在[line[1:]]中删除额外的"["one_answers"]"(只保留[1:]行)
您可能需要解决的另一个问题是缺少数据框架模式。
。取代"toDF()"与"toDF(["特性"、"标签"])"这将给数据框架一个模式。
尝试:
>>> raw_rdd.map(lambda line: line.split(","))
... .map(lambda line:LabeledPoint(line[0], [float(x) for x in line[1:]])