将RDD拆除到多个ORC表上



我正在尝试将存储在json-per-line TextFiles中的数据转换为s3上的orc或parquet之类的结构化柱状格式。

源文件包含多个方案的数据(例如HTTP请求,HTTP响应,...),需要将其解析为正确类型的不同SPARK数据范围。

示例模式:

  val Request = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("requestMethod", StringType),
    StructField("scheme", StringType),
    StructField("host", StringType),
    StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
    StructField("path", StringType),
    StructField("sessionId", StringType),
    StructField("userAgent", StringType)
  ))
  val Response = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("contentType", StringType),
    StructField("contentLength", IntegerType),
    StructField("statusCode", StringType),
    StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
    StructField("responseDuration", DoubleType),
    StructField("sessionId", StringType)
  ))

我让该部分正常工作,但是试图尽可能有效地将数据写回S3似乎是一个问题。

我尝试了3种方法:

  1. Silex项目的Muxpartitions
  2. 缓存解析的S3输入并多次循环
  3. 使每个方案键入RDD的单独分区

在第一种情况下,JVM用完了记忆,第二个机器用完了磁盘空间。

我尚未对第三次进行测试,但这似乎并没有有效地使用处理能力(因为群集的一个节点(该特定分区所在的一个节点)实际上是将数据写回去到S3)。

相关代码:

val allSchemes = Schemes.all().keys.toArray
if (false) {
  import com.realo.warehouse.multiplex.implicits._
  val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .flatMuxPartitions(allSchemes.length, data => {
      val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
      data.foreach {
        logItem => {
          val schemeIndex = allSchemes.indexOf(logItem.logType)
          if (schemeIndex > -1) {
            buffers(schemeIndex).append(logItem.row)
          }
        }
      }
      buffers
    })
  allSchemes.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = input(index)
      writeColumnarToS3(rdd, schemeName)
  }
} else if (false) {
  // Naive approach
  val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .persist(StorageLevel.MEMORY_AND_DISK)
  allSchemes.foreach {
    schemeName =>
      val rdd = input
        .filter(x => x.logType == schemeName)
        .map(x => x.row)
      writeColumnarToS3(rdd, schemeName)
  }
  input.unpersist()
} else {
  class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = allSchemes.length
    override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
  }
    val input = readRawFromS3(inputPrefix)
      .map(x => (x.logType, x.row))
      .partitionBy(new CustomPartitioner())
      .map { case (logType, row) => row }
      .persist(StorageLevel.MEMORY_AND_DISK)
    allSchemes.zipWithIndex.foreach {
      case (schemeName, index) =>
        val rdd = input
          .mapPartitionsWithIndex(
            (i, iter) => if (i == index) iter else Iterator.empty,
            preservesPartitioning = true
          )
        writeColumnarToS3(rdd, schemeName)
    }
    input.unpersist()
}

从概念上讲,我认为代码应具有每个方案类型的1个输出Dstream,并且输入RDD应将每个处理的项目都选中到正确的Dstream(带有批处理以获得更好的吞吐量)。

有人对如何实施这一点有任何指导吗?和/或有更好的解决这个问题的方法?

给定输入是JSON,您可以将其读取为字符串的数据框架(每行都是一个字符串)。然后,您可以从每个JSON中提取类型(通过使用UDF或使用诸如get_json_object或json_tuple之类的函数)。

现在您有两个列:类型和原始JSON。现在,您可以在编写DataFrame时使用partitionby dataframe选项。这将导致每种类型的目录,目录的内容将包括原始JSON。

现在您可以使用自己的模式读取每种类型。

您还可以使用映射使用RDD做类似的操作,该地图将输入RDD变成对RDD,其键为类型,值为JSON转换为目标架构。然后,您可以使用分区和映射分区将每个分区保存到文件中,也可以使用key用键写入到不同的文件(例如,使用键来设置文件名)。

您还可以看一下Key Spark的多个输出 - 一个Spark Job

请注意,我在这里假设目标是拆分到文件。根据您的特定用例,其他选项可能是可行的。例如,如果您的不同模式足够接近,则可以创建一个超级模式,该模式包含所有模式并直接从中创建数据框架。然后,您可以直接处理数据框架,也可以使用DataFrame分区将不同的子类型编写为不同目录(但这一次已经保存到Parquet)。

这是我最终提出的:

我使用自定义分区器根据其方案以及行的标签进行分区。

这里的原因是我们希望能够仅处理某些分区,但仍允许所有节点参与(出于绩效原因)。因此,我们不仅将数据传播到1个分区,而是通过X分区(x是节点时时间2的数量)。

)。

对于每个方案,我们都会修剪不需要的分区,因此我们只能处理我们所做的分区。

代码示例:

def process(date : ReadableInstant, schemesToProcess : Array[String]) = {
  // Tweak this based on your use case
  val DefaultNumberOfStoragePartitions = spark.sparkContext.defaultParallelism * 2
  class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = schemesToProcess.length * DefaultNumberOfStoragePartitions
    override def getPartition(key: Any): Int = {
      // This is tightly coupled with how `input` gets transformed below
      val (logType, rowHashCode) = key.asInstanceOf[(String, Int)]
      (schemesToProcess.indexOf(logType) * DefaultNumberOfStoragePartitions) + Utils.nonNegativeMod(rowHashCode, DefaultNumberOfStoragePartitions)
    }
    /**
      * Internal helper function to retrieve all partition indices for the given key
      * @param key input key
      * @return
      */
    private def getPartitions(key: String): Seq[Int] = {
      val index = schemesToProcess.indexOf(key) * DefaultNumberOfStoragePartitions
      index until (index + DefaultNumberOfStoragePartitions)
    }
    /**
      * Returns an RDD which only traverses the partitions for the given key
      * @param rdd base RDD
      * @param key input key
      * @return
      */
    def filterRDDForKey[T](rdd: RDD[T], key: String): RDD[T] = {
      val partitions = getPartitions(key).toSet
      PartitionPruningRDD.create(rdd, x => partitions.contains(x))
    }
  }
  val partitioner = new CustomPartitioner()
  val input = readRawFromS3(date)
    .map(x => ((x.logType, x.row.hashCode), x.row))
    .partitionBy(partitioner)
    .persist(StorageLevel.MEMORY_AND_DISK_SER)
  // Initial stage: caches the processed data + gets an enumeration of all schemes in this RDD
  val schemesInRdd = input
    .map(_._1._1)
    .distinct()
    .collect()
  // Remaining stages: for each scheme, write it out to S3 as ORC
  schemesInRdd.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = partitioner.filterRDDForKey(input, schemeName)
        .map(_._2)
        .coalesce(DefaultNumberOfStoragePartitions)
      writeColumnarToS3(rdd, schemeName)
  }
  input.unpersist()
}

相关内容

  • 没有找到相关文章

最新更新