我们的要求是对 Phoenix (HBase( 时间序列表进行一些分析操作。 我们在postgreSQL中有一个表,它有uniqueIds。
现在我们从postgresql表中获取所有uniqueId,并在Phoenix表中查询相应的uniqueId并应用分析函数。但在这里,所有 uniqueId 都以顺序方式处理。我们需要它并行运行。我们使用 Scala 和 Spark 来实现此功能。
下面是示例代码,
val optionsMap = Map("driver" -> config.jdbcDriver, "url" -> config.jdbcUrl,
"user" -> config.jdbcUser, "password" -> config.jdbcPassword,
"dbtable" -> query)
val uniqDF = sqlContext.read.format("jdbc").options(optionsMap).load()
val results = uniqDF.collect
results.foreach { uniqId =>
val data = loadHbaseData(uniqId)
data.map(func).save()
}
def loadHbaseData(id: String): DataFrame = {
sqlContext.phoenixTableAsDataFrame("TIMESERIETABLE", Array("TIMESTAMP", "XXXX",""), predicate = Some(""ID" = '" + uniqueId + "' "), conf = configuration)
}
你能告诉我这样做的更好方法是什么吗?
您可以使用 scala 提供的并行收集功能。
results.par.foreach {
// Your code to be executed
}
DataFrame
,该是HBase DataFrame
s的并集,然后将分析函数应用于此单个DataFrame
。像这样:
val hbaseDFs = results.map(loadHbaseData)
val unitedDF = hbaseDFs.reduce(_ union _)
unitedDF.map(func).save()
这种方法调用union
大量 DataFrame
s(根据您在另一个答案中的注释为 30K+(,因此在执行union
之前将 DataFrame
s 转换为 RDD
s 可能会更快,如此处所述。