如何使用Spark改善Kudu读取?



我有一个过程,给定一个新的输入,从我们的Kudu数据库中检索相关信息,然后进行一些计算。

问题在于数据检索,我们有1.201.524.092 行,对于任何计算,都需要永远开始处理所需的行,因为读者需要全力以赴。

要阅读 kudu 我们这样做:

def read(tableName: String): Try[DataFrame] = {
val kuduOptions: Map[String, String] = Map(
"kudu.table" -> tableName,
"kudu.master" -> kuduContext.kuduMaster)

SQLContext.read.options(kuduOptions).format("kudu").load

}

然后:

val newInputs = ??? // Dataframe with the new inputs
val currentInputs = read("inputsTable") // This takes too much time!!!!
val relatedCurrent = currentInputs.join(newInputs.select("commonId", Seq("commonId"), "inner")
doThings(newInputs, relatedCurrent)

例如,我们只想引入一个新输入。好吧,它必须扫描整个表才能找到当前输入,该输入使81.6 GB/1201524092行的随机写入。

我怎样才能改善这一点?

谢谢

您可以收集新输入,然后可以在 where 子句中使用它。 使用这种方式,您可以轻松命中OOM,但它可以使查询非常快,因为它将受益于谓词下推

val collectedIds = newInputs.select("commonId").collect
val filtredCurrentInputs = currentInputs.where($"commonId".isin(collectedIds))

最新更新