我正在尝试将存储在json-per-line TextFiles中的数据转换为s3上的orc或parquet之类的结构化柱状格式。
源文件包含多个方案的数据(例如HTTP请求,HTTP响应,...),需要将其解析为正确类型的不同SPARK数据范围。
示例模式:
val Request = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("requestMethod", StringType),
StructField("scheme", StringType),
StructField("host", StringType),
StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
StructField("path", StringType),
StructField("sessionId", StringType),
StructField("userAgent", StringType)
))
val Response = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("contentType", StringType),
StructField("contentLength", IntegerType),
StructField("statusCode", StringType),
StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
StructField("responseDuration", DoubleType),
StructField("sessionId", StringType)
))
我让该部分正常工作,但是试图尽可能有效地将数据写回S3似乎是一个问题。
我尝试了3种方法:
- Silex项目的Muxpartitions
- 缓存解析的S3输入并多次循环
- 使每个方案键入RDD的单独分区
在第一种情况下,JVM用完了记忆,第二个机器用完了磁盘空间。
我尚未对第三次进行测试,但这似乎并没有有效地使用处理能力(因为群集的一个节点(该特定分区所在的一个节点)实际上是将数据写回去到S3)。
相关代码:
val allSchemes = Schemes.all().keys.toArray
if (false) {
import com.realo.warehouse.multiplex.implicits._
val input = readRawFromS3(inputPrefix) // returns RDD[Row]
.flatMuxPartitions(allSchemes.length, data => {
val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
data.foreach {
logItem => {
val schemeIndex = allSchemes.indexOf(logItem.logType)
if (schemeIndex > -1) {
buffers(schemeIndex).append(logItem.row)
}
}
}
buffers
})
allSchemes.zipWithIndex.foreach {
case (schemeName, index) =>
val rdd = input(index)
writeColumnarToS3(rdd, schemeName)
}
} else if (false) {
// Naive approach
val input = readRawFromS3(inputPrefix) // returns RDD[Row]
.persist(StorageLevel.MEMORY_AND_DISK)
allSchemes.foreach {
schemeName =>
val rdd = input
.filter(x => x.logType == schemeName)
.map(x => x.row)
writeColumnarToS3(rdd, schemeName)
}
input.unpersist()
} else {
class CustomPartitioner extends Partitioner {
override def numPartitions: Int = allSchemes.length
override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
}
val input = readRawFromS3(inputPrefix)
.map(x => (x.logType, x.row))
.partitionBy(new CustomPartitioner())
.map { case (logType, row) => row }
.persist(StorageLevel.MEMORY_AND_DISK)
allSchemes.zipWithIndex.foreach {
case (schemeName, index) =>
val rdd = input
.mapPartitionsWithIndex(
(i, iter) => if (i == index) iter else Iterator.empty,
preservesPartitioning = true
)
writeColumnarToS3(rdd, schemeName)
}
input.unpersist()
}
从概念上讲,我认为代码应具有每个方案类型的1个输出Dstream,并且输入RDD应将每个处理的项目都选中到正确的Dstream(带有批处理以获得更好的吞吐量)。
。有人对如何实施这一点有任何指导吗?和/或有更好的解决这个问题的方法?
给定输入是JSON,您可以将其读取为字符串的数据框架(每行都是一个字符串)。然后,您可以从每个JSON中提取类型(通过使用UDF或使用诸如get_json_object或json_tuple之类的函数)。
现在您有两个列:类型和原始JSON。现在,您可以在编写DataFrame时使用partitionby dataframe选项。这将导致每种类型的目录,目录的内容将包括原始JSON。
现在您可以使用自己的模式读取每种类型。
您还可以使用映射使用RDD做类似的操作,该地图将输入RDD变成对RDD,其键为类型,值为JSON转换为目标架构。然后,您可以使用分区和映射分区将每个分区保存到文件中,也可以使用key用键写入到不同的文件(例如,使用键来设置文件名)。
您还可以看一下Key Spark的多个输出 - 一个Spark Job
请注意,我在这里假设目标是拆分到文件。根据您的特定用例,其他选项可能是可行的。例如,如果您的不同模式足够接近,则可以创建一个超级模式,该模式包含所有模式并直接从中创建数据框架。然后,您可以直接处理数据框架,也可以使用DataFrame分区将不同的子类型编写为不同目录(但这一次已经保存到Parquet)。
这是我最终提出的:
我使用自定义分区器根据其方案以及行的标签进行分区。
。这里的原因是我们希望能够仅处理某些分区,但仍允许所有节点参与(出于绩效原因)。因此,我们不仅将数据传播到1个分区,而是通过X分区(x是节点时时间2的数量)。
)。对于每个方案,我们都会修剪不需要的分区,因此我们只能处理我们所做的分区。
代码示例:
def process(date : ReadableInstant, schemesToProcess : Array[String]) = {
// Tweak this based on your use case
val DefaultNumberOfStoragePartitions = spark.sparkContext.defaultParallelism * 2
class CustomPartitioner extends Partitioner {
override def numPartitions: Int = schemesToProcess.length * DefaultNumberOfStoragePartitions
override def getPartition(key: Any): Int = {
// This is tightly coupled with how `input` gets transformed below
val (logType, rowHashCode) = key.asInstanceOf[(String, Int)]
(schemesToProcess.indexOf(logType) * DefaultNumberOfStoragePartitions) + Utils.nonNegativeMod(rowHashCode, DefaultNumberOfStoragePartitions)
}
/**
* Internal helper function to retrieve all partition indices for the given key
* @param key input key
* @return
*/
private def getPartitions(key: String): Seq[Int] = {
val index = schemesToProcess.indexOf(key) * DefaultNumberOfStoragePartitions
index until (index + DefaultNumberOfStoragePartitions)
}
/**
* Returns an RDD which only traverses the partitions for the given key
* @param rdd base RDD
* @param key input key
* @return
*/
def filterRDDForKey[T](rdd: RDD[T], key: String): RDD[T] = {
val partitions = getPartitions(key).toSet
PartitionPruningRDD.create(rdd, x => partitions.contains(x))
}
}
val partitioner = new CustomPartitioner()
val input = readRawFromS3(date)
.map(x => ((x.logType, x.row.hashCode), x.row))
.partitionBy(partitioner)
.persist(StorageLevel.MEMORY_AND_DISK_SER)
// Initial stage: caches the processed data + gets an enumeration of all schemes in this RDD
val schemesInRdd = input
.map(_._1._1)
.distinct()
.collect()
// Remaining stages: for each scheme, write it out to S3 as ORC
schemesInRdd.zipWithIndex.foreach {
case (schemeName, index) =>
val rdd = partitioner.filterRDDForKey(input, schemeName)
.map(_._2)
.coalesce(DefaultNumberOfStoragePartitions)
writeColumnarToS3(rdd, schemeName)
}
input.unpersist()
}