我想转换我的输入数据(XML文件)并产生3个不同的输出。
每个输出将采用镶木格式,并具有不同的模式/列数。
当前在我的解决方案中,数据存储在RDD[Row]
中,其中每一行属于三种类型之一,并且具有不同数量的字段。我现在正在做的是缓存RDD,然后对其进行过滤(使用字段告诉我记录类型),并使用以下方法保存数据:
var resultDF_1 = sqlContext.createDataFrame(filtered_data_1, schema_1)
resultDF_1.write.parquet(output_path_1)
...
// the same for filtered_data_2 and filtered_data_3
是否有任何方法可以更好地做到这一点,例如不要在内存中缓存整个数据?
在MapReduce中,我们有多输出类别类,我们可以这样做:
MultipleOutputs.addNamedOutput(job, "data_type_1", DataType1OutputFormat.class, Void.class, Group.class);
MultipleOutputs.addNamedOutput(job, "data_type_2", DataType2OutputFormat.class, Void.class, Group.class);
MultipleOutputs.addNamedOutput(job, "data_type_3", DataType3OutputFormat.class, Void.class, Group.class);
...
MultipleOutputs<Void, Group> mos = new MultipleOutputs<>(context);
mos.write("data_type_1", null, myRecordGroup1, filePath1);
mos.write("data_type_2", null, myRecordGroup2, filePath2);
...
我们恰好有一个问题,要重新征服:我们将1000秒的数据集读取为一个RDD,所有不同的模式(我们使用了嵌套的Map[String, Any]
),并想编写1000个数据集在各自的模式中进行不同的镶木隔板。全部都处于一个令人尴尬的平行火花阶段。
我们的最初方法确实做了缓存的技巧,但这意味着(a)1000通过缓存数据(b)遇到很多内存问题!
很长一段时间以来,我想绕过Spark提供的.parquet
方法,然后转到较低级别的库库,并将其包裹在一个不错的功能签名中。终于最近我们做到了!
代码在这里复制并粘贴所有这些代码太多了,因此我将粘贴代码的主要关键来解释其工作原理。我们打算在明年或两年内将此代码开源。
val successFiles: List[String] = successFilePaths(tableKeyToSchema, tableKeyToOutputKey, tableKeyToOutputKeyNprs)
// MUST happen first
info("Deleting success files")
successFiles.foreach(S3Utils.deleteObject(bucket, _))
if (saveMode == SaveMode.Overwrite) {
info("Deleting past files as in Overwrite mode")
parDeleteDirContents(bucket, allDirectories(tableKeyToOutputKey, tableKeyToOutputKeyNprs, partitions, continuallyRunStartTime))
} else {
info("Not deleting past files as in Append mode")
}
rdd.mapPartitionsWithIndex {
case (index, records) =>
records.toList.groupBy(_._1).mapValues(_.map(_._2)).foreach {
case (regularKey: RegularKey, data: List[NotProcessableRecord Either UntypedStruct]) =>
val (nprs: List[NotProcessableRecord], successes: List[UntypedStruct]) =
Foldable[List].partitionEither(data)(identity)
val filename = s"part-by-partition-index-$index.snappy.parquet"
Parquet.writeUntypedStruct(
data = successes,
schema = toMessageType(tableKeyToSchema(regularKey.tableKey)),
fsMode = fs,
path = s3 / bucket / tableKeyToOutputKey(regularKey.tableKey) / regularKey.partition.pathSuffix /?
continuallyRunStartTime.map(hourMinutePathSuffix) / filename
)
Parquet.writeNPRs(
nprs = nprs,
fsMode = fs,
path = s3 / bucket / tableKeyToOutputKeyNprs(regularKey.tableKey) / regularKey.partition.pathSuffix /?
continuallyRunStartTime.map(hourMinutePathSuffix) / filename
)
} pipe Iterator.single
}.count() // Just some action to force execution
info("Writing _SUCCESS files")
successFiles.foreach(S3Utils.uploadFileContent(bucket, "", _))
当然不能复制此代码,因为未提供许多方法和值。要点是:
- 我们将
_SUCCESS
文件的删除和以前的文件删除 - 每个火花分区都会产生一户外的输出文件(许多数据模式在同一分区中)
- 我们将
_SUCCESS
文件的写入曲柄
注意:
-
UntypedStruct
是我们任意模式的嵌套表示。有点像Row
,但要好得多,因为它基于Map[String, Any]
。 -
NotProcessableRecord
本质上只是死字母
Parquet.writeUntypedStruct
是编写镶木quet文件的逻辑的关键,因此我们将更详细地解释这一点。首先
val toMessageType: StructType => MessageType = new org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter().convert
应该是自我解释的。下一个fsMode
包含com.amazonaws.auth.AWSCredentials
,然后在writeUntypedStruct
内部使用它来构造org.apache.hadoop.conf.Configuration
设置fs.s3a.access.key
和fs.s3a.secret.key
。
writeUntypedStruct
基本上只是打电话给:
def writeRaw(
data: List[UntypedStruct],
schema: MessageType,
config: Configuration,
path: Path,
compression: CompressionCodecName = CompressionCodecName.SNAPPY
): Unit =
Using.resource(
ExampleParquetWriter.builder(path)
.withType(schema)
.withConf(config)
.withCompressionCodec(compression)
.withValidation(true)
.build()
)(writer => data.foreach(data => writer.write(transpose(data, new SimpleGroup(schema)))))
SimpleGroup
来自org.apache.parquet.example.data.simple
,而ExampleParquetWriter extends ParquetWriter<Group>
。该方法transpose
是通过UntypedStruct
填充Group
的非常乏味的自写递归(一些丑陋的Java Java可变级别的东西)。
信用必须转到https://github.com/davidainslie,以弄清楚这些基础库如何运作,并努力制定代码,就像我说的那样,我们打算尽快进行开源!
afaik,无法将一个RDD分为多个RDD本身。这就是Spark的DAG工作方式的方式:只有Child RDD从父rdds撤出数据。
但是,我们可以从同一父级RDD中读取多个子女RDD。为了避免重新计算父RDD,除了缓存之外,别无其他方法。我假设您想避免缓存,因为您害怕记忆不足。我们可以通过将RDD持续到MEMORY_AND_DISK
来避免出现记忆(OOM)问题在需要时。
让我们从您的原始数据开始:
val allDataRDD = sc.parallelize(Seq(Row(1,1,1),Row(2,2,2),Row(3,3,3)))
我们可以首先将其持续在内存中,但在内存不足的情况下,它可以溢出到磁盘上:
allDataRDD.persist(StorageLevel.MEMORY_AND_DISK)
然后,我们创建3个RDD输出:
filtered_data_1 = allDataRDD.filter(_.get(1)==1) // //
filtered_data_2 = allDataRDD.filter(_.get(2)==1) // use your own filter funcs here
filtered_data_3 = allDataRDD.filter(_.get(3)==1) // //
然后我们写出输出:
var resultDF_1 = sqlContext.createDataFrame(filtered_data_1, schema_1)
resultDF_1.write.parquet(output_path_1)
var resultDF_2 = sqlContext.createDataFrame(filtered_data_2, schema_2)
resultDF_2.write.parquet(output_path_2)
var resultDF_3 = sqlContext.createDataFrame(filtered_data_3, schema_3)
resultDF_3.write.parquet(output_path_3)
如果您真的真的想避免多次通过,则使用自定义分区器可以进行解决方法。您可以将数据重新分配到3个分区中,每个分区都会有其自己的任务,因此其输出文件/部分。警告是,并行性将大量降低到3个线程/任务,并且在单个分区中也存在> 2GB数据的风险(Spark每个分区的限制为2GB)。我没有为此方法提供详细的代码,因为我认为它可以编写具有不同架构的镶木quet文件。