Scala版本:2.11.12
Spark版本:2.4.0
emr-5.23.0
运行以下命令创建AmazonEMR集群时,获得以下内容
spark-submit --class etl.SparkDataProcessor --master yarn --deploy-mode cluster --conf spark.yarn.appMasterEnv.ETL_NAME=foo --conf spark.yarn.appMasterEnv.ETL_SPARK_MASTER=yarn --conf spark.yarn.appMasterEnv.ETL_AWS_ACCESS_KEY_ID=123 --conf spark.yarn.appMasterEnv.ETL_AWS_SECRET_ACCESS_KEY=abc MY-Tool.jar
异常
ERROR ApplicationMaster: Uncaught exception:
java.lang.IllegalStateException: User did not initialize spark context!
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:485)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:305)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:245)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:773)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:772)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:244)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:797)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
我如何创建我的火花会话(其中sparkMaster=纱线(
lazy val spark: SparkSession = {
val logger: Logger = Logger.getLogger("etl");
val sparkAppName = EnvConfig.ETL_NAME
val sparkMaster = EnvConfig.ETL_SPARK_MASTER
val sparkInstance = SparkSession
.builder()
.appName(sparkAppName)
.master(sparkMaster)
.getOrCreate()
val hadoopConf = sparkInstance.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.access.key", EnvConfig.ETL_AWS_ACCESS_KEY_ID)
hadoopConf.set("fs.s3a.secret.key", EnvConfig.ETL_AWS_SECRET_ACCESS_KEY)
logger.info("Created My SparkSession")
logger.info(s"Spark Application Name: $sparkAppName")
logger.info(s"Spark Master: $sparkMaster")
sparkInstance
}
更新:
我确定,由于应用程序逻辑的原因,在某些情况下,我们没有初始化spark会话。正因为如此,当集群终止时,它似乎也试图对会话做一些事情(也许关闭它(,因此失败了。现在我已经解决了这个问题,应用程序运行但从未真正完成。目前,在集群模式下运行时,它似乎挂在一个涉及火花的特定部分:
val data: DataFrame = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(s"s3://$csvPath/$fileKey")
.toDF()
20/03/16 18:38:35 INFO Client: Application report for application_1584324418613_0031 (state: RUNNING)
AFAIKEnvConfig.ETL_AWS_ACCESS_KEY_ID
和ETL_AWS_SECRET_ACCESS_KEY
未被填充,因此不能用null或空值初始化sparksession。尝试打印和调试这些值。
同时从--conf spark.xxx 读取属性
应该像这个例子一样。我希望你关注这个。。。
spark.sparkContext.getConf.getOption("spark. ETL_AWS_ACCESS_KEY_ID")
一旦您检查了这一点,这个示例方式应该会起作用。。。
/**
* Hadoop-AWS Configuration
*/
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.proxy.host", proxyHost)
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.proxy.port", proxyPort)
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.server-side-encryption-algorithm", "AES256")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3n.server-side-encryption-algorithm", "AES256")
sparkSession.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem
另一件事是,使用
可以使用--master yarn
或--master local[*]
代替
-conf spark.yarn.appMasterEnv.ETL_SPARK_MASTER=yarn
更新:
--conf spark.driver.port=20002
可以解决这个问题。其中20002是轨道端口。看起来它等待特定端口一段时间,重试一段时间后失败,但出现了异常。
我是从这里浏览Sparks应用程序主代码得到这个想法的
和comment这有点麻烦,但我们需要等到执行用户类的线程设置了spark.driver.port属性。
你可以试试这个,然后告诉我。
进一步阅读:Apache Spark:如何更改Spark驱动程序侦听的端口
在我的情况下(解决应用程序问题后(,在集群模式下部署时,我需要包括核心AND任务节点类型。
我建议在中启动应用程序时初始化SparkSession