I have a numeric data frame and I want to compute row-wise proportions grouped by column name, but I can't figure out how.

My data looks like this:

A1   B1   A2   B2
1    2    3    3

I want to transform the data frame into:

A1   B1   A2   B2
.25  .2   .75  .6

That is, each new value is the row-wise proportion determined by its column name.
If I have understood your question correctly, you can do it as follows.

Consider this example DataFrame:
import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val df: DataFrame = spark.createDataFrame(
  rowRDD = spark.sparkContext.parallelize(Seq(
    Row(1, 10, 100, 2, 20, 3),
    Row(2, 20, 200, 4, 40, 6)
  )),
  schema = StructType(List(
    StructField("A1", IntegerType, true),
    StructField("B1", IntegerType, true),
    StructField("C1", IntegerType, true),
    StructField("A2", IntegerType, true),
    StructField("B2", IntegerType, true),
    StructField("A3", IntegerType, true)
  ))
)
+---+---+---+---+---+---+
| A1| B1| C1| A2| B2| A3|
+---+---+---+---+---+---+
|  1| 10|100|  2| 20|  3|
|  2| 20|200|  4| 40|  6|
+---+---+---+---+---+---+
and this list of column prefixes:
val columnNamePrefixes: Seq[String] = Seq("A", "B", "C")
1. Find the names of the columns that match a given prefix
def getColumnNameGroups(df: DataFrame, columnNamePrefixes: Seq[String]): Map[String, Seq[String]] = {
  columnNamePrefixes.foldLeft(Map.empty[String, Seq[String]]) { (colNameGroupsMod, colNamePrefix) =>
    val colNames: Seq[String] = df.columns.toList.filter(_.startsWith(colNamePrefix))
    colNameGroupsMod + (colNamePrefix -> colNames)
  }
}
A -> List(A1, A2, A3)
B -> List(B1, B2)
C -> List(C1)
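As a side note (not part of the answer's code): for single-character prefixes like these, the standard library's groupBy gives the same grouping without the fold. A minimal sketch on plain column-name strings:

```scala
// Group column names by their one-letter prefix; for these names this
// is equivalent to what getColumnNameGroups computes.
val columnNames: Seq[String] = Seq("A1", "B1", "C1", "A2", "B2", "A3")
val groups: Map[String, Seq[String]] = columnNames.groupBy(_.take(1))
// groups("A") == Seq("A1", "A2", "A3")
```

Note that groupBy keys on the computed prefix directly, so it only matches the fold-based version when every prefix is the same length.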
2. Create sum columns by adding up all columns that start with the same prefix
def addSumColumns(df: DataFrame, colNameGroups: Map[String, Seq[String]]): DataFrame = {
  colNameGroups.foldLeft(df) { (modDf, tup) =>
    val sumColName: String = s"${tup._1}_sum"
    val columnsToSum: Seq[Column] = tup._2.map(df(_))
    modDf.withColumn(sumColName, columnsToSum.reduce(_ + _))
  }
}
+---+---+---+---+---+---+-----+-----+-----+
| A1| B1| C1| A2| B2| A3|A_sum|B_sum|C_sum|
+---+---+---+---+---+---+-----+-----+-----+
|  1| 10|100|  2| 20|  3|    6|   30|  100|
|  2| 20|200|  4| 40|  6|   12|   60|  200|
+---+---+---+---+---+---+-----+-----+-----+
3. Convert each column to its ratio by dividing it by its sum column
def convertToRatioColumns(df: DataFrame, colNameGroups: Map[String, Seq[String]]): DataFrame = {
  colNameGroups.foldLeft(df) { (modDf, tup) =>
    val sumColName: String = s"${tup._1}_sum"
    tup._2.foldLeft(modDf) { (modDfTmp, colName) =>
      modDfTmp.withColumn(colName, modDfTmp(colName).divide(modDfTmp(sumColName)))
    }
  }
}
+--------+-------+---+-------+-------+---+-----+-----+-----+
|      A1|     B1| C1|     A2|     B2| A3|A_sum|B_sum|C_sum|
+--------+-------+---+-------+-------+---+-----+-----+-----+
|0.166666|0.33333|1.0|0.33333|0.66666|0.5|    6|   30|  100|
|0.166666|0.33333|1.0|0.33333|0.66666|0.5|   12|   60|  200|
+--------+-------+---+-------+-------+---+-----+-----+-----+
4. Drop the sum columns
def dropSumColumns(df: DataFrame, colNameGroups: Map[String, Seq[String]]): DataFrame = {
  colNameGroups.foldLeft(df) { (modDf, tup) =>
    modDf.drop(s"${tup._1}_sum")
  }
}
+--------+-------+---+-------+-------+---+
|      A1|     B1| C1|     A2|     B2| A3|
+--------+-------+---+-------+-------+---+
|0.166666|0.33333|1.0|0.33333|0.66666|0.5|
|0.166666|0.33333|1.0|0.33333|0.66666|0.5|
+--------+-------+---+-------+-------+---+
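To sanity-check the numbers above without a Spark session, here is the same per-prefix ratio arithmetic on a plain Scala Map standing in for the first row (the Map is an illustration, not the Spark API):

```scala
// First row of the example, as a plain map from column name to value.
val row: Map[String, Double] = Map(
  "A1" -> 1.0, "B1" -> 10.0, "C1" -> 100.0,
  "A2" -> 2.0, "B2" -> 20.0, "A3" -> 3.0
)

// Group by the one-letter prefix, then divide each value by its
// group's sum -- the same computation as steps 1 through 3.
val ratios: Map[String, Double] = row
  .groupBy { case (name, _) => name.take(1) }
  .flatMap { case (_, cols) =>
    val groupSum = cols.values.sum
    cols.map { case (name, value) => name -> value / groupSum }
  }
// ratios("C1") == 1.0, ratios("A3") == 0.5
```

This matches the first output row: A1 = 1/6 ≈ 0.1666, B2 = 20/30 ≈ 0.6666, and so on.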
Let's call the data frame df. You can get the list of column names to compute over with df.schema.fieldNames. Then use df.withColumn(colName, expr) to change the values based on each field's name. This may take multiple calls to df.withColumn; you can do it recursively to chain the operations. You could also use .map, which may be simpler.
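The "one withColumn call per field, chained" idea above is a fold over the field names. A minimal sketch of that chaining pattern on a plain Scala Map standing in for a row (the Map and names here are illustrative, not the Spark API); each value is divided by the full row total:

```scala
// A plain map standing in for one row of the question's data.
val row: Map[String, Double] = Map("A1" -> 1.0, "B1" -> 2.0, "A2" -> 3.0, "B2" -> 3.0)
val total: Double = row.values.sum // 9.0

// foldLeft applies one update per field name, each step returning the
// updated accumulator -- the same shape as chaining df.withColumn.
val ratios: Map[String, Double] = row.keys.foldLeft(row) { (acc, name) =>
  acc.updated(name, acc(name) / total)
}
// ratios("A1") == 1.0 / 9.0
```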
You can do it like this :)

import org.apache.spark.sql.functions.col
import spark.implicits._

val source_DF = spark.sparkContext.parallelize(List((1, 2, 3, 3))).toDF("A1", "B1", "A2", "B2")
val sum_DF = source_DF.withColumn("SUM", source_DF.columns.map(c => col(c)).reduce((c1, c2) => c1 + c2))
val proportions_DF = sum_DF.withColumn("A1", col("A1").divide(col("SUM")))
  .withColumn("B1", col("B1").divide(col("SUM")))
  .withColumn("A2", col("A2").divide(col("SUM")))
  .withColumn("B2", col("B2").divide(col("SUM")))
val Result_DF = proportions_DF.drop("SUM")
Result_DF.show()
+-------------------+-------------------+------------------+------------------+
| A1| B1| A2| B2|
+-------------------+-------------------+------------------+------------------+
| 0.1111111111111111| 0.2222222222222222|0.3333333333333333|0.3333333333333333|
+-------------------+-------------------+------------------+------------------+