I am having trouble processing Hive tables with a Spark cluster (YARN is in place). I need to join 7 tables, then replace some null values and write the resulting final DataFrame back to Hive.
I am using Spark SQL (Scala): first I create seven separate DataFrames, then join them all and write the result back to a Hive table.
After about five minutes my code throws the errors below, which I believe are caused by resource allocation not being set up correctly.
19/10/13 06:46:53 ERROR client.TransportResponseHandler: Still have 2 requests outstanding when connection from /100.66.0.1:36467 is closed
19/10/13 06:46:53 ERROR cluster.YarnScheduler: Lost executor 401 on aaaa-bd10.pq.internal.myfove.com: Container container_e33_1570683426425_4555_01_000414 exited from explicit termination request.
19/10/13 06:47:02 ERROR cluster.YarnScheduler: Lost executor 391 on aaaa-bd10.pq.internal.myfove.com: Container marked as failed: container_e33_1570683426425_4555_01_000403 on host: aaaa-bd10.pq.internal.myfove.com. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
Killed by external signal
My hardware specification:
HostName   Memory (GB)   CPU   Memory for YARN (GB)   CPU for YARN
Node 1     126           32    90                     26
Node 2     126           32    90                     26
Node 3     126           32    90                     26
Node 4     126           32    90                     26
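For reference, a common rule-of-thumb sizing for a cluster like the one above (this is my own back-of-the-envelope arithmetic, not from the original post): fix about 5 cores per executor, reserve one executor slot for the driver, and split each node's YARN memory across its executors with ~10% headroom for overhead. A runnable sketch:

```scala
// Rule-of-thumb executor sizing for the 4-node cluster in the table above.
// The 0.9 factor approximates headroom for spark.executor.memoryOverhead;
// these numbers are a starting point, not a guarantee against exit code 143.
object ExecutorSizing {
  def size(nodes: Int,
           yarnMemGbPerNode: Int,
           yarnCoresPerNode: Int,
           coresPerExecutor: Int = 5): (Int, Int) = {
    val executorsPerNode = yarnCoresPerNode / coresPerExecutor            // 26 / 5 = 5
    val totalExecutors   = nodes * executorsPerNode - 1                   // minus one slot for the driver
    val memPerExecutorGb = (yarnMemGbPerNode / executorsPerNode * 0.9).toInt // 90 / 5 * 0.9 ≈ 16
    (totalExecutors, memPerExecutorGb)
  }

  def main(args: Array[String]): Unit = {
    val (execs, memGb) = size(nodes = 4, yarnMemGbPerNode = 90, yarnCoresPerNode = 26)
    println(s"--num-executors $execs --executor-cores 5 --executor-memory ${memGb}g")
  }
}
```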
How do I set the variables below correctly so that my code stops throwing this error (Container marked as failed - killed on request, exit code 143)?
I have tried different configurations, but nothing has helped.
val spark = (SparkSession.builder
.appName("Final Table")
.config("spark.driver.memory", "5g")
.config("spark.executor.memory", "15g")
.config("spark.dynamicAllocation.maxExecutors","6")
.config("spark.executor.cores", "5")
.enableHiveSupport()
.getOrCreate())
val df1 = spark.sql("Select * from table_1") //1.4 million records and 10 columns
val df2 = spark.sql("Select * from table_2") //1.4 million records and 3000 columns
val df3 = spark.sql("Select * from table_3") //1.4 million records and 300 columns
val df4 = spark.sql("Select * from table_4") //1.4 million records and 600 columns
val df5 = spark.sql("Select * from table_5") //1.4 million records and 150 columns
val df6 = spark.sql("Select * from table_6") //1.4 million records and 2 columns
val df7 = spark.sql("Select * from table_7") //1.4 million records and 12 columns
val joinDF1 = df1.join(df2, df1("number") === df2("number"), "left_outer").drop(df2("number"))
val joinDF2 = joinDF1.join(df3,joinDF1("number") === df3("number"), "left_outer").drop(df3("number"))
val joinDF3 = joinDF2.join(df4,joinDF2("number") === df4("number"), "left_outer").drop(df4("number"))
val joinDF4 = joinDF3.join(df5,joinDF3("number") === df5("number"), "left_outer").drop(df5("number"))
val joinDF5 = joinDF4.join(df6,joinDF4("number") === df6("number"), "left_outer").drop(df6("number")).drop("Dt")
val joinDF6 = joinDF5.join(df7,joinDF5("number") === df7("number"), "left_outer").drop(df7("number")).drop("Dt")
joinDF6.createOrReplaceTempView("joinDF6")
spark.sql("create table my_hive_table as select * from joinDF6") // my_hive_table stands in for the real target table name
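As an aside, the hand-written `joinDF1`..`joinDF6` chain repeats the same left-join-then-drop step six times, so it can be collapsed with a `foldLeft`. Below is a minimal, runnable sketch of that pattern using plain `Map`s as stand-ins for the DataFrames (so it needs no Spark cluster); with Spark the fold body would be `acc.join(d, acc("number") === d("number"), "left_outer").drop(d("number"))`:

```scala
// Sketch of collapsing a repeated left-join chain with foldLeft.
// Plain Maps stand in for DataFrames; rows are keyed by "number".
object JoinFold {
  type Row = Map[String, Any]

  // Left join: keep every left row, attach matching right columns when present.
  def leftJoin(left: Seq[Row], right: Seq[Row]): Seq[Row] = {
    val byKey = right.map(r => r("number") -> (r - "number")).toMap
    left.map(l => l ++ byKey.getOrElse(l("number"), Map.empty[String, Any]))
  }

  def main(args: Array[String]): Unit = {
    val df1 = Seq(Map[String, Any]("number" -> 1, "a" -> "x"),
                  Map[String, Any]("number" -> 2, "a" -> "y"))
    val df2 = Seq(Map[String, Any]("number" -> 1, "b" -> 10))
    val df3 = Seq(Map[String, Any]("number" -> 2, "c" -> 20))

    // One fold replaces the six hand-written joinDF1..joinDF6 steps.
    val joined = Seq(df2, df3).foldLeft(df1)(leftJoin)
    println(joined)
  }
}
```

With real DataFrames, the null replacement mentioned at the top of the question would then be a single `joined.na.fill(...)` before writing the result out.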
If you are using Ambari, check your yarn.nodemanager.log-dirs setting in Ambari. If not, locate this property by other means; if it points to a directory with very little free space, change it to a directory with more space.
While tasks run, containers write blocks to the yarn.nodemanager.log-dirs location; if there is not enough space to store those blocks, containers start failing.