我试图在yarn-cluster模式下执行我的Spark作业。它在独立和纱线客户端模式下工作良好,但在集群模式下,它在pairs.saveAsTextFile(output);
上抛出FileAlreadyExistsException
这是我的工作实现:
SparkConf sparkConf = new SparkConf().setAppName("LIM Spark PolygonFilter").setMaster(master);
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
Broadcast<IGeometry> boundryBroadCaster = broadcastBoundry(javaSparkContext, boundaryPath);
JavaRDD<String> file = javaSparkContext.textFile(input);//.cache();
JavaRDD<String> pairs = file.filter(new FilterFunction(params , boundryBroadCaster));
pairs.saveAsTextFile(output);
根据日志,它在一个节点上工作,之后它开始为所有节点抛出这个异常。
有人能帮我修理一下吗?? 谢谢。
禁用输出规范后,它正在工作:(spark.hadoop.validateOutputSpecs=true)。
这看起来像是Hadoop的一个功能,通知用户指定的输出目录已经有一些数据,如果您将在下次迭代此作业时使用相同的目录,则该目录将丢失。
在我的应用程序中,我为job提供了一个额外的参数-overwrite,我们像这样使用它:
spark.hadoop.validateOutputSpecs = value of overwrite flag
如果用户想覆盖现有的输出,他可以提供"overwrite"标志的值为true。