在火花驱动程序中处理 Hive 记录



在我的用例中,我有一个包含 10 万条记录的 Hive 表。每条记录表示必须处理的原始数据文件。处理每个原始数据文件会生成一个 csv 文件,其大小在 10MB 到 500MB 之间变化。最终,这些 CSV 文件随后作为单独的进程填充到 HIve 表中。在我的企业集群中,仍然不建议在 hdfs 中生成大量数据。因此,我更喜欢将这两个单独的进程合并为一个进程,以便它们处理 5000 条记录乘以 5000 条记录。

我的问题:-

鉴于我的 rdd 引用整个 Hive 表,如何为每 5000 条记录执行原始数据处理步骤?(类似于 for 循环,每次增加 5000 条记录)

一种方法是使用 RDD 的滑动功能。你可以在 apache spark 的 mllib 包中找到它。以下是使用它的方法。 假设我们有一个包含 1000 个元素的 rdd

val rdd = sc.parallelize(1 to 1000)
import org.apache.spark.mllib.rdd._
val newRdd = RDDFunctions.fromRDD(rdd)
// sliding by 10 (instead use 5000 or what you need)
val rddSlidedBy10 = newRdd.sliding(10, 10)

结果将如下所示

Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(31, 32, 33, 34, 35, 36, 37, 38, 39, 40), Array(41, 42, 43, 44, 45, 46, 47, 48, 49, 50), Array(51, 52, 53, 54, 55, 56, 57, 58, 59, 60), Array(61, 62, 63, 64, 65, 66, 67, 68, 69, 70), Array(71, 72, 73, 74, 75, 76, 77, 78, 79, 80)

您可以在数组上对 foreach 进行处理,并将原始数据处理为 CSV

相关内容

最新更新