I think AWS Glue is running out of memory after the parquet write fails...
An error occurred while calling o126.parquet. Job aborted due to stage failure: Task 82 in stage 9.0 failed 4 times, most recent failure: Lost task 82.3 in stage 9.0 (TID 17400, ip-172-31-8-70.ap-southeast-1.compute.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Fuller logs below:
Traceback (most recent call last):
  File "script_2019-01-29-06-53-53.py", line 71, in <module>
    .parquet("s3://…/flights2")
  File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 691, in parquet
  File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt/yarn/usercache/root/appcache/application_1548744646207_0001/container_1548744646207_0001_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o126.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:435)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:471)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:508)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 82 in stage 9.0 failed 4 times, most recent failure: Lost task 82.3 in stage 9.0 (TID 17400, ip-172-31-8-70.ap-southeast-1.compute.internal, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:186)
The line that seems to fail is:
.parquet("s3://pinfare-glue/flights2")
My Glue job is shown below. Is there any way to solve this? I am considering removing some folders from S3 so that Glue processes the data in batches... but that is not scalable...
Another thought is to create a dataframe for each date and write those smaller partitions in a loop (see the rough sketch after the job code below)... but would that be slow?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import regexp_replace, to_timestamp
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print(">>> READING ...")
inputGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "flights", transformation_ctx="inputGDF")
# inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://pinfare-actuary-storage-csv"], "recurse": True}, format = "csv", format_options = {"withHeader": True}, transformation_ctx="inputGDF")
print(">>> DONE READ ...")
flightsDf = inputGDF.toDF()
if bool(flightsDf.head(1)):
    df = flightsDf \
        .drop("createdat") \
        .drop("updatedat") \
        .withColumn("agent", flightsDf["agent"].cast("int")) \
        .withColumn("querydestinationplace", flightsDf["querydestinationplace"].cast("int")) \
        .withColumn("querydatetime", regexp_replace(flightsDf["querydatetime"], "-", "").cast("int")) \
        .withColumn("queryoutbounddate", regexp_replace(flightsDf["queryoutbounddate"], "-", "").cast("int")) \
        .withColumn("queryinbounddate", regexp_replace(flightsDf["queryinbounddate"], "-", "").cast("int")) \
        .withColumn("outdeparture", to_timestamp(flightsDf["outdeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
        .withColumn("outarrival", to_timestamp(flightsDf["outarrival"], "yyyy-MM-dd'T'HH:mm:ss")) \
        .withColumn("indeparture", to_timestamp(flightsDf["indeparture"], "yyyy-MM-dd'T'HH:mm:ss")) \
        .withColumn("inarrival", to_timestamp(flightsDf["inarrival"], "yyyy-MM-dd'T'HH:mm:ss"))
df.createOrReplaceTempView("flights")
airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "airports")
airportsDF = airportsGDF.toDF()
airportsDF.createOrReplaceTempView("airports")
agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "agents")
agentsRawDF = agentsGDF.toDF()
agentsRawDF.createOrReplaceTempView("agents_raw")
agentsDF = spark.sql("""
SELECT id, name, type FROM agents_raw
WHERE type IN ('Airline', 'TravelAgent')
""")
agentsDF.createOrReplaceTempView("agents")
finalDf = spark.sql("""
SELECT /*+ BROADCAST(agents) */ /*+ BROADCAST(airports) */
f.*, countryName, cityName, airportName, a.name AS agentName,
CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
FROM flights f
LEFT JOIN agents a
ON f.agent = a.id
LEFT JOIN airports p
ON f.querydestinationplace = p.airportId
""")
print(">>> DONE PROCESS FLIGHTS")
print("Writing ...")
finalDf
.write
.mode("append")
.partitionBy(["countryName", "querydatetime"])
.parquet("s3://.../flights2")
else:
print("Nothing to write ...")
job.commit()
import boto3
glue_client = boto3.client('glue', region_name='ap-southeast-1')
glue_client.start_crawler(Name='...')
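For reference, the per-date loop idea mentioned above would look roughly like the sketch below. This is only a minimal, untested sketch of my own idea; collecting the distinct querydatetime values to the driver and the reduced partitionBy are assumptions, not part of the current job.

# Hypothetical sketch: write one querydatetime slice at a time so a single
# write does not have to materialise the whole join result at once.
dates = [r["querydatetime"] for r in finalDf.select("querydatetime").distinct().collect()]
for d in dates:
    (finalDf
        .filter(finalDf["querydatetime"] == d)
        .write
        .mode("append")
        .partitionBy("countryName")
        .parquet("s3://.../flights2"))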
If your LEFT JOIN has a 1:N mapping, it will blow up the number of rows in the DF, which can cause an OOM. In Glue there is no provision to set up your own infrastructure configuration, e.g. 64 GB memory per vCPU. If that is the case, first try the spark.yarn.executor.memoryOverhead option and/or increase the DPUs. Otherwise you have to bucket the data using a push-down predicate and then run a for loop over all of it.
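A minimal sketch of the push-down-predicate loop, assuming the flights catalog table is partitioned on a column such as querydatetime; the partition values and the abbreviated loop body are placeholders, not tested code:

# Hypothetical sketch: read one catalog partition at a time via a push-down
# predicate so each pass only loads (and writes) a slice of the data.
for d in ["2019-01-01", "2019-01-02"]:  # assumed partition values
    gdf = glueContext.create_dynamic_frame.from_catalog(
        database = "pinfare",
        table_name = "flights",
        push_down_predicate = "querydatetime = '{}'".format(d),
        transformation_ctx = "inputGDF_" + d)
    df = gdf.toDF()
    # ... apply the same casts and joins as in the job above to df ...
    # then write just this slice, e.g.:
    # df.write.mode("append").partitionBy("countryName").parquet("s3://.../flights2")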