PySpark - saveAsTable throws an IndexError while show() on the DataFrame works fine



I am trying to save a DataFrame as a table.

I am able to create the DataFrame and register a temporary table. But saving the same DataFrame with saveAsTable() throws an IndexError.

I checked the DataFrame's schema and it looks fine.

I am not sure what the problem is, and I cannot get anything out of the logs other than the IndexError.


>>> sqlContext.sql('select * from bx_users limit 2').show()
+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
+-------+--------------------+----+
>>> bx_users_df.show(2)
+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
+-------+--------------------+----+
only showing top 2 rows
>>> bx_users_df.printSchema()
root
|-- User-ID: string (nullable = true)
|-- Location: string (nullable = true)
|-- Age: string (nullable = true)
>>> bx_users_df.write.format('parquet').mode('overwrite').saveAsTable('bx_user')
18/05/19 00:12:36 ERROR util.Utils: Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<stdin>", line 1, in <lambda>
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:261)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1279)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
18/05/19 00:12:37 ERROR datasources.DefaultWriterContainer: Task attempt attempt_201805190012_0222_m_000000_0 aborted.
18/05/19 00:12:37 ERROR executor.Executor: Exception in task 0.0 in stage 222.0 (TID 245)
org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:269)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

One of Spark's biggest gotchas is that operations are lazy, and even when an action is called, Spark tries to do as little work as possible.

For example, show only tries to evaluate the first 20 rows, and if there are no wide transformations in the pipeline it will not process all of the data. That is why show can work while saveAsTable fails.
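The same asymmetry is easy to demonstrate on a plain RDD; a minimal sketch with made-up data (the index-based lambda mirrors the parsing suspected below):

rdd = sc.parallelize(["a;b;c;d", "malformed"]).map(lambda l: l.split(";")[3])
rdd.first()     # typically computes only the first element and returns 'd'
rdd.collect()   # forces every element -> IndexError on "malformed"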

Your code fails inside a lambda expression:

File "<stdin>", line 1, in <lambda>

with:

IndexError: list index out of range

This is almost always a user error caused by a lack of handling for malformed data. I suspect your code contains something similar to

(sc.textFile(...)
    .map(lambda line: line.split(...))
    .map(lambda xs: (xs[0], xs[1], xs[3])))

Your code will fail whenever a line does not contain the expected number of fields.

In general, prefer standard functions that handle possible exceptions, or use some other approach to avoid the failure.
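For example, here is a minimal sketch that drops malformed rows instead of failing on them (the path, the ';' delimiter and the field indices are assumptions, adjust them to your actual file):

def parse(line):
    xs = line.split(";")
    # keep only rows with enough fields, silently drop the rest
    return [(xs[0], xs[1], xs[3])] if len(xs) > 3 else []

users = sc.textFile("BX-Users.csv").flatMap(parse)   # hypothetical path
bx_users_df = sqlContext.createDataFrame(users, ["User-ID", "Location", "Age"])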

If this is just parsing a delimited data file (CSV, TSV), use the Spark CSV reader.
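A hedged sketch of both variants, depending on your Spark version (the path, delimiter and options are assumptions):

# Spark 2.x and later: built-in CSV source
df = (spark.read
      .option("header", "true")
      .option("delimiter", ";")
      .option("mode", "DROPMALFORMED")   # drop rows that cannot be parsed
      .csv("BX-Users.csv"))              # hypothetical path

# Spark 1.x: the spark-csv package (com.databricks:spark-csv)
df = (sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter", ";")
      .load("BX-Users.csv"))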
