Calculate row proportions by column name in Spark Scala



I have a numeric DataFrame and want to calculate row-wise proportions by column name, but I can't figure out how.

My data looks like this:

A1  B1  A2  B2
1   2   3   3

I want to transform the DataFrame into:

A1  B1  A2  B2
.25 .4  .75 .6

That is, each new value is the row-wise proportion of the original value within the group of columns sharing its letter prefix (A1 and A2 form one group, B1 and B2 another).
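To make the intended arithmetic concrete, here is a plain-Scala sketch (no Spark; the column names and values are taken from the example above, and single-letter prefixes are assumed):

```scala
// Group columns by their letter prefix, sum each group per row,
// then divide each value by its group's sum.
val row = Map("A1" -> 1.0, "B1" -> 2.0, "A2" -> 3.0, "B2" -> 3.0)
val groups = row.keys.groupBy(_.take(1))                     // "A" -> (A1, A2), "B" -> (B1, B2)
val sums = groups.map { case (p, cols) => p -> cols.map(row).sum }
val proportions = row.map { case (c, v) => c -> v / sums(c.take(1)) }
// proportions: A1 -> 0.25, A2 -> 0.75, B1 -> 0.4, B2 -> 0.6
```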

If I understood your question correctly, you can do it as follows.


Consider this example DataFrame:

import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val df: DataFrame = spark.createDataFrame(
  rowRDD = spark.sparkContext.parallelize(Seq(
    Row(1, 10, 100, 2, 20, 3),
    Row(2, 20, 200, 4, 40, 6)
  )),
  schema = StructType(List(
    StructField("A1", IntegerType, true),
    StructField("B1", IntegerType, true),
    StructField("C1", IntegerType, true),
    StructField("A2", IntegerType, true),
    StructField("B2", IntegerType, true),
    StructField("A3", IntegerType, true)
  ))
)
+---+---+---+---+---+---+
| A1| B1| C1| A2| B2| A3|
+---+---+---+---+---+---+
|  1| 10|100|  2| 20|  3|
|  2| 20|200|  4| 40|  6|
+---+---+---+---+---+---+

And this list of column prefixes:

val columnNamePrefixes: Seq[String] = Seq("A", "B", "C")

1. Find the names of the columns matching each prefix

def getColumnNameGroups(df: DataFrame, columnNamePrefixes: Seq[String]): Map[String, Seq[String]] = {
  columnNamePrefixes.foldLeft(Map.empty[String, Seq[String]]) {
    (colNameGroupsMod: Map[String, Seq[String]], colNamePrefix: String) =>
      val colNames: Seq[String] = df.columns.
        toList.
        filter(_.startsWith(colNamePrefix))
      colNameGroupsMod + (colNamePrefix -> colNames)
  }
}
(A -> List(A1, A2, A3))
(B -> List(B1, B2))
(C -> List(C1))
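As a design note, the same prefix-to-columns map can be built more compactly; this sketch reproduces the output above (the column and prefix lists are copied from the example):

```scala
// Compact alternative: pair each prefix with its matching columns.
val columns = Seq("A1", "B1", "C1", "A2", "B2", "A3")
val prefixes = Seq("A", "B", "C")
val groups: Map[String, Seq[String]] =
  prefixes.map(p => p -> columns.filter(_.startsWith(p))).toMap
// groups("A") == Seq("A1", "A2", "A3")
```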

2. Create sum columns by adding up all columns that start with the same prefix

def addSumColumns(df: DataFrame, colNameGroups: Map[String, Seq[String]]): DataFrame = {
  colNameGroups.foldLeft(df) { (modDf: DataFrame, tup: (String, Seq[String])) =>
    val sumColName: String = s"${tup._1}_sum"
    val columnsToSum: Seq[Column] = tup._2.map(df(_))
    modDf.withColumn(sumColName, columnsToSum.reduce(_ + _))
  }
}
+---+---+---+---+---+---+-----+-----+-----+
| A1| B1| C1| A2| B2| A3|A_sum|B_sum|C_sum|
+---+---+---+---+---+---+-----+-----+-----+
|  1| 10|100|  2| 20|  3|    6|   30|  100|
|  2| 20|200|  4| 40|  6|   12|   60|  200|
+---+---+---+---+---+---+-----+-----+-----+

3. Convert each column to its ratio by dividing it by its sum column

def convertToRatioColumns(df: DataFrame, colNameGroups: Map[String, Seq[String]]): DataFrame = {
  colNameGroups.foldLeft(df) { (modDf: DataFrame, tup: (String, Seq[String])) =>
    val sumColName: String = s"${tup._1}_sum"
    tup._2.foldLeft(modDf) { (modDfTmp: DataFrame, colName: String) =>
      modDfTmp.withColumn(colName, modDfTmp(colName).divide(modDfTmp(sumColName)))
    }
  }
}
+--------+-------+---+-------+-------+---+-----+-----+-----+
|      A1|     B1| C1|     A2|     B2| A3|A_sum|B_sum|C_sum|
+--------+-------+---+-------+-------+---+-----+-----+-----+
|0.166666|0.33333|1.0|0.33333|0.66666|0.5|    6|   30|  100|
|0.166666|0.33333|1.0|0.33333|0.66666|0.5|   12|   60|  200|
+--------+-------+---+-------+-------+---+-----+-----+-----+

4. Drop the sum columns

def dropSumColumns(df: DataFrame, colNameGroups: Map[String, Seq[String]]): DataFrame = {
  colNameGroups.foldLeft(df) { (modDf: DataFrame, tup: (String, Seq[String])) =>
    val sumColName: String = s"${tup._1}_sum"
    modDf.drop(sumColName)
  }
}
+--------+-------+---+-------+-------+---+
|      A1|     B1| C1|     A2|     B2| A3|
+--------+-------+---+-------+-------+---+
|0.166666|0.33333|1.0|0.33333|0.66666|0.5|
|0.166666|0.33333|1.0|0.33333|0.66666|0.5|
+--------+-------+---+-------+-------+---+
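For completeness, the four helpers above can be chained into a single pipeline (a sketch using the same function and variable names defined in the steps above):

```scala
// Chain the four steps: group columns by prefix, add per-group sum
// columns, divide each column by its group's sum, then drop the sums.
val colNameGroups = getColumnNameGroups(df, columnNamePrefixes)
val ratiosDf = dropSumColumns(
  convertToRatioColumns(
    addSumColumns(df, colNameGroups),
    colNameGroups),
  colNameGroups)
ratiosDf.show()
```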

Let's call the DataFrame df.

You can get the list of column names to process with df.schema.fieldNames.

Then use df.withColumn(colName, expr) to change each value based on its field name. This may require multiple calls to df.withColumn, which you can chain recursively (or with a fold).

You could also use .map, which may be simpler.
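A sketch of that idea follows. Note it swaps the chained withColumn calls for a single select: with chaining, an already-divided column would leak into the group sums computed for later columns. Extracting the prefix with take(1) is an assumption that prefixes are single letters.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Divide each column by the row-wise sum of the columns sharing its
// letter prefix. All ratios are computed in one select, so every
// division sees the original (undivided) columns.
def toRowProportions(df: DataFrame): DataFrame = {
  val groups = df.columns.groupBy(_.take(1))   // assumes 1-letter prefixes
  val ratioCols = df.columns.map { c =>
    val groupSum = groups(c.take(1)).map(col).reduce(_ + _)
    (col(c) / groupSum).as(c)
  }
  df.select(ratioCols: _*)
}
```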

You could do it like this :) (note this version divides each value by the total of all columns in the row, not by per-prefix group sums):

import spark.implicits._
import org.apache.spark.sql.functions.col

val source_DF = spark.sparkContext.parallelize(List((1, 2, 3, 3))).toDF("A1", "B1", "A2", "B2")
val sum_DF = source_DF.withColumn("SUM", source_DF.columns.map(c => col(c)).reduce((c1, c2) => c1 + c2))
val proportions_DF = sum_DF.withColumn("A1", col("A1").divide(col("SUM")))
  .withColumn("B1", col("B1").divide(col("SUM")))
  .withColumn("A2", col("A2").divide(col("SUM")))
  .withColumn("B2", col("B2").divide(col("SUM")))
val result_DF = proportions_DF.drop("SUM")
result_DF.show()
+-------------------+-------------------+------------------+------------------+
|                 A1|                 B1|                A2|                B2|
+-------------------+-------------------+------------------+------------------+
| 0.1111111111111111| 0.2222222222222222|0.3333333333333333|0.3333333333333333|
+-------------------+-------------------+------------------+------------------+
