对于小型 s3 输入文件 (~10GB),粘附 ETL 作业工作正常,但对于较大的数据集 (~200GB),作业失败。
添加一部分 ETL 代码。
# Converting Dynamic frame to dataframe
df = dropnullfields3.toDF()
# create new partition column
partitioned_dataframe = df.withColumn('part_date', df['timestamp_utc'].cast('date'))
# store the data in parquet format on s3
partitioned_dataframe.write.partitionBy(['part_date']).format("parquet").save(output_lg_partitioned_dir, mode="append")
作业执行了 4 小时并抛出错误。
文件"script_2017-11-23-15-07-32.py",第 49 行,在 partitioned_dataframe.write.partitionBy(['part_date']).format("parquet").save(output_lg_partitioned_dir, 模式="追加")文件 "/mnt/yarn/usercache/root/appcache/application_1511449472652_0001/container_1511449472652_0001_02_000001/pyspark.zip/pyspark/sql/readwriter.py", 第 550 行,在保存文件中 "/mnt/yarn/usercache/root/appcache/application_1511449472652_0001/container_1511449472652_0001_02_000001/py4j-0.10.4-src.zip/py4j/java_gateway.py", 第 1133 行,在调用文件中 "/mnt/yarn/usercache/root/appcache/application_1511449472652_0001/container_1511449472652_0001_02_000001/pyspark.zip/pyspark/sql/utils.py", 第 63 行,在装饰文件中 "/mnt/yarn/usercache/root/appcache/application_1511449472652_0001/container_1511449472652_0001_02_000001/py4j-0.10.4-src.zip/py4j/protocol.py", 第 319 行,在 get_return_value py4j.protocol.Py4JJava错误:错误 在调用 O172.Save 时发生。: org.apache.spark.SparkException: 作业中止。在 org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:147) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) 在 org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121) 在 org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(command.scala:56) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(command.scala:74) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) 在 org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) 在 org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) 在 org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) 在 org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492) 在 org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) 在 org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at PY4J。Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.command.CallCommand.execute(CallCommand.java:79) at PY4J。GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) 由以下原因引起: org.apache.spark.SparkException:作业由于阶段故障而中止: 3385 个任务的序列化结果的总大小 (1024.1 MB) 更大 比 spark.driver.maxResultSize (1024.0 MB) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 在斯卡拉。Option.foreach(Option.scala:257) atorg.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:127) ...30 更多
日志结束类型:标准输出
如果您能提供任何指导来解决此问题,我将不胜感激。
您只能在上下文实例化期间设置可配置的选项,例如maxResultSize
,glue 为您提供上下文(从内存中您无法实例化新上下文)。我认为您将无法更改此属性的值。
如果向驱动程序收集超过指定大小的结果,通常会收到此错误。在这种情况下,您没有这样做,因此错误令人困惑。
看起来您正在生成 3385 个任务,这些任务可能与输入文件中的日期相关(3385 个日期,~9 年?您可以尝试批量写入此文件,例如
partitioned_dataframe = df.withColumn('part_date', df['timestamp_utc'].cast('date'))
for year in range(2000,2018):
partitioned_dataframe = partitioned_dateframe.where(year(part_date) = year)
partitioned_dataframe.write.partitionBy(['part_date'])
.format("parquet")
.save(output_lg_partitioned_dir, mode="append")
我还没有检查过这段代码;你至少需要导入pyspark.sql.functions.year
才能工作。
当我使用 Glue 完成数据处理时,我只是发现批处理工作比尝试成功完成大型数据集更有效。系统很好,但很难调试;大数据的稳定性来之不易。