在每次Spark作业运行中,我都会从HDFS读取大约700 GB的数据。我的工作读取这些数据,过滤大约60%的数据,对其进行分区:
val toBePublishedSignals = hiveCtx.sql("some query")
toBePublishedSignals.write.partitionBy("A", "B", "C").format(JSON_DATA_FORMAT)
.mode(SaveMode.Append).save(getS3DataPath())
val metadataFiles = hiveCtx.sql("some query")
metadataFiles.distinct().write.partitionBy("A", "C").format(JSON_DATA_FORMAT)
.mode(SaveMode.Append).save(getS3MetadataPath())
工作被司机耽误了。我把司机甩了一下,它卡在了下面:
at com.a9.trafficvalidation.hadoop.utils.fs.AWSS3FileSystem.retrieveObjectListing(AWSS3FileSystem.java:366)
at com.a9.trafficvalidation.hadoop.utils.fs.AWSS3FileSystem.getFileStatus(AWSS3FileSystem.java:335)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:402)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:362)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
- locked <0x00000002d9b98288> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
- locked <0x00000002d9b98330> (a org.apache.spark.sql.execution.QueryExecution)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:510)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
看起来S3的上市是一个很大的瓶颈。这项工作被困了好几个小时,没有完成。
或者有没有我可以像这样存储路径S3://bucket/A=dvfw/B=wfwef在数据帧中,按路径重新划分数据帧,然后只按"C"进行分区并写入路径?我不知道如何在不遍历整个数据帧并一次性保存DF的情况下做到这一点。
从早上开始就在上面!寻求一些关于如何处理/避免这种情况的建议!
TIA!
据我所知,这种情况发生在以追加模式编写时,并且在最终位置有很多分区。Spark检索现有的分区,可能还有模式。我建议两种可能的解决方案。
1) 如果每次执行没有很多分区要写,可以尝试以下操作:
// Prepare data and cache it
// There are a lot of data, so a part of it most probably will be written to disk
val toBePublishedSignals = hiveCtx.sql("some query").persist(StorageLevel.MEMORY_AND_DISK_SER_2)
// Get all unique combinations of partitions columns
val partitions = toBePublishedSignals.selectExpr("A", "B", "C").distinct().collect()
// Write each combination as a separate partition
partitions.foreach { p =>
val a = p.getAs[String]("A"))
val b = p.getAs[String]("B"))
val c = p.getAs[String]("C"))
val path = new Path(new Path(new Path(getS3DataPath(), s"A=$a"), s"B=$b"), s"C=$c")
toBePublishedSignals.filter(col("A") === a && col("B") === b && col("C") === c)
.write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}
元数据也是如此。
// Prepare data and cache it
val metadataFiles = hiveCtx.sql("some query").distinct().persist(StorageLevel.MEMORY_AND_DISK_SER_2)
// Get all unique combinations of partitions columns
val partitions = metadataFiles.selectExpr("A", "C").distinct().collect()
// Write each combination as a separate partition
partitions.foreach { p =>
val a = p.getAs[String]("A"))
val c = p.getAs[String]("C"))
val path = new Path(new Path(getS3MetadataPath(), s"A=$a"), s"C=$c")
metadataFiles.filter(col("A") === a && col("C") === c)
.write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}
我不知道分区列的数据类型,所以在我的例子中它们是字符串。上面的代码只是一个例子。使用折叠操作和从DataFrame模式检索数据类型,可以将其重写为更通用的方式。
2) 作为一种选择,可以从现有数据中要接触的分区读取记录,并与传入记录联合。让我们假设A/B/C
相应地是year/month/day
。我们有一些新的数据,df
数据帧是数据处理的结果。经过处理我们的以下数据
2018|10|11|f1|f2|f3
2018|11|14|f1|f2|f3
2018|11|15|f1|f2|f3
这意味着我们需要从包含最终数据的位置(getS3DataPath()
返回的位置)读取分区
year=2018/month=10/day=11
year=2018/month=11/day=14
year=2018/month=11/day=15
要做到这一点,我们需要创建一个过滤函数,它是其他几个函数的组合。我们使用reduce来使用以下逻辑组合它们:
year=2018 && month=10 && day=11
or
year=2018 && month=11 && day=14
or
year=2018 && month=11 && day=15
// Do processing
val toBePublishedSignalsNew = hiveCtx.sql("some query")
// Create a filter function for querying existing data
val partitions = toBePublishedSignalsNew.selectExpr("A", "B", "C").distinct().collect()
val filterFunction = partitions.map { partitionValues =>
partitionColumns.map { columnName =>
(input: Row) => input.getAs[String](columnName) == partitionValues.getAs[String](columnName)
}.reduceOption((f1, f2) => (row: Row) => f1(row) && f2(row)).getOrElse((_: Row) => false)
}.reduceOption((f1, f2) => (row: Row) => f1(row) || f2(row)).getOrElse((_: Row) => false)
// Read existing partitions that match incoming data
val toBePublishedSignalsExisting = sparkSession.read.json(getS3DataPath()).filter(filterFunction)
// Combine new and existing data and write the result to a temporary location
toBePublishedSignalsExisting
.union(toBePublishedSignalsNew)
.write
.partitionBy("A", "B", "C")
.format(JSON_DATA_FORMAT)
.mode(SaveMode.Overwrite)
.save(temporaryLocationS3)
之后,您需要将getS3DataPath()
返回的位置中的分区替换为temporaryLocationS3
中的分区。只有当分区列包含字符串时,上面的示例才会起作用。如果它们有其他数据类型,您可能需要为筛选函数添加一些映射。例如,对于IntegerType
,它看起来像
(input: Row) => input.getAs[Int](columnName) == partitionValues.getAs[Int](columnName)