在Spark DataFrame中布局时序数据的最佳方法-Scala



我正在从美联储经济数据集API导入数据。每个请求返回一个每日、每周、每月或每年的时间序列。我的最终目标是进行变量选择,并建立一个基于贝叶斯的模型,使用所选的时间序列作为特定时间序列的预测因子。将这些数据结构化为数据帧的最佳方式是什么?

根据这份文档,我认为我的数据应该以"即时"格式排列。然而,在尝试加入超过20万个专栏后,我为实现这一目标所做的努力都慢得令人望而却步。以下文档参考中的另一种格式是"TimeSeriesRDD",但导入的时间序列通常没有日期重叠,范围从1930年到现在。那么,将这些数据结构化为数据帧的最佳方法是什么呢?

一个如何将数据从FRED加载到您推荐的格式的例子将不胜感激!

这是我的第一个慢得令人望而却步的方法

for (seriesId <- allSeries) {
val series = loadSeriesFromAPI(seriesId, spark)
allSeries = allSeries.join(series, allSeries.col("date") === series.col(seriesId + "_date"), "outer")
allSeries = allSeries.drop(seriesId + "_date")
}

第二次,我必须在数据中一次加载1列和1行

for(row <- series) {
val insertStr = "%s, %g".
format(
row.asInstanceOf[Map[String, Date]]("date").asInstanceOf[String],
parseDoubleOrZero(row.asInstanceOf[Map[String, Double]]("value").asInstanceOf[String])
)
}

拥有一个包含200000列的DataFrame不是一个好主意。我建议的一件事是在不混合太多技术的情况下稍微解决问题:

  1. 数据摄入:您的系列实际上有多大?尽可能避免加入(加入意味着洗牌,洗牌意味着网络,这会使一切变得缓慢)。如果合适的话,我会用Scala收集数据并将其保存在内存中,如果不合适,我仍然会在Scala中收集一批系列,并在Spark DataFrame中转换每一批
  2. 数据帧创建:如果你设法在内存中获得数据,那么你可以尝试以下片段,它将为你创建一个数据帧:
case class Point(timestamp: Long, value: Long)
case class Series(id: String, points: List[Point])
val s1 = Series("s1", List(Point(1, 100), Point(2, 200), Point(3, 100)))
val s2 = Series("s2", List(Point(1, 1000), Point(3, 100)))
val seriesDF = sc.parallelize(Array(s1, s2)).toDF
seriesDF.show()
seriesDF.select($"id", explode($"points").as("point"))
.select($"id", $"point.timestamp", $"point.value")
.show()

输出:

+---+--------------------+
| id|              points|
+---+--------------------+
| s1|[[1,100], [2,200]...|
| s2| [[1,1000], [3,100]]|
+---+--------------------+
+---+---------+-----+
| id|timestamp|value|
+---+---------+-----+
| s1|        1|  100|
| s1|        2|  200|
| s1|        3|  100|
| s2|        1| 1000|
| s2|        3|  100|
+---+---------+-----+

对于处理时间序列的更奇特的方法,我推荐以下项目:https://github.com/twosigma/flint

最新更新