Spark 时间序列 - 如何并行化代码?



我有一个具有以下架构的数据集。

inputData.printSchema()
|-- seriesKeys: string (nullable = true)
|-- timestamp: long (nullable = true)
|-- targetSeries: double (nullable = true)

使用 sparkts 库,我想以分布式方式生成大量时间序列的 ARIMA 预测。通常,我可以通过使用 TimeSeriesRDD 来解决这个问题,如 Sandy Ryza 的这篇博客中所述。根据我的理解,TimeSeriesRDD要求所有系列具有相同的开始和结束日期,而我的数据集并非如此。所以我需要编写一个可以处理这种情况的代码。

目前,我的代码编写方式是所有内容都在驱动程序上执行,而不是在执行器之间分发:

val seriesKeys: Array[Row] = inputData.select("seriesKeys").distinct().collect()
val forecastArray : Array[Array[Double]] = seriesKeys.map(key => {
val inputDataTemp : Dataset[mySchema] = inputData.where(s"seriesKeys ='$key'")
val dfTS : DataFrame = inputDataTemp.select("targetSeries")
val rddTS : RDD[Double] = dfTS.rdd.map(row => row.getAs[Double]("targetSeries"))
val dvTS: Vector = Vectors.dense(rddTS.collect())
val dvForecast: Vector = ARIMA.autoFit(dvTS, 5, 2, 5).forecast(dvTS, 12)
dvForecast.toArray
})

如何重新设计代码以允许并行执行以提高性能?

这不是电话双关语,但不要打电话给collect。您需要使用分布式抽象进行操作,例如RDDDataset/DataFrame。您现在拥有它的方式seriesKeys没有分发,因为它是一个Array

最新更新