DataFrame foreach loop: is there a better way to extract the results?



I have a DataFrame in Scala, from which I need to create a new DataFrame for the distinct values of the SourceHash field.

var myProductsList = List[ProductInfo]()
val distinctFiles = dfDateFiltered.select(col("SourceHash")).distinct()
distinctFiles.foreach(rowFilter => {
  val productInfo = createProductInfo(validFrom, validTo, dfDateFiltered, rowFilter.getString(0))
  myProductsList = myProductsList :+ productInfo
})
myProductsList.toDF()

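The problem is that this code throws java.lang.NullPointerException inside createProductInfo on any call made on the DataFrame dfDateFiltered. The only way I have found to get around this is to call collect() before the foreach, for example: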

distinctFiles.collect().foreach(rowFilter => {...
})

But a collect() call is expensive, so it should be avoided. How can I extract the new dataset efficiently, without losing performance?

Here is the createProductInfo code:

private def createProductInfo(validFrom: String, validTo: String, dfDateFiltered: Dataset[Row], rowFilter: String): ProductInfo = {
  val dfPerFile = dfDateFiltered.filter(col("SourceHash") === rowFilter)
  val dfRow = dfPerFile.head
  val clientCount = dfPerFile.filter(col("ServerOrClient") === "Client").count
  val buildVersion = dfPerFile.filter(col("ServerOrClient") === "Server").select(col("BuildVersion")).head.getString(0)
  val productInfo = ProductInfo(dfRow.getInt(0),
    dfRow.getInt(1),
    dfRow.getString(12),
    dfRow.getString(13),
    dfRow.getString(14),
    validFrom,
    validTo,
    dfRow.getString(8),
    dfRow.getTimestamp(9),
    clientCount,
    buildVersion
  )
  productInfo
}

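The function createProductInfo can be avoided altogether: the values can be gathered with a single grouping and aggregation. The original dataset is not included in the question, so the approach is shown on this sample data: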

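// Assumes a SparkSession named spark is in scope, as in spark-shell.
import org.apache.spark.sql.functions.{col, first, lit, sum, when}
import spark.implicits._

val dfDateFiltered = Seq(
  (1, "Server", 1),
  (1, "Client", 2),
  (2, "Client", 3)
).toDF("SourceHash", "ServerOrClient", "BuildVersion")
val validFrom = "Today"
dfDateFiltered
  .groupBy("SourceHash")
  .agg(
    // Count the Client rows in each SourceHash group.
    sum(when($"ServerOrClient" === lit("Client"), 1).otherwise(0)).alias("clientCount"),
    // Take the first non-null BuildVersion among the Server rows (null if the group has none).
    first(when($"ServerOrClient" === lit("Server"), col("BuildVersion")).otherwise(null), true).alias("buildVersion")
  )
  .withColumn("validFrom", lit(validFrom))
  // Product is not defined in the answer; presumably a case class whose fields match these columns.
  .as[Product]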

Output:

+----------+-----------+------------+---------+
|SourceHash|clientCount|buildVersion|validFrom|
+----------+-----------+------------+---------+
|1         |1          |1           |Today    |
|2         |1          |null        |Today    |
+----------+-----------+------------+---------+
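
For completeness, here is a self-contained sketch of the same grouping idea that can be compiled and run in local mode. Some background on why it helps: the function passed to Dataset.foreach runs on the executors, where a driver-side DataFrame such as dfDateFiltered cannot be used, which is consistent with the NullPointerException in the question. A single groupBy/agg avoids both that problem and the per-hash filter, head and count jobs. Everything named ProductSummary, ProductSummaryExample and summarize is hypothetical, the String column types are assumptions, and the result type is a trimmed-down stand-in for the real ProductInfo, whose full column layout is not shown in the question.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, count, first, lit, when}

// Hypothetical, reduced result type; the real ProductInfo has more fields.
// buildVersion is an Option because a group may contain no Server row.
case class ProductSummary(
  SourceHash: String,
  clientCount: Long,
  buildVersion: Option[String],
  validFrom: String,
  validTo: String)

object ProductSummaryExample {

  // One distributed aggregation replaces the per-hash filter/head/count calls.
  def summarize(spark: SparkSession, df: DataFrame,
                validFrom: String, validTo: String): Dataset[ProductSummary] = {
    import spark.implicits._
    df.groupBy("SourceHash")
      .agg(
        // count() skips nulls, so only Client rows are counted.
        count(when(col("ServerOrClient") === "Client", 1)).alias("clientCount"),
        // First non-null BuildVersion among Server rows, or null if there is none.
        first(when(col("ServerOrClient") === "Server", col("BuildVersion")),
          ignoreNulls = true).alias("buildVersion"))
      .withColumn("validFrom", lit(validFrom))
      .withColumn("validTo", lit(validTo))
      .as[ProductSummary]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // local mode, for illustration only
      .appName("product-summary-sketch")
      .getOrCreate()
    import spark.implicits._

    // Sample data in the spirit of the answer, with assumed String columns.
    val df = Seq(
      ("a", "Server", "1.0"),
      ("a", "Client", "1.0"),
      ("b", "Client", "2.0")
    ).toDF("SourceHash", "ServerOrClient", "BuildVersion")

    summarize(spark, df, "Today", "Tomorrow").show(false)
    spark.stop()
  }
}
```

Note one behavioral difference from the question's createProductInfo: there, head on an empty Server selection throws an exception, whereas this sketch yields a null buildVersion for such a group (hash "b" in the sample data). Columns that the question reads from the first row of each group by position could be added in the same agg call with further first(...) expressions, once their names are known.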
