我尝试将Spark应用程序部署到由纱线控制的Kerberized Hadoop群集中。Spark的版本为1.5.0-CDH5.5.2。
我正面临奇怪的例外,在闲置了10秒后停止SparkContext并初始化了新的SparkContext。
我尝试做类似于该开发人员的事情,并明确指定了HDFS Namenode地址,但这无济于事。
如果我根本不重置SparkContext,或者在此Spark上下命令执行后不到10秒内将一切正常运行,那么一切正常。
我该如何修复?
这是满足问题的最小情况:
package demo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
public class App
{
public static void main( String[] args ) throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("demo");
sparkConf.set("spark.yarn.access.namenodes", "hdfs://hdp:8020");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
int waiting_time = 10;
System.out.println("Waiting time: " + Integer.toString(waiting_time));
Thread.sleep(waiting_time * 1000);
jsc.stop();
jsc = new JavaSparkContext(sparkConf); // "Delegation token ..." exception here
}
}
堆栈跟踪升高时: https://gist.github.com/anonymous/18E15010010069B119AA0934D6F42726
Spark-Submit命令:
spark-submit --principal mp@LAGOON --keytab mp.keytab --master yarn-client --class demo.App demo.jar
问题是由这个问题引起的:https://issues.apache.org/jira/browse/browse/spark-15754
在Spark 1.6.2中已修复。
对我来说, relogin 每次解决问题
def main(args: Array[String]): Unit = {
val timer = new Timer()
timer.schedule(new TimerTask {
override def run(): Unit = {
UserGroupInformation.reset()
UserGroupInformation.loginUserFromKeytab("xxx", "/path/to/keytab")
val spark = SparkSession.builder()
.appName("TokenRenew")
.getOrCreate()
spark.read.csv("/tmp/test.txt").show
spark.stop()
}
}, 0, 1000 * 60)
}