使用 Spark 处理时间序列数据



我们的要求是对 Phoenix (HBase( 时间序列表进行一些分析操作。 我们在postgreSQL中有一个表,它有uniqueIds。

现在我们从postgresql表中获取所有uniqueId,并在Phoenix表中查询相应的uniqueId并应用分析函数。但在这里,所有 uniqueId 都以顺序方式处理。我们需要它并行运行。我们使用 Scala 和 Spark 来实现此功能。

下面是示例代码,

val optionsMap = Map("driver" -> config.jdbcDriver, "url" -> config.jdbcUrl,
      "user" -> config.jdbcUser, "password" -> config.jdbcPassword,
      "dbtable" -> query)
val uniqDF = sqlContext.read.format("jdbc").options(optionsMap).load()
val results = uniqDF.collect
results.foreach { uniqId => 
  val data = loadHbaseData(uniqId)
  data.map(func).save()
}
def loadHbaseData(id: String): DataFrame = {
  sqlContext.phoenixTableAsDataFrame("TIMESERIETABLE", Array("TIMESTAMP", "XXXX",""), predicate = Some(""ID" = '" + uniqueId + "' "), conf = configuration)
}

你能告诉我这样做的更好方法是什么吗?

您可以使用 scala 提供的并行收集功能。

results.par.foreach {
// Your code to be executed
}
创建一个DataFrame,该是

HBase DataFrame s的并集,然后将分析函数应用于此单个DataFrame。像这样:

val hbaseDFs = results.map(loadHbaseData)
val unitedDF = hbaseDFs.reduce(_ union _)
unitedDF.map(func).save()

这种方法调用union大量 DataFrame s(根据您在另一个答案中的注释为 30K+(,因此在执行union之前将 DataFrame s 转换为 RDD s 可能会更快,如此处所述。

最新更新