I have a shuffle exception on a count() and I need help. This is the error:
21/12/17 11:01:47 INFO DAGScheduler: Job 20 failed: count at /home/spark/jobs.zip/tca/jobs/participation_on_volume_metric/participation_on_volume_metric_job.py:388, took 1283.109346 s
21/12/17 11:01:47 INFO DAGScheduler: Resubmitting ShuffleMapStage 130 (leftOuterJoin at /home/spark/jobs.zip/tca/jobs/participation_on_volume_metric/participation_on_volume_metric_job.py:261) and ShuffleMapStage 132 (leftOuterJoin at /home/spark/jobs.zip/tca/jobs/participation_on_volume_metric/participation_on_volume_metric_job.py:277) due to fetch failure
Traceback (most recent call last):
  File "/home/spark/pywrap.py", line 53, in <module>
    app.run(main=main, argv=[sys.argv[0]] + unparsed)
  File "/home/spark/jobs.zip/tca/platform/app.py", line 20, in run
  File "/home/spark/libs.zip/absl/app.py", line 300, in run
  File "/home/spark/libs.zip/absl/app.py", line 251, in _run_main
  File "/home/spark/pywrap.py", line 32, in main
    job.analyze(spark_context, arguments, {'config': job_conf})
  File "/home/spark/jobs.zip/tca/jobs/participation_on_volume_metric/participation_on_volume_metric_job.py", line 388, in analyze
  File "/home/spark/libs.zip/pyspark/rdd.py", line 1055, in count
  File "/home/spark/libs.zip/pyspark/rdd.py", line 1046, in sum
  File "/home/spark/libs.zip/pyspark/rdd.py", line 917, in fold
  File "/home/spark/libs.zip/pyspark/rdd.py", line 816, in collect
  File "/home/spark/libs.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/spark/libs.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: A shuffle map stage with indeterminate output was failed and retried. However, Spark cannot rollback the ShuffleMapStage 132 to re-process the input data, and has to fail this job. Please eliminate the indeterminacy by checkpointing the RDD before repartition and try again.
It fails on the count() right after the RDD union:
orders_metric_rdd = sc.union([orders_with_mic_metric_rdd,
                              orders_with_childs_metric_rdd,
                              orders_without_childs_metric_rdd])
orders_metric_rdd.cache()
partitions = max(1, orders_metric_rdd.count())
partitions = min(partitions, max_partitions)
From the error log, it looks like you need to add a checkpoint. You can do it like this:
orders_metric_rdd = sc.union([orders_with_mic_metric_rdd,
                              orders_with_childs_metric_rdd,
                              orders_without_childs_metric_rdd])

sc.setCheckpointDir("/tmp/checkpoint_dir/")
orders_metric_rdd.checkpoint()

partitions = max(1, orders_metric_rdd.count())
partitions = min(partitions, max_partitions)
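Two caveats worth keeping in mind (these are assumptions about your setup, not something visible in the log): checkpoint() is lazy, so the checkpoint is only written when the first action runs, which here is the count(); and on a cluster the checkpoint directory should live on storage that every executor can reach (for example HDFS), not a node-local /tmp path. A minimal sketch of the same fix with those points spelled out, where the HDFS path and the use of cache() are my assumptions:

sc.setCheckpointDir("hdfs:///tmp/checkpoint_dir/")  # shared location reachable by all executors (assumed path)

orders_metric_rdd = sc.union([orders_with_mic_metric_rdd,
                              orders_with_childs_metric_rdd,
                              orders_without_childs_metric_rdd])

orders_metric_rdd.cache()        # optional: avoids computing the union twice (once for the action, once when writing the checkpoint)
orders_metric_rdd.checkpoint()   # lazy: only marks the RDD for checkpointing

# The first action materializes the checkpoint and truncates the lineage, so a later
# fetch failure can no longer force Spark to re-run the indeterminate shuffle stages.
partitions = max(1, orders_metric_rdd.count())
partitions = min(partitions, max_partitions)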