Suggestions for tuning code containing explode and groupBy



I wrote code for the problem below, but it has the following issues. Please suggest whether I can make some adjustments.

  1. I think it takes more time than it should
  2. As of now there are 3 brands, and they are hard-coded. If more brands are added, I have to add code for them manually

Input dataframe schema:

root
|-- id: string (nullable = true)
|-- attrib: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)
|-- pref: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- pref_type: string (nullable = true)
|    |    |-- brand: string (nullable = true)
|    |    |-- tp_id: string (nullable = true)
|    |    |-- aff: float (nullable = true)
|    |    |-- pre_id: string (nullable = true)
|    |    |-- cr_date: string (nullable = true)
|    |    |-- up_date: string (nullable = true)
|    |    |-- pref_attrib: map (nullable = true)
|    |    |    |-- key: string
|    |    |    |-- value: string (valueContainsNull = true)

Expected output schema:

root
|-- id: string (nullable = true)
|-- attrib: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)
|-- pref: struct (nullable = false)
|    |-- brandA: array (nullable = true)
|    |    |-- element: struct (containsNull = false)
|    |    |    |-- pref_type: string (nullable = true)
|    |    |    |-- tp_id: string (nullable = true)
|    |    |    |-- aff: float (nullable = true)
|    |    |    |-- pref_id: string (nullable = true)
|    |    |    |-- cr_date: string (nullable = true)
|    |    |    |-- up_date: string (nullable = true)
|    |    |    |-- pref_attrib: map (nullable = true)
|    |    |    |    |-- key: string
|    |    |    |    |-- value: string (valueContainsNull = true)
|    |-- brandB: array (nullable = true)
|    |    |-- element: struct (containsNull = false)
|    |    |    |-- pref_type: string (nullable = true)
|    |    |    |-- tp_id: string (nullable = true)
|    |    |    |-- aff: float (nullable = true)
|    |    |    |-- pref_id: string (nullable = true)
|    |    |    |-- cr_date: string (nullable = true)
|    |    |    |-- up_date: string (nullable = true)
|    |    |    |-- pref_attrib: map (nullable = true)
|    |    |    |    |-- key: string
|    |    |    |    |-- value: string (valueContainsNull = true)
|    |-- brandC: array (nullable = true)
|    |    |-- element: struct (containsNull = false)
|    |    |    |-- pref_type: string (nullable = true)
|    |    |    |-- tp_id: string (nullable = true)
|    |    |    |-- aff: float (nullable = true)
|    |    |    |-- pref_id: string (nullable = true)
|    |    |    |-- cr_date: string (nullable = true)
|    |    |    |-- up_date: string (nullable = true)
|    |    |    |-- pref_attrib: map (nullable = true)
|    |    |    |    |-- key: string
|    |    |    |    |-- value: string (valueContainsNull = true)

The processing can be done based on the brand attribute under preferences (preferences.brand).

I have written the following code for this:

def modifyBrands(inputDf: DataFrame): DataFrame = {
  import org.apache.spark.sql.functions._
  val PreferenceProps = Array("pref_type", "tp_id", "aff", "pref_id", "cr_date", "up_date", "pref_attrib")
  val explodedDf = inputDf.select(col("id"), explode(col("pref")))
    .select(
      col("id"),
      col("col.pref_type"),
      col("col.brand"),
      col("col.tp_id"),
      col("col.aff"),
      col("col.pre_id").as("pref_id"),
      col("col.cr_date"),
      col("col.up_date"),
      col("col.pref_attrib")
    ).cache()
  val brandAddedDf = explodedDf
    .withColumn("brandA", when(col("brand") === "brandA", struct(PreferenceProps.head, PreferenceProps.tail: _*)))
    .withColumn("brandB", when(col("brand") === "brandB", struct(PreferenceProps.head, PreferenceProps.tail: _*)))
    .withColumn("brandC", when(col("brand") === "brandC", struct(PreferenceProps.head, PreferenceProps.tail: _*)))
    .cache()
  explodedDf.unpersist()
  val groupedDf = brandAddedDf.groupBy("id").agg(
    collect_list("brandA").alias("brandA"),
    collect_list("brandB").alias("brandB"),
    collect_list("brandC").alias("brandC")
  ).withColumn("preferences", struct(
    when(size(col("brandA")).notEqual(0), col("brandA")).alias("brandA"),
    when(size(col("brandB")).notEqual(0), col("brandB")).alias("brandB"),
    when(size(col("brandC")).notEqual(0), col("brandC")).alias("brandC")
  )).drop("brandA", "brandB", "brandC")
    .cache()
  brandAddedDf.unpersist()
  val idAttributesDf = inputDf.select("id", "attrib").cache()
  val joinedDf = idAttributesDf.join(groupedDf, "id")
  groupedDf.unpersist()
  idAttributesDf.unpersist()
  joinedDf.printSchema()
  joinedDf // returning the joined df, which will be written as a parquet file
}

You can simplify the code by using the higher-order function filter on arrays. Just map over the brand names and return, for each brand, a filtered array from pref. This way you avoid the explode/groupBy part entirely.

Here is a complete example:

val data = """{"id":1,"attrib":{"key":"k","value":"v"},"pref":[{"pref_type":"type1","brand":"brandA","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}},{"pref_type":"type1","brand":"brandB","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}},{"pref_type":"type1","brand":"brandC","tp_id":"id1","aff":"aff1","pre_id":"pre_id1","cr_date":"2021-01-06","up_date":"2021-01-06","pref_attrib":{"key":"k","value":"v"}}]}"""
val inputDf = spark.read.json(Seq(data).toDS)
val brands = Seq("brandA", "brandB", "brandC")
// or derive them from the input dataframe
// val brands = inputDf.select("pref.brand").as[Seq[String]].collect.flatten.distinct

val brandAddedDf = inputDf.withColumn(
  "pref",
  struct(brands.map(b => expr(s"filter(pref, x -> x.brand = '$b')").as(b)): _*)
)
brandAddedDf.printSchema
//root
// |-- attrib: struct (nullable = true)
// |    |-- key: string (nullable = true)
// |    |-- value: string (nullable = true)
// |-- id: long (nullable = true)
// |-- pref: struct (nullable = false)
// |    |-- brandA: array (nullable = true)
// |    |    |-- element: struct (containsNull = true)
// |    |    |    |-- aff: string (nullable = true)
// |    |    |    |-- brand: string (nullable = true)
// |    |    |    |-- cr_date: string (nullable = true)
// |    |    |    |-- pre_id: string (nullable = true)
// |    |    |    |-- pref_attrib: struct (nullable = true)
// |    |    |    |    |-- key: string (nullable = true)
// |    |    |    |    |-- value: string (nullable = true)
// |    |    |    |-- pref_type: string (nullable = true)
// |    |    |    |-- tp_id: string (nullable = true)
// |    |    |    |-- up_date: string (nullable = true)
// |    |-- brandB: array (nullable = true)
// |    |    |-- element: struct (containsNull = true)
// |    |    |    |-- aff: string (nullable = true)
// |    |    |    |-- brand: string (nullable = true)
// |    |    |    |-- cr_date: string (nullable = true)
// |    |    |    |-- pre_id: string (nullable = true)
// |    |    |    |-- pref_attrib: struct (nullable = true)
// |    |    |    |    |-- key: string (nullable = true)
// |    |    |    |    |-- value: string (nullable = true)
// |    |    |    |-- pref_type: string (nullable = true)
// |    |    |    |-- tp_id: string (nullable = true)
// |    |    |    |-- up_date: string (nullable = true)
// |    |-- brandC: array (nullable = true)
// |    |    |-- element: struct (containsNull = true)
// |    |    |    |-- aff: string (nullable = true)
// |    |    |    |-- brand: string (nullable = true)
// |    |    |    |-- cr_date: string (nullable = true)
// |    |    |    |-- pre_id: string (nullable = true)
// |    |    |    |-- pref_attrib: struct (nullable = true)
// |    |    |    |    |-- key: string (nullable = true)
// |    |    |    |    |-- value: string (nullable = true)
// |    |    |    |-- pref_type: string (nullable = true)
// |    |    |    |-- tp_id: string (nullable = true)
// |    |    |    |-- up_date: string (nullable = true)
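One thing to note: filter produces an empty array, not null, for a brand with no matching entries, while your expected schema nulls those arrays out. A sketch (assuming Spark 2.4+ for the filter function and a hypothetical local session; toy data) that also derives the brand list dynamically instead of hard-coding it:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("brands").getOrCreate()
import spark.implicits._

val data = """{"id":1,"pref":[{"brand":"brandA","tp_id":"t1"},{"brand":"brandB","tp_id":"t2"}]}"""
val inputDf = spark.read.json(Seq(data).toDS)

// Derive the brand list from the data itself, so new brands need no code change
val brands = inputDf
  .select(explode(col("pref.brand")).as("brand"))
  .distinct
  .as[String]
  .collect
  .sorted

// Wrap each filtered array in when(size(...) > 0, ...) so a brand with no
// matching preferences becomes null rather than an empty array
val result = inputDf.withColumn(
  "pref",
  struct(brands.map { b =>
    val filtered = expr(s"filter(pref, x -> x.brand = '$b')")
    when(size(filtered) > 0, filtered).as(b)
  }: _*)
)
```

Note the brand list is collected to the driver; that is fine for a handful of brands, but it is one extra job over the input.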

I see a few issues with how the code is written, but the real way to judge where the code has problems is to look at the Spark UI. I find the "Jobs" tab and the "SQL" tab give very rich information about where the code spends most of its time; then see whether those parts can be rewritten for speed. If the real bottleneck is somewhere else, some of the items I point out below may not matter much, since most of the time is being spent in the bottleneck anyway.

There can be good reasons to create nested structures (like you are doing for the brands). I'm just not sure I see the payoff here, and it isn't explained. You should consider why you are maintaining this structure and what its benefit is. Does maintaining it bring a performance gain, or is it just an artifact of the data-creation process?

General tips that may help:

Generally, you should only cache DataFrames that will be used more than once. You have several DataFrames that are not reused, but you cache them anyway.
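As a minimal illustration of the rule of thumb (toy data, hypothetical names; cache only pays off when two or more actions read the same DataFrame):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("cacheDemo").getOrCreate()
import spark.implicits._

val base = Seq((1, "a"), (2, "b"), (3, "a")).toDF("id", "v")

// This DataFrame is read by two actions below, so caching avoids
// recomputing its lineage the second time.
val reused = base.filter($"id" > 1).cache()
val total   = reused.count()                      // first action materializes the cache
val matches = reused.filter($"v" === "a").count() // second action is served from cache
reused.unpersist()
```

For a DataFrame consumed by a single action, the cache is never reused, so it only adds memory pressure and serialization cost.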

A minor performance gain (in other words, for when you need every millisecond): withColumn is actually not quite as efficient as select (likely due to some object creation), so use select instead of withColumn where possible. Unless you really need every millisecond, though, it isn't worth rewriting your code for this alone.
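To make the tip concrete, here is a sketch (toy data, hypothetical column names) of collapsing chained withColumn calls into one select; each withColumn adds a projection over the previous plan, while a single select builds the same columns in one projection:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("selectDemo").getOrCreate()
import spark.implicits._

val df = Seq(("u1", "brandA")).toDF("id", "brand")

// Chained withColumn: one projection per call
val chained = df
  .withColumn("isA", col("brand") === "brandA")
  .withColumn("isB", col("brand") === "brandB")

// Single select: the same result in one projection
val single = df.select(
  col("id"),
  col("brand"),
  (col("brand") === "brandA").as("isA"),
  (col("brand") === "brandB").as("isB")
)
```

Catalyst usually collapses adjacent projections anyway, which is why the difference is small; the cost is mostly in building the intermediate plan objects.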
